Class EventHubConsumerClient

java.lang.Object
com.azure.messaging.eventhubs.EventHubConsumerClient
All Implemented Interfaces:
Closeable, AutoCloseable

public class EventHubConsumerClient extends Object implements Closeable
A synchronous consumer responsible for reading EventData from an Event Hub partition in the context of a specific consumer group.

Creating a synchronous consumer

 // The required parameters are `consumerGroup`, and a way to authenticate with Event Hubs using credentials.
 EventHubConsumerClient consumer = new EventHubClientBuilder()
     .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
         + "SharedAccessKey={key};Entity-Path={hub-name}")
     .consumerGroup("$DEFAULT")
     .buildConsumerClient();
 

Consuming events from a single partition

Events from a single partition can be consumed using receiveFromPartition(String, int, EventPosition) or receiveFromPartition(String, int, EventPosition, Duration). The call to receiveFromPartition completes and returns an IterableStream when either the maximum number of events is received, or the timeout has elapsed.

 Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
 EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
 String partitionId = "0";

 // Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
 IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
     startingPosition, Duration.ofSeconds(30));

 Long lastSequenceNumber = -1L;
 for (PartitionEvent partitionEvent : events) {
     // For each event, perform some sort of processing.
     System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
     lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
 }

 // Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
 // If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
 // partition.
 if (lastSequenceNumber != -1L) {
     EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

     // Gets the next set of events from partition '0' to consume and process.
     IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
         nextPosition, Duration.ofSeconds(30));
 }
 
  • Method Details

    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
      Returns:
      The fully qualified Event Hubs namespace that the connection is associated with.
    • getEventHubName

      public String getEventHubName()
      Gets the Event Hub name this client interacts with.
      Returns:
      The Event Hub name this client interacts with.
    • getConsumerGroup

      public String getConsumerGroup()
      Gets the consumer group this consumer is reading events as a part of.
      Returns:
      The consumer group this consumer is reading events as a part of.
    • getEventHubProperties

      public EventHubProperties getEventHubProperties()
      Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
      Returns:
      The set of information for the Event Hub that this client is associated with.
    • getPartitionIds

      public IterableStream<String> getPartitionIds()
      Retrieves the identifiers for the partitions of an Event Hub.
      Returns:
      The set of identifiers for the partitions of an Event Hub.
    • getPartitionProperties

      public PartitionProperties getPartitionProperties(String partitionId)
      Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
      Parameters:
      partitionId - The unique identifier of a partition associated with the Event Hub.
      Returns:
      The set of information for the requested partition under the Event Hub this client is associated with.
      Throws:
      NullPointerException - if partitionId is null.
    • receiveFromPartition

      public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition)
      Receives a batch of events from the Event Hub partition.
      Parameters:
      maximumMessageCount - The maximum number of messages to receive in this batch.
      partitionId - Identifier of the partition to read events from.
      startingPosition - Position within the Event Hub partition to begin consuming events.
      Returns:
      A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.
      Throws:
      NullPointerException - if partitionId, or startingPosition is null.
      IllegalArgumentException - if maximumMessageCount is less than 1, or if partitionId is an empty string.
    • receiveFromPartition

      public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime)
      Receives a batch of events from the Event Hub partition.
      Parameters:
      partitionId - Identifier of the partition to read events from.
      maximumMessageCount - The maximum number of messages to receive in this batch.
      startingPosition - Position within the Event Hub partition to begin consuming events.
      maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
      Returns:
      A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events.
      Throws:
      NullPointerException - if partitionId, maximumWaitTime, or startingPosition is null.
      IllegalArgumentException - if maximumMessageCount is less than 1 or maximumWaitTime is zero or a negative duration.
    • receiveFromPartition

      public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions)
      Receives a batch of events from the Event Hub partition.
      Parameters:
      partitionId - Identifier of the partition to read events from.
      maximumMessageCount - The maximum number of messages to receive in this batch.
      startingPosition - Position within the Event Hub partition to begin consuming events.
      maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
      receiveOptions - Options when receiving events from the partition.
      Returns:
      A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events.
      Throws:
      NullPointerException - if maximumWaitTime, startingPosition, partitionId, or receiveOptions is null.
      IllegalArgumentException - if maximumMessageCount is less than 1 or maximumWaitTime is zero or a negative duration.
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • getIdentifier

      public String getIdentifier()
      Gets the client identifier.
      Returns:
      The unique identifier string for current client.