Class EventHubConsumerAsyncClient

java.lang.Object
com.azure.messaging.eventhubs.EventHubConsumerAsyncClient
All Implemented Interfaces:
Closeable, AutoCloseable

public class EventHubConsumerAsyncClient extends Object implements Closeable
An asynchronous consumer responsible for reading EventData from either a specific Event Hub partition or all partitions in the context of a specific consumer group.

Creating an EventHubConsumerAsyncClient

 // The required parameters are `consumerGroup` and a way to authenticate with Event Hubs using credentials.
 EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
         + "SharedAccessKey={key};EntityPath={eh-name}")
     .consumerGroup("consumer-group-name")
     .buildAsyncConsumerClient();
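
The connection string passed to the builder above is a list of `name=value` segments separated by semicolons. The following is a minimal sketch, using only the JDK, of how such a string could be composed and split for inspection. `ConnectionStringSketch`, `compose`, and `parse` are hypothetical helpers and not part of the Event Hubs SDK, and the endpoint, policy, key, and hub values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Event Hubs SDK.
final class ConnectionStringSketch {

    /** Joins the four segments in the order used by the sample above. */
    static String compose(String endpoint, String policyName, String key, String eventHubName) {
        return "Endpoint=" + endpoint
            + ";SharedAccessKeyName=" + policyName
            + ";SharedAccessKey=" + key
            + ";EntityPath=" + eventHubName;
    }

    /** Splits a connection string into its named segments, preserving their order. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> segments = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // tolerate empty segments such as a doubled ';'
            }
            // Split on the first '=' only, because Base64-encoded keys often end with '='.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            segments.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        return segments;
    }

    public static void main(String[] args) {
        // All values below are placeholders.
        String value = compose("sb://example.servicebus.windows.net/", "listen-policy",
            "c2VjcmV0a2V5MQ==", "my-hub");
        // Print only the segment names so the key is never written to the console.
        System.out.println(parse(value).keySet());
    }
}
```

Splitting on the first `=` rather than on every `=` is the one design choice that matters here: it keeps the padding characters of the shared access key intact.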
 

Consuming events from a single partition of an Event Hub

 // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
 String partitionId = "0";
 EventPosition startingPosition = EventPosition.latest();

 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
     .subscribe(partitionEvent -> {
         PartitionContext partitionContext = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
         System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
     }, error -> System.err.print(error.toString()));
 

Viewing latest partition information

The latest partition information can be retrieved as events are received by setting setTrackLastEnqueuedEventProperties to true. As events come in, inspect the LastEnqueuedEventProperties returned by each PartitionEvent.

 // Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
 // each event that is received.
 ReceiveOptions receiveOptions = new ReceiveOptions()
     .setTrackLastEnqueuedEventProperties(true);

 // Receives events from partition "0" as they come in.
 consumer.receiveFromPartition("0", EventPosition.earliest(), receiveOptions)
     .subscribe(partitionEvent -> {
         LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
         System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
             properties.getRetrievalTime(),
             properties.getSequenceNumber());
     });
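
One possible use of the last enqueued sequence number is to estimate how far a consumer is behind the end of the partition. The sketch below uses only the JDK and assumes that sequence numbers grow by one per event within a partition, and that a null value means the information has not been received yet. `ConsumerLagSketch` and `eventsBehind` are hypothetical names. In SDK terms, the two arguments would come from `partitionEvent.getData().getSequenceNumber()` and `properties.getSequenceNumber()`.

```java
// Hypothetical helper, not part of the Event Hubs SDK.
final class ConsumerLagSketch {

    /**
     * Estimates how many events sit between the event just received and the last event
     * enqueued in the partition. Returns -1 when either value is not available.
     */
    static long eventsBehind(Long eventSequenceNumber, Long lastEnqueuedSequenceNumber) {
        if (eventSequenceNumber == null || lastEnqueuedSequenceNumber == null) {
            return -1L;
        }
        // The last enqueued information can be older than the event itself, so clamp at zero.
        return Math.max(0L, lastEnqueuedSequenceNumber - eventSequenceNumber);
    }

    public static void main(String[] args) {
        // Placeholder values: the event has sequence number 10, the partition is at 25.
        System.out.println("Events behind: " + eventsBehind(10L, 25L));
    }
}
```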
 

Rate limiting consumption of events from Event Hub

Event consumers that need to limit the number of events they receive at a given time can use BaseSubscriber.request(long).

 consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
     private static final int NUMBER_OF_EVENTS = 5;
     private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 events at a time.
         request(NUMBER_OF_EVENTS);
     }

     @Override
     protected void hookOnNext(PartitionEvent value) {
         // Process the EventData

         // If the number of events received so far is a multiple of NUMBER_OF_EVENTS, we have reached the
         // last event the Publisher will provide to us. Invoking request(long) here tells the Publisher that
         // the subscriber is ready to get more events from upstream.
         if (currentNumberOfEvents.incrementAndGet() % NUMBER_OF_EVENTS == 0) {
             request(NUMBER_OF_EVENTS);
         }
     }
 });
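
The request pattern above can be observed without an Event Hub. The following sketch swaps in the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract, in place of Reactor's `BaseSubscriber`. `BatchedDemandSketch`, `BatchingSubscriber`, and `ListSubscription` are hypothetical names, and the single-threaded in-memory source is an assumption made to keep the example deterministic. It is not how the Event Hubs client delivers events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for the Reactor-based sample; it uses only JDK types.
final class BatchedDemandSketch {

    /** Requests a fixed-size batch up front and another each time a full batch has arrived. */
    static final class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final List<Long> requests = new ArrayList<>();
        boolean completed;

        private final int batchSize;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            askForBatch();
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            // A full batch has arrived, so signal readiness for the next one.
            if (received.size() % batchSize == 0) {
                askForBatch();
            }
        }

        @Override
        public void onError(Throwable error) {
            System.err.println(error);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        private void askForBatch() {
            requests.add((long) batchSize);
            subscription.request(batchSize);
        }
    }

    /** Single-threaded in-memory source that never emits more than the outstanding demand. */
    static final class ListSubscription implements Flow.Subscription {
        private final List<String> items;
        private final Flow.Subscriber<? super String> subscriber;
        private int next;
        private long demand;
        private boolean draining;
        private boolean done;

        ListSubscription(List<String> items, Flow.Subscriber<? super String> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            demand += n;
            if (draining || done) {
                return; // a re-entrant call only adds demand; the outer loop keeps emitting
            }
            draining = true;
            while (!done && demand > 0 && next < items.size()) {
                demand--;
                subscriber.onNext(items.get(next++));
            }
            draining = false;
            if (!done && next == items.size()) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Wires a subscriber to the in-memory source and returns it for inspection. */
    static BatchingSubscriber run(List<String> items, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        subscriber.onSubscribe(new ListSubscription(items, subscriber));
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = run(List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7"), 5);
        // prints "7 events, requests [5, 5]"
        System.out.println(subscriber.received.size() + " events, requests " + subscriber.requests);
    }
}
```

With seven items and a batch size of five, the subscriber issues one request on subscription and a second after the fifth event, which matches the modulo check in the sample above.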
 

Receiving from all partitions

 // Receives events from all partitions from the beginning of each partition.
 consumer.receive(true).subscribe(partitionEvent -> {
     PartitionContext context = partitionEvent.getPartitionContext();
     EventData event = partitionEvent.getData();
     System.out.printf("Event %s is from partition %s.%n", event.getSequenceNumber(), context.getPartitionId());
 });
 
  • Method Details

    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
      Returns:
      The fully qualified Event Hubs namespace that the connection is associated with.
    • getEventHubName

      public String getEventHubName()
      Gets the Event Hub name this client interacts with.
      Returns:
      The Event Hub name this client interacts with.
    • getConsumerGroup

      public String getConsumerGroup()
      Gets the consumer group this consumer is reading events as a part of.
      Returns:
      The consumer group this consumer is reading events as a part of.
    • getEventHubProperties

      public Mono<EventHubProperties> getEventHubProperties()
      Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
      Returns:
      The set of information for the Event Hub that this client is associated with.
    • getPartitionIds

      public Flux<String> getPartitionIds()
      Retrieves the identifiers for the partitions of an Event Hub.
      Returns:
      A Flux of identifiers for the partitions of an Event Hub.
    • getPartitionProperties

      public Mono<PartitionProperties> getPartitionProperties(String partitionId)
      Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
      Parameters:
      partitionId - The unique identifier of a partition associated with the Event Hub.
      Returns:
      The set of information for the requested partition under the Event Hub this client is associated with.
      Throws:
      NullPointerException - if partitionId is null.
    • receiveFromPartition

      public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)
      Consumes events from a single partition starting at startingPosition.
      Parameters:
      partitionId - Identifier of the partition to read events from.
      startingPosition - Position within the Event Hub partition to begin consuming events.
      Returns:
      A stream of events for this partition starting from startingPosition.
      Throws:
      NullPointerException - if partitionId or startingPosition is null.
      IllegalArgumentException - if partitionId is an empty string.
    • receiveFromPartition

      public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)
      Consumes events from a single partition starting at startingPosition with a set of receive options.
      • If receive is invoked where ReceiveOptions.getOwnerLevel() has a value, then the Event Hubs service will guarantee that only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
      • Multiple consumers per partitionId and consumer group combination can be created by not setting ReceiveOptions.getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".
      Parameters:
      partitionId - Identifier of the partition to read events from.
      startingPosition - Position within the Event Hub partition to begin consuming events.
      receiveOptions - Options when receiving events from the partition.
      Returns:
      A stream of events for this partition. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.
      Throws:
      NullPointerException - if partitionId, startingPosition, or receiveOptions is null.
      IllegalArgumentException - if partitionId is an empty string.
    • receive

      public Flux<PartitionEvent> receive()
      Consumes events from all partitions starting from the beginning of each partition.

      This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

      Returns:
      A stream of events for every partition in the Event Hub starting from the beginning of each partition.
    • receive

      public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)
      Consumes events from all partitions.

      This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

      Parameters:
      startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading will begin at the end of each partition, seeing only new events as they are published.
      Returns:
      A stream of events for every partition in the Event Hub.
    • receive

      public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)
      Consumes events from all partitions configured with a set of receiveOptions.

      This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

      • If receive is invoked where ReceiveOptions.getOwnerLevel() has a value, then the Event Hubs service will guarantee that only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
      • Multiple consumers per partitionId and consumer group combination can be created by not setting ReceiveOptions.getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".
      Parameters:
      startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading will begin at the end of each partition, seeing only new events as they are published.
      receiveOptions - Options when receiving events from each Event Hub partition.
      Returns:
      A stream of events for every partition in the Event Hub.
      Throws:
      NullPointerException - if receiveOptions is null.
    • close

      public void close()
      Disposes of the consumer by closing the underlying connection to the service.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • getIdentifier

      public String getIdentifier()
      Gets the client identifier.
      Returns:
      The unique identifier string for the current client.