Class EventHubProducerAsyncClient

java.lang.Object
com.azure.messaging.eventhubs.EventHubProducerAsyncClient
All Implemented Interfaces:
Closeable, AutoCloseable

public class EventHubProducerAsyncClient extends Object implements Closeable
An asynchronous producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified when creating an EventDataBatch, the events may be automatically routed to an available partition or specific to a partition.

Allowing automatic routing of partitions is recommended when:

  • The sending of events needs to be highly available.
  • The event data should be evenly distributed among all available partitions.

If no partition id is specified, the following rules are used for automatically selecting one:

  1. Distribute the events equally amongst all available partitions using a round-robin approach.
  2. If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.

Create a producer and publish events to any partition

 // The required parameter is a way to authenticate with Event Hubs using credentials.
 // The connectionString provides a way to authenticate with Event Hub.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .connectionString(
         "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}",
         "event-hub-name")
     .buildAsyncProducerClient();

 // Creating a batch without options set, will allow for automatic routing of events to any partition.
 producer.createBatch().flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));
 

Publish events to partition "foo"

 // Creating a batch with partitionId set will route all events in that batch to partition `foo`.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionId("foo");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));
 

Publish events to the same partition, grouped together using partition key

 // Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which
 // partition to send the events to. Events with the same partitionKey are always routed to the same partition.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("sourdough"));
     batch.tryAdd(new EventData("rye"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));
 

Publish events using a size-limited EventDataBatch

 Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
 AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
     producer.createBatch(options).block());

 // The sample Flux contains two events, but it could be an infinite stream of telemetry events.
 telemetryEvents.flatMap(event -> {
     final EventDataBatch batch = currentBatch.get();
     if (batch.tryAdd(event)) {
         return Mono.empty();
     }

     return Mono.when(
         producer.send(batch),
         producer.createBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the event that did not fit in the previous batch.
             if (!newBatch.tryAdd(event)) {
                 throw Exceptions.propagate(new IllegalArgumentException(
                     "Event was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
             }

             return newBatch;
         }));
 }).then()
     .doFinally(signal -> {
         final EventDataBatch batch = currentBatch.getAndSet(null);
         if (batch != null && batch.getCount() > 0) {
             producer.send(batch).block();
         }
     });
 
See Also:
  • Method Details

    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
      Returns:
      The fully qualified Event Hubs namespace that the connection is associated with.
    • getEventHubName

      public String getEventHubName()
      Gets the Event Hub name this client interacts with.
      Returns:
      The Event Hub name this client interacts with.
    • getEventHubProperties

      public Mono<EventHubProperties> getEventHubProperties()
      Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
      Returns:
      The set of information for the Event Hub that this client is associated with.
    • getPartitionIds

      public Flux<String> getPartitionIds()
      Retrieves the identifiers for the partitions of an Event Hub.
      Returns:
      A Flux of identifiers for the partitions of an Event Hub.
    • getPartitionProperties

      public Mono<PartitionProperties> getPartitionProperties(String partitionId)
      Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
      Parameters:
      partitionId - The unique identifier of a partition associated with the Event Hub.
      Returns:
      The set of information for the requested partition under the Event Hub this client is associated with.
      Throws:
      NullPointerException - if partitionId is null.
    • createBatch

      public Mono<EventDataBatch> createBatch()
      Creates an EventDataBatch that can fit as many events as the transport allows.
      Returns:
      A new EventDataBatch that can fit as many events as the transport allows.
    • createBatch

      public Mono<EventDataBatch> createBatch(CreateBatchOptions options)
      Creates an EventDataBatch configured with the options specified.
      Parameters:
      options - A set of options used to configure the EventDataBatch.
      Returns:
      A new EventDataBatch that can fit as many events as the transport allows.
      Throws:
      NullPointerException - if options is null.
    • send

      public Mono<Void> send(Iterable<EventData> events)
      Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.
       List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
           new EventData("oak"));
       producer
           .send(events)
           .subscribe(unused -> { },
               error -> System.err.println("Error occurred while sending events:" + error),
               () -> System.out.println("Send complete."));
       

      For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

      Parameters:
      events - Events to send to the service.
      Returns:
      A Mono that completes when all events are pushed to the service.
      Throws:
      AmqpException - if the size of events exceed the maximum size of a single batch.
    • send

      public Mono<Void> send(Iterable<EventData> events, SendOptions options)
      Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.
       List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
           new EventData("New York"));
       SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
       producer
           .send(events, sendOptions)
           .subscribe(unused -> { },
               error -> System.err.println("Error occurred while sending events:" + error),
               () -> System.out.println("Send complete."));
       

      For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

      Parameters:
      events - Events to send to the service.
      options - The set of options to consider when sending this batch.
      Returns:
      A Mono that completes when all events are pushed to the service.
      Throws:
      AmqpException - if the size of events exceed the maximum size of a single batch.
    • send

      public Mono<Void> send(EventDataBatch batch)
      Sends the batch to the associated Event Hub.
      Parameters:
      batch - The batch to send to the service.
      Returns:
      A Mono that completes when the batch is pushed to the service.
      Throws:
      NullPointerException - if batch is null.
      See Also:
    • close

      public void close()
      Disposes of the EventHubProducerAsyncClient. If the client had a dedicated connection, the underlying connection is also closed.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • getIdentifier

      public String getIdentifier()
      Gets the client identifier.
      Returns:
      The unique identifier string for current client.