Class EventProcessorClientBuilder

java.lang.Object
com.azure.messaging.eventhubs.EventProcessorClientBuilder
All Implemented Interfaces:
AmqpTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, TokenCredentialTrait<EventProcessorClientBuilder>

This class provides a fluent builder API to help aid the configuration and instantiation of the EventProcessorClient. Calling buildEventProcessorClient() constructs a new instance of EventProcessorClient.

To create an instance of EventProcessorClient, the following fields are required:

Creating an EventProcessorClient

 public EventProcessorClient createEventProcessor() {
     String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};"
         + "SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

     EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
         .consumerGroup("consumer-group")
         .checkpointStore(new SampleCheckpointStore())
         .processEvent(eventContext -> {
             System.out.printf("Partition id = %s and sequence number of event = %s%n",
                 eventContext.getPartitionContext().getPartitionId(),
                 eventContext.getEventData().getSequenceNumber());
         })
         .processError(errorContext -> {
             System.out.printf("Error occurred in partition processor for partition %s, %s%n",
                 errorContext.getPartitionContext().getPartitionId(),
                 errorContext.getThrowable());
         })
         .connectionString(connectionString)
         .buildEventProcessorClient();
     return eventProcessorClient;
 }
 
See Also:
  • Field Details

    • DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL

      public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
      Default load balancing update interval. Balancing interval should account for latency between the client and the storage account.
    • DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL

      public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
      Default ownership expiration.
  • Constructor Details

  • Method Details

    • fullyQualifiedNamespace

      public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
      Sets the fully qualified name for the Event Hubs namespace.
      Parameters:
      fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if fullyQualifiedNamespace is an empty string.
      NullPointerException - if fullyQualifiedNamespace is null.
    • eventHubName

      public EventProcessorClientBuilder eventHubName(String eventHubName)
      Sets the name of the Event Hub to connect the client to.
      Parameters:
      eventHubName - The name of the Event Hub to connect the client to.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if eventHubName is an empty string.
      NullPointerException - if eventHubName is null.
    • connectionString

      public EventProcessorClientBuilder connectionString(String connectionString)
      Sets the credential information given a connection string to the Event Hub instance.

      If the connection string is copied from the Event Hubs namespace, it will likely not contain the name to the desired Event Hub, which is needed. In this case, the name can be added manually by adding "EntityPath=EVENT_HUB_NAME" to the end of the connection string. For example, "EntityPath=telemetry-hub".

      If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the name.

      Specified by:
      connectionString in interface ConnectionStringTrait<EventProcessorClientBuilder>
      Parameters:
      connectionString - The connection string to use for connecting to the Event Hub instance. It is expected that the Event Hub name and the shared access key properties are contained in this connection string.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      NullPointerException - if connectionString is null.
      IllegalArgumentException - if connectionString is empty. Or, the connectionString does not contain the "EntityPath" key, which is the name of the Event Hub instance.
      AzureException - If the shared access signature token credential could not be created using the connection string.
    • connectionString

      public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
      Sets the credential information given a connection string to the Event Hubs namespace and name to a specific Event Hub instance.
      Parameters:
      connectionString - The connection string to use for connecting to the Event Hubs namespace; it is expected that the shared access key properties are contained in this connection string, but not the Event Hub name.
      eventHubName - The name of the Event Hub to connect the client to.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      NullPointerException - if connectionString or eventHubName is null.
      IllegalArgumentException - if connectionString or eventHubName is an empty string. Or, if the connectionString contains the Event Hub name.
      AzureException - If the shared access signature token credential could not be created using the connection string.
    • configuration

      public EventProcessorClientBuilder configuration(Configuration configuration)
      Sets the configuration store that is used during construction of the service client. If not specified, the default configuration store is used to configure the EventHubAsyncClient. Use Configuration.NONE to bypass using configuration settings during construction.
      Specified by:
      configuration in interface ConfigurationTrait<EventProcessorClientBuilder>
      Parameters:
      configuration - The configuration store used to configure the EventHubAsyncClient.
      Returns:
      The updated EventProcessorClientBuilder object.
    • credential

      public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
      Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
      Parameters:
      fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
      eventHubName - The name of the Event Hub to connect the client to.
      credential - The token credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if fullyQualifiedNamespace or eventHubName is an empty string.
      NullPointerException - if fullyQualifiedNamespace, eventHubName, credentials is null.
    • credential

      public EventProcessorClientBuilder credential(TokenCredential credential)
      Sets the TokenCredential used to authorize requests sent to the service. Refer to the Azure SDK for Java identity and authentication documentation for more details on proper usage of the TokenCredential type.
      Specified by:
      credential in interface TokenCredentialTrait<EventProcessorClientBuilder>
      Parameters:
      credential - The token credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      NullPointerException - if credential is null.
    • credential

      public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
      Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
      Parameters:
      fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
      eventHubName - The name of the Event Hub to connect the client to.
      credential - The shared access name and key credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if fullyQualifiedNamespace or eventHubName is an empty string.
      NullPointerException - if fullyQualifiedNamespace, eventHubName, credentials is null.
    • credential

      public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
      Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
      Specified by:
      credential in interface AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>
      Parameters:
      credential - The shared access name and key credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      NullPointerException - if credentials is null.
    • credential

      public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
      Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
      Parameters:
      fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
      eventHubName - The name of the Event Hub to connect the client to.
      credential - The shared access signature credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if fullyQualifiedNamespace or eventHubName is an empty string.
      NullPointerException - if fullyQualifiedNamespace, eventHubName, credentials is null.
    • credential

      public EventProcessorClientBuilder credential(AzureSasCredential credential)
      Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
      Specified by:
      credential in interface AzureSasCredentialTrait<EventProcessorClientBuilder>
      Parameters:
      credential - The shared access signature credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      NullPointerException - if credentials is null.
    • customEndpointAddress

      public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
      Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through an intermediary. For example: https://my.custom.endpoint.com:55300.

      If no port is specified, the default port for the transport type is used.

      Parameters:
      customEndpointAddress - The custom endpoint address.
      Returns:
      The updated EventProcessorClientBuilder object.
      Throws:
      IllegalArgumentException - if customEndpointAddress cannot be parsed into a valid URL.
    • proxyOptions

      public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
      Sets the proxy configuration to use for EventHubAsyncClient. When a proxy is configured, AmqpTransportType.AMQP_WEB_SOCKETS must be used for the transport type.
      Specified by:
      proxyOptions in interface AmqpTrait<EventProcessorClientBuilder>
      Parameters:
      proxyOptions - The proxy options to use.
      Returns:
      The updated EventProcessorClientBuilder object.
    • transportType

      public EventProcessorClientBuilder transportType(AmqpTransportType transport)
      Sets the transport type by which all the communication with Azure Event Hubs occurs. Default value is AmqpTransportType.AMQP.
      Specified by:
      transportType in interface AmqpTrait<EventProcessorClientBuilder>
      Parameters:
      transport - The transport type to use.
      Returns:
      The updated EventProcessorClientBuilder object.
    • retry

      Deprecated.
      Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.
      Parameters:
      retryOptions - The retry policy to use.
      Returns:
      The updated EventProcessorClientBuilder object.
    • retryOptions

      public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
      Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.
      Specified by:
      retryOptions in interface AmqpTrait<EventProcessorClientBuilder>
      Parameters:
      retryOptions - The retry options to use.
      Returns:
      The updated EventProcessorClientBuilder object.
    • clientOptions

      public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
      Sets the client options for the processor client. The application id set on the client options will be used for tracing. The headers set on ClientOptions are currently not used but can be used in later releases to add to AMQP message.
      Specified by:
      clientOptions in interface AmqpTrait<EventProcessorClientBuilder>
      Parameters:
      clientOptions - The client options.
      Returns:
      The updated EventProcessorClientBuilder object.
    • consumerGroup

      public EventProcessorClientBuilder consumerGroup(String consumerGroup)
      Sets the consumer group name from which the EventProcessorClient should consume events.
      Parameters:
      consumerGroup - The consumer group name this EventProcessorClient should consume events.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if consumerGroup is null.
    • checkpointStore

      public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
      Sets the CheckpointStore the EventProcessorClient will use for storing partition ownership and checkpoint information.

      Users can, optionally, provide their own implementation of CheckpointStore which will store ownership and checkpoint information.

      Parameters:
      checkpointStore - Implementation of CheckpointStore.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if checkpointStore is null.
    • loadBalancingUpdateInterval

      public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
      The time interval between load balancing update cycles. This is also generally the interval at which ownership of partitions are renewed. By default, this interval is set to 10 seconds.
      Parameters:
      loadBalancingUpdateInterval - The time duration between load balancing update cycles.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if loadBalancingUpdateInterval is null.
      IllegalArgumentException - if loadBalancingUpdateInterval is zero or a negative duration.
    • partitionOwnershipExpirationInterval

      public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
      The time duration after which the ownership of partition expires if it's not renewed by the owning processor instance. This is the duration that this processor instance will wait before taking over the ownership of partitions previously owned by an inactive processor. By default, this duration is set to a minute.
      Parameters:
      partitionOwnershipExpirationInterval - The time duration after which the ownership of partition expires.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if partitionOwnershipExpirationInterval is null.
      IllegalArgumentException - if partitionOwnershipExpirationInterval is zero or a negative duration.
    • loadBalancingStrategy

      public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
      The LoadBalancingStrategy the event processor will use for claiming partition ownership. By default, a Balanced approach will be used.
      Parameters:
      loadBalancingStrategy - The LoadBalancingStrategy to use.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if loadBalancingStrategy is null.
    • prefetchCount

      public EventProcessorClientBuilder prefetchCount(int prefetchCount)
      Sets the count used by the receivers to control the number of events each consumer will actively receive and queue locally without regard to whether a receive operation is currently active.
      Parameters:
      prefetchCount - The number of events to queue locally.
      Returns:
      The updated EventHubClientBuilder object.
      Throws:
      IllegalArgumentException - if prefetchCount is less than 1 or greater than 8000.
    • processEvent

      public EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent)
      The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data.
      Parameters:
      processEvent - The callback that's called when an event is received by this EventProcessorClient.
      Returns:
      The updated EventProcessorClientBuilder instance.
      Throws:
      NullPointerException - if processEvent is null.
    • processEvent

      public EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent, Duration maxWaitTime)
      The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
      Parameters:
      processEvent - The callback that's called when an event is received by this EventProcessorClient or when the max wait duration has expired.
      maxWaitTime - The max time duration to wait to receive an event before invoking this handler.
      Returns:
      The updated EventProcessorClient instance.
      Throws:
      NullPointerException - if processEvent is null.
    • processEventBatch

      public EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize)
      The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
      Parameters:
      processEventBatch - The callback that's called when an event is received by this EventProcessorClient or when the max wait duration has expired.
      maxBatchSize - The maximum number of events that will be in the list when this callback is invoked.
      Returns:
      The updated EventProcessorClient instance.
      Throws:
      NullPointerException - if processEvent is null.
    • processEventBatch

      public EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime)
      The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
       EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
           .consumerGroup("consumer-group")
           .checkpointStore(new SampleCheckpointStore())
           .processEventBatch(eventBatchContext -> {
               eventBatchContext.getEvents().forEach(eventData -> {
                   System.out.printf("Partition id = %s and sequence number of event = %s%n",
                       eventBatchContext.getPartitionContext().getPartitionId(),
                       eventData.getSequenceNumber());
               });
           }, 50, Duration.ofSeconds(30))
           .processError(errorContext -> {
               System.out.printf("Error occurred in partition processor for partition %s, %s%n",
                   errorContext.getPartitionContext().getPartitionId(),
                   errorContext.getThrowable());
           })
           .connectionString(connectionString)
           .buildEventProcessorClient();
       
      Parameters:
      processEventBatch - The callback that's called when an event is received or when the max wait duration has expired.
      maxBatchSize - The maximum number of events that will be in the list when this callback is invoked.
      maxWaitTime - The max time duration to wait to receive a batch of events upto the max batch size before invoking this callback.
      Returns:
      The updated EventProcessorClient instance.
      Throws:
      NullPointerException - if processEvent is null.
    • processError

      public EventProcessorClientBuilder processError(Consumer<ErrorContext> processError)
      The function that is called when an error occurs while processing events. The input contains the partition information where the error happened.
      Parameters:
      processError - The callback that's called when an error occurs while processing events.
      Returns:
      The updated EventProcessorClientBuilder instance.
    • processPartitionInitialization

      public EventProcessorClientBuilder processPartitionInitialization(Consumer<InitializationContext> initializePartition)
      The function that is called before processing starts for a partition. The input contains the partition information along with a default starting position for processing events that will be used in the case of a checkpoint unavailable in CheckpointStore. Users can update this position if a different starting position is preferred.
      Parameters:
      initializePartition - The callback that's called before processing starts for a partition
      Returns:
      The updated EventProcessorClientBuilder instance.
    • processPartitionClose

      public EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext> closePartition)
      The function that is called when a processing for a partition stops. The input contains the partition information along with the reason for stopping the event processing for this partition.
      Parameters:
      closePartition - The callback that's called after processing for a partition stops.
      Returns:
      The updated EventProcessorClientBuilder instance.
    • trackLastEnqueuedEventProperties

      public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
      Sets whether or not the event processor should request information on the last enqueued event on its associated partition, and track that information as events are received.

      When information about the partition's last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client.

      Parameters:
      trackLastEnqueuedEventProperties - true if the resulting events will keep track of the last enqueued information for that partition; false otherwise.
      Returns:
      The updated EventProcessorClientBuilder instance.
    • initialPartitionEventPosition

      public EventProcessorClientBuilder initialPartitionEventPosition(Map<String,EventPosition> initialPartitionEventPosition)
      Sets the map containing the event position to use for each partition if a checkpoint for the partition does not exist in CheckpointStore. This map is keyed off of the partition id. If there is no checkpoint in CheckpointStore and there is no entry in this map, the processing of the partition will start from latest position.
      Parameters:
      initialPartitionEventPosition - Map of initial event positions for partition ids.
      Returns:
      The updated EventProcessorClientBuilder instance.
    • buildEventProcessorClient

      public EventProcessorClient buildEventProcessorClient()
      This will create a new EventProcessorClient configured with the options set in this builder. Each call to this method will return a new instance of EventProcessorClient.

      All partitions processed by this EventProcessorClient will start processing from earliest available event in the respective partitions.

      Returns:
      A new instance of EventProcessorClient.
      Throws:
      NullPointerException - if processEvent or processError or checkpointStore or consumerGroup is null.
      IllegalArgumentException - if the credentials have not been set using either connectionString(String) or credential(String, String, TokenCredential). Or, if a proxy is specified but the transport type is not web sockets.