Package com.azure.messaging.eventhubs
Class EventHubConsumerClient
java.lang.Object
com.azure.messaging.eventhubs.EventHubConsumerClient
- All Implemented Interfaces:
Closeable
,AutoCloseable
A synchronous consumer responsible for reading
EventData
from an Event Hub partition in the context of
a specific consumer group.
Creating a synchronous consumer
// The required parameters are `consumerGroup`, and a way to authenticate with Event Hubs using credentials. EventHubConsumerClient consumer = new EventHubClientBuilder() .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + "SharedAccessKey={key};Entity-Path={hub-name}") .consumerGroup("$DEFAULT") .buildConsumerClient();
Consuming events from a single partition
Events from a single partition can be consumed using receiveFromPartition(String, int, EventPosition)
or
receiveFromPartition(String, int, EventPosition, Duration)
. The call to receiveFromPartition
completes and returns an IterableStream
when either the maximum number of events is received, or the
timeout has elapsed.
Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12)); EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo); String partitionId = "0"; // Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed. IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100, startingPosition, Duration.ofSeconds(30)); Long lastSequenceNumber = -1L; for (PartitionEvent partitionEvent : events) { // For each event, perform some sort of processing. System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber()); lastSequenceNumber = partitionEvent.getData().getSequenceNumber(); } // Figure out what the next EventPosition to receive from is based on last event we processed in the stream. // If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the // partition. if (lastSequenceNumber != -1L) { EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false); // Gets the next set of events from partition '0' to consume and process. IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100, nextPosition, Duration.ofSeconds(30)); }
-
Method Summary
Modifier and TypeMethodDescriptionvoid
close()
Gets the consumer group this consumer is reading events as a part of.Gets the Event Hub name this client interacts with.Retrieves information about an Event Hub, including the number of partitions present and their identifiers.Gets the fully qualified Event Hubs namespace that the connection is associated with.Gets the client identifier.Retrieves the identifiers for the partitions of an Event Hub.getPartitionProperties
(String partitionId) Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.receiveFromPartition
(String partitionId, int maximumMessageCount, EventPosition startingPosition) Receives a batch ofevents
from the Event Hub partition.receiveFromPartition
(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime) Receives a batch ofevents
from the Event Hub partition.receiveFromPartition
(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) Receives a batch ofevents
from the Event Hub partition.
-
Method Details
-
getFullyQualifiedNamespace
Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to{yournamespace}.servicebus.windows.net
.- Returns:
- The fully qualified Event Hubs namespace that the connection is associated with.
-
getEventHubName
Gets the Event Hub name this client interacts with.- Returns:
- The Event Hub name this client interacts with.
-
getConsumerGroup
Gets the consumer group this consumer is reading events as a part of.- Returns:
- The consumer group this consumer is reading events as a part of.
-
getEventHubProperties
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.- Returns:
- The set of information for the Event Hub that this client is associated with.
-
getPartitionIds
Retrieves the identifiers for the partitions of an Event Hub.- Returns:
- The set of identifiers for the partitions of an Event Hub.
-
getPartitionProperties
Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.- Parameters:
partitionId
- The unique identifier of a partition associated with the Event Hub.- Returns:
- The set of information for the requested partition under the Event Hub this client is associated with.
- Throws:
NullPointerException
- ifpartitionId
is null.
-
receiveFromPartition
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition) Receives a batch ofevents
from the Event Hub partition.- Parameters:
maximumMessageCount
- The maximum number of messages to receive in this batch.partitionId
- Identifier of the partition to read events from.startingPosition
- Position within the Event Hub partition to begin consuming events.- Returns:
- A set of
PartitionEvent
that was received. The iterable contains up tomaximumMessageCount
events. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting fromstartingPosition
. - Throws:
NullPointerException
- ifpartitionId
, orstartingPosition
is null.IllegalArgumentException
- ifmaximumMessageCount
is less than 1, or ifpartitionId
is an empty string.
-
receiveFromPartition
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime) Receives a batch ofevents
from the Event Hub partition.- Parameters:
partitionId
- Identifier of the partition to read events from.maximumMessageCount
- The maximum number of messages to receive in this batch.startingPosition
- Position within the Event Hub partition to begin consuming events.maximumWaitTime
- The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.- Returns:
- A set of
PartitionEvent
that was received. The iterable contains up tomaximumMessageCount
events. - Throws:
NullPointerException
- ifpartitionId
,maximumWaitTime
, orstartingPosition
isnull
.IllegalArgumentException
- ifmaximumMessageCount
is less than 1 ormaximumWaitTime
is zero or a negative duration.
-
receiveFromPartition
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) Receives a batch ofevents
from the Event Hub partition.- Parameters:
partitionId
- Identifier of the partition to read events from.maximumMessageCount
- The maximum number of messages to receive in this batch.startingPosition
- Position within the Event Hub partition to begin consuming events.maximumWaitTime
- The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.receiveOptions
- Options when receiving events from the partition.- Returns:
- A set of
PartitionEvent
that was received. The iterable contains up tomaximumMessageCount
events. - Throws:
NullPointerException
- ifmaximumWaitTime
,startingPosition
,partitionId
, orreceiveOptions
isnull
.IllegalArgumentException
- ifmaximumMessageCount
is less than 1 ormaximumWaitTime
is zero or a negative duration.
-
close
public void close()- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceCloseable
-
getIdentifier
Gets the client identifier.- Returns:
- The unique identifier string for current client.
-