public class EventHubConsumer extends Object implements Closeable
A consumer responsible for reading EventData from a specific Event Hub partition in the context of a specific
consumer group.

- If EventHubConsumer is created where EventHubConsumerOptions.getOwnerLevel() has a value, then the Event Hubs service will guarantee that only one active consumer exists per partitionId and consumer group combination. This consumer is sometimes referred to as an "Epoch Consumer."
- Multiple consumers per partitionId and consumer group combination can be created by not setting EventHubConsumerOptions.getOwnerLevel() when creating consumers. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer."

Creating a synchronous consumer
Create an EventHubConsumer using EventHubClient.

    EventHubClient client = new EventHubClientBuilder()
        .connectionString("event-hub-instance-connection-string")
        .buildClient();

    String partitionId = "0";
    String consumerGroup = "$DEFAULT";
    EventHubConsumer consumer = client.createConsumer(consumerGroup, partitionId,
        EventPosition.latest());
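Because EventHubConsumer implements Closeable, a consumer can be scoped with try-with-resources so that close() runs even when processing throws. The sketch below shows the pattern only. `StandInConsumer` and `closedAfterUse` are hypothetical names written for this example, not SDK types, so the code runs without the Azure library. The stand-in's close() also omits the `throws IOException` clause that the real method declares.

```java
import java.io.Closeable;

public class ConsumerLifecycleSketch {

    /** Hypothetical stand-in for a Closeable consumer; not part of the Event Hubs SDK. */
    static final class StandInConsumer implements Closeable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            // A real consumer would release its link to the service here.
            closed = true;
        }
    }

    /** Uses a consumer inside try-with-resources and reports whether it was closed afterwards. */
    public static boolean closedAfterUse() {
        StandInConsumer consumer = new StandInConsumer();
        try (StandInConsumer scoped = consumer) {
            // Receiving and processing would happen here. close() is invoked when
            // this block exits, whether it completes normally or throws.
            if (scoped.isClosed()) {
                throw new IllegalStateException("consumer closed too early");
            }
        }
        return consumer.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("closed after block: " + closedAfterUse());
    }
}
```

With the real class, the same shape would wrap the `client.createConsumer(...)` call in the try header, with the surrounding method handling the IOException that close() declares.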
Consuming events from an Event Hub
Events can be consumed using EventHubConsumer.receive(int) or EventHubConsumer.receive(int, Duration). The call to `receive`
completes and returns an IterableStream when either the requested number of events has been received or the
timeout duration has elapsed, whichever comes first.

    // Obtain partitionId from EventHubClient.getPartitionIds().
    String partitionId = "0";
    Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
    EventHubConsumer consumer = client.createConsumer(EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId,
        EventPosition.fromEnqueuedTime(twelveHoursAgo));

    IterableStream<EventData> events = consumer.receive(100, Duration.ofSeconds(30));

    for (EventData event : events) {
        // For each event, perform some sort of processing.
        System.out.print("Event received: " + event.getSequenceNumber());
    }

    // Gets the next set of events to consume and process.
    IterableStream<EventData> nextEvents = consumer.receive(100, Duration.ofSeconds(30));

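The "count reached or timeout elapsed" behavior of `receive` can be illustrated without the service. The following is a minimal sketch under stated assumptions, not the SDK's implementation: `BatchReceiveSketch` is a hypothetical class, a `BlockingQueue` stands in for the partition, and the argument checks mirror the exceptions documented for receive(int, Duration).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchReceiveSketch {

    /**
     * Hypothetical model of the receive(int, Duration) contract: returns once
     * maximumMessageCount items are collected or maximumWaitTime has elapsed.
     */
    public static <T> List<T> receive(BlockingQueue<T> source, int maximumMessageCount,
            Duration maximumWaitTime) {
        Objects.requireNonNull(maximumWaitTime, "maximumWaitTime cannot be null");
        if (maximumMessageCount < 1) {
            throw new IllegalArgumentException("maximumMessageCount must be at least 1");
        }
        if (maximumWaitTime.isZero() || maximumWaitTime.isNegative()) {
            throw new IllegalArgumentException("maximumWaitTime must be positive");
        }

        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maximumWaitTime.toNanos();
        while (batch.size() < maximumMessageCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // Wait time used up; return what has been collected.
            }
            try {
                T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // Nothing arrived before the deadline.
                }
                batch.add(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt and stop early.
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> partition = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        // Count is reached first: returns immediately with two items.
        System.out.println(receive(partition, 2, Duration.ofMillis(200)));
        // Only one item remains: returns it after the wait time elapses.
        System.out.println(receive(partition, 5, Duration.ofMillis(200)));
    }
}
```

In this model a batch may contain fewer items than requested, which matches the documented statement that the returned iterable contains up to maximumMessageCount events.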
| Modifier and Type | Method and Description |
|---|---|
| `void` | `close()` |
| `LastEnqueuedEventProperties` | `getLastEnqueuedEventProperties()`<br>A set of information about the last enqueued event of a partition, as observed by the consumer as events are received from the Event Hubs service. |
| `IterableStream<EventData>` | `receive(int maximumMessageCount)`<br>Receives a batch of EventData from the Event Hub partition. |
| `IterableStream<EventData>` | `receive(int maximumMessageCount, Duration maximumWaitTime)`<br>Receives a batch of EventData from the Event Hub partition. |

public IterableStream<EventData> receive(int maximumMessageCount)

Parameters: maximumMessageCount - The maximum number of messages to receive in this batch.
Returns: A set of EventData that was received. The iterable contains up to maximumMessageCount events.
Throws: IllegalArgumentException - if maximumMessageCount is less than 1.

public IterableStream<EventData> receive(int maximumMessageCount, Duration maximumWaitTime)

Parameters: maximumMessageCount - The maximum number of messages to receive in this batch.
maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the
batch; if not specified, the default wait time specified when the consumer was created will be used.
Returns: A set of EventData that was received. The iterable contains up to maximumMessageCount events.
Throws: NullPointerException - if maximumWaitTime is null.
IllegalArgumentException - if maximumMessageCount is less than 1 or maximumWaitTime is
zero or a negative duration.

public LastEnqueuedEventProperties getLastEnqueuedEventProperties()

Returns: null if EventHubConsumerOptions.getTrackLastEnqueuedEventProperties() was not set when
creating the consumer. Otherwise, the properties describing the most recently enqueued event in the
partition.

public void close() throws IOException

Specified by: close in interface Closeable; close in interface AutoCloseable
Throws: IOException

Copyright © 2019 Microsoft Corporation. All rights reserved.