Class EventHubProducerClient

java.lang.Object
com.azure.messaging.eventhubs.EventHubProducerClient
All Implemented Interfaces:
Closeable, AutoCloseable

public class EventHubProducerClient extends Object implements Closeable
A synchronous producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified when creating an EventDataBatch, the events may be automatically routed to an available partition or specific to a partition.

Allowing automatic routing of partitions is recommended when:

  • The sending of events needs to be highly available.
  • The event data should be evenly distributed among all available partitions.

If no partition id is specified, the following rules are used for automatically selecting one:

  1. Distribute the events equally amongst all available partitions using a round-robin approach.
  2. If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.

Create a producer and publish events to any partition

 // The required parameter is a way to authenticate with Event Hubs using credentials.
 // The connectionString provides a way to authenticate with Event Hub.
 EventHubProducerClient producer = new EventHubClientBuilder()
     .connectionString(
         "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}",
         "event-hub-name")
     .buildProducerClient();
 List<EventData> events = Arrays.asList(new EventData("test-event-1"), new EventData("test-event-2"));

 // Creating a batch without options set, will allow for automatic routing of events to any partition.
 EventDataBatch batch = producer.createBatch();
 for (EventData event : events) {
     if (batch.tryAdd(event)) {
         continue;
     }

     producer.send(batch);
     batch = producer.createBatch();
     if (!batch.tryAdd(event)) {
         throw new IllegalArgumentException("Event is too large for an empty batch.");
     }
 }
 

Publish events to partition "foo"

 // Creating a batch with partitionId set will route all events in that batch to partition `foo`.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionId("foo");

 EventDataBatch batch = producer.createBatch(options);
 batch.tryAdd(new EventData("data-to-partition-foo"));
 producer.send(batch);
 

Publish events to the same partition, grouped together using partition key

 List<EventData> events = Arrays.asList(new EventData("sourdough"), new EventData("rye"),
     new EventData("wheat"));

 // Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which
 // partition to send the events to. Events with the same partitionKey are always routed to the same partition.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
 EventDataBatch batch = producer.createBatch(options);

 events.forEach(event -> batch.tryAdd(event));
 producer.send(batch);
 

Publish events using a size-limited EventDataBatch

 List<EventData> telemetryEvents = Arrays.asList(firstEvent, secondEvent, thirdEvent);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);

 EventDataBatch currentBatch = producer.createBatch(options);

 // For each telemetry event, we try to add it to the current batch.
 // When the batch is full, send it then create another batch to add more events to.
 for (EventData event : telemetryEvents) {
     if (!currentBatch.tryAdd(event)) {
         producer.send(currentBatch);
         currentBatch = producer.createBatch(options);

         // Add the event we couldn't before.
         if (!currentBatch.tryAdd(event)) {
             throw new IllegalArgumentException("Event is too large for an empty batch.");
         }
     }
 }
 
See Also:
  • Method Details

    • getEventHubName

      public String getEventHubName()
      Gets the Event Hub name this client interacts with.
      Returns:
      The Event Hub name this client interacts with.
    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
      Returns:
      The fully qualified Event Hubs namespace that the connection is associated with.
    • getEventHubProperties

      public EventHubProperties getEventHubProperties()
      Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
      Returns:
      The set of information for the Event Hub that this client is associated with.
    • getPartitionIds

      public IterableStream<String> getPartitionIds()
      Retrieves the identifiers for the partitions of an Event Hub.
      Returns:
      A Flux of identifiers for the partitions of an Event Hub.
    • getPartitionProperties

      public PartitionProperties getPartitionProperties(String partitionId)
      Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
      Parameters:
      partitionId - The unique identifier of a partition associated with the Event Hub.
      Returns:
      The set of information for the requested partition under the Event Hub this client is associated with.
      Throws:
      NullPointerException - if partitionId is null.
    • createBatch

      public EventDataBatch createBatch()
      Creates an EventDataBatch that can fit as many events as the transport allows.
      Returns:
      A new EventDataBatch that can fit as many events as the transport allows.
    • createBatch

      public EventDataBatch createBatch(CreateBatchOptions options)
      Creates an EventDataBatch configured with the options specified.
      Parameters:
      options - A set of options used to configure the EventDataBatch.
      Returns:
      A new EventDataBatch that can fit as many events as the transport allows.
      Throws:
      NullPointerException - if options is null.
    • send

      public void send(Iterable<EventData> events)
      Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.
       List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
           new EventData("oak"));
       producer.send(events);
       

      For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

      Parameters:
      events - Events to send to the service.
      Throws:
      AmqpException - if the size of events exceed the maximum size of a single batch.
    • send

      public void send(Iterable<EventData> events, SendOptions options)
      Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.
       List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
           new EventData("New York"));
       SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
       producer.send(events, sendOptions);
       

      For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

      Parameters:
      events - Events to send to the service.
      options - The set of options to consider when sending this batch.
      Throws:
      AmqpException - if the size of events exceed the maximum size of a single batch.
    • send

      public void send(EventDataBatch batch)
      Sends the batch to the associated Event Hub.
      Parameters:
      batch - The batch to send to the service.
      Throws:
      NullPointerException - if batch is null.
      See Also:
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • getIdentifier

      public String getIdentifier()
      Gets the client identifier.
      Returns:
      The unique identifier string for current client.