Class ServiceBusSenderAsyncClient

java.lang.Object
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient
All Implemented Interfaces:
AutoCloseable

public final class ServiceBusSenderAsyncClient extends Object implements AutoCloseable
An asynchronous client to send messages to a Service Bus resource.

Create an instance of sender

 // The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
 // The application must set connectionString and queueName. The connectionString format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sender()
     .queueName(queueName)
     .buildAsyncClient();
 

Create an instance of sender using default credential

 // The required parameters are the fully qualified namespace and a credential to authenticate with Service Bus.
 // Here, a DefaultAzureCredential is used instead of a connection string.
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .credential("<<fully-qualified-namespace>>",
         new DefaultAzureCredentialBuilder().build())
     .sender()
     .queueName("<< QUEUE NAME >>")
     .buildAsyncClient();
 

Send messages to a Service Bus resource

 // The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
 // The application must set connectionString and queueName. The connectionString format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sender()
     .queueName(queueName)
     .buildAsyncClient();

 // Creating a batch without options allows the messages to be routed automatically to any partition.
 sender.createMessageBatch().flatMap(batch -> {
     batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))));
     batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
     return sender.sendMessages(batch);
 }).subscribe(
     unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));
 

Send messages using a size-limited ServiceBusMessageBatch to a Service Bus resource

 Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

 // Calling setMaximumSizeInBytes on the options limits the size of every batch created with them.
 // In this case, each batch created with these options is limited to 256 bytes.
 CreateMessageBatchOptions options = new CreateMessageBatchOptions()
     .setMaximumSizeInBytes(256);
 AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>(
     sender.createMessageBatch(options).block());

 // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
 telemetryMessages.flatMap(message -> {
     ServiceBusMessageBatch batch = currentBatch.get();
     if (batch.tryAddMessage(message)) {
         return Mono.empty();
     }

     return Mono.when(
         sender.sendMessages(batch),
         sender.createMessageBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the message that did not fit in the previous batch.
             if (!newBatch.tryAddMessage(message)) {
                 throw Exceptions.propagate(new IllegalArgumentException(
                     "Message was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
             }

             return newBatch;
         }));
 }).then()
     .doFinally(signal -> {
         ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
         if (batch != null && batch.getCount() > 0) {
             sender.sendMessages(batch).block();
         }
     });
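
The batching pattern above (add messages until tryAddMessage returns false, send the full batch, then start a new one) can be traced without a Service Bus connection. The following is a minimal sketch in plain Java. ToyBatch and BatchPacker are hypothetical stand-ins, not SDK types, and counting only the UTF-8 payload bytes is an assumption that ignores any overhead a real ServiceBusMessageBatch accounts for.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a size-limited batch; not an SDK type. */
final class ToyBatch {
    private final int maxSizeInBytes;
    private final List<String> messages = new ArrayList<>();
    private int sizeInBytes;

    ToyBatch(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    /** Adds the message if it fits; otherwise returns false and leaves the batch unchanged. */
    boolean tryAddMessage(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        if (sizeInBytes + size > maxSizeInBytes) {
            return false;
        }
        messages.add(message);
        sizeInBytes += size;
        return true;
    }

    int getCount() {
        return messages.size();
    }

    List<String> getMessages() {
        return messages;
    }
}

/** Packs messages greedily, in arrival order, into size-limited batches. */
final class BatchPacker {
    static List<ToyBatch> pack(List<String> messages, int maxSizeInBytes) {
        List<ToyBatch> batches = new ArrayList<>();
        ToyBatch current = new ToyBatch(maxSizeInBytes);
        for (String message : messages) {
            if (current.tryAddMessage(message)) {
                continue;
            }
            // The batch is full: hand it off (a real sender would send it here) and start a new one.
            batches.add(current);
            current = new ToyBatch(maxSizeInBytes);
            if (!current.tryAddMessage(message)) {
                throw new IllegalArgumentException(
                    "Message was too large to fit in an empty batch. Max size: " + maxSizeInBytes);
            }
        }
        // Flush the last, partially filled batch.
        if (current.getCount() > 0) {
            batches.add(current);
        }
        return batches;
    }
}
```

The final flush plays the role of the doFinally step in the sample above: without it, messages left in the last partially filled batch would never be sent.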
 
  • Method Details

    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified namespace.
      Returns:
      The fully qualified namespace.
    • getEntityPath

      public String getEntityPath()
      Gets the name of the Service Bus resource.
      Returns:
      The name of the Service Bus resource.
    • sendMessage

      public Mono<Void> sendMessage(ServiceBusMessage message)
      Sends a message to a Service Bus queue or topic.
      Parameters:
      message - Message to be sent to Service Bus queue or topic.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      NullPointerException - if message is null.
      IllegalStateException - if sender is already disposed.
      ServiceBusException - if message is larger than the maximum allowed size of a single message or the message could not be sent.
    • sendMessage

      public Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)
      Sends a message to a Service Bus queue or topic.
      Parameters:
      message - Message to be sent to Service Bus queue or topic.
      transactionContext - to be set on the message before sending to Service Bus.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      NullPointerException - if message, transactionContext or transactionContext.transactionId is null.
      IllegalStateException - if sender is already disposed.
      ServiceBusException - if message is larger than the maximum allowed size of a single message or the message could not be sent.
    • sendMessages

      public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext)
      Sends a set of messages to a Service Bus queue or topic using a batched approach. If the total size of the messages exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum batch size is the largest amount allowed on the link.
      Parameters:
      messages - Messages to be sent to Service Bus queue or topic.
      transactionContext - to be set on batch message before sending to Service Bus.
      Returns:
      A Mono that completes when all messages have been sent to the Service Bus resource.
      Throws:
      NullPointerException - if messages, transactionContext or transactionContext.transactionId is null.
      IllegalStateException - if sender is already disposed.
      ServiceBusException - if the messages could not be sent or a message is larger than the maximum size of the ServiceBusMessageBatch.
    • sendMessages

      public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages)
      Sends a set of messages to a Service Bus queue or topic using a batched approach. If the total size of the messages exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum batch size is the largest amount allowed on the link.
      Parameters:
      messages - Messages to be sent to Service Bus queue or topic.
      Returns:
      A Mono that completes when all messages have been sent to the Service Bus resource.
      Throws:
      NullPointerException - if messages is null.
      ServiceBusException - if the messages could not be sent or a message is larger than the maximum size of the ServiceBusMessageBatch.
      IllegalStateException - if sender is already disposed.
    • sendMessages

      public Mono<Void> sendMessages(ServiceBusMessageBatch batch)
      Sends a message batch to the Azure Service Bus entity this sender is connected to.
      Parameters:
      batch - The batch of messages to send. A batch lets the client send up to the maximum allowed size in a single operation.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      NullPointerException - if batch is null.
      ServiceBusException - if the message batch could not be sent.
      IllegalStateException - if sender is already disposed.
    • sendMessages

      public Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)
      Sends a message batch to the Azure Service Bus entity this sender is connected to.
      Parameters:
      batch - The batch of messages to send. A batch lets the client send up to the maximum allowed size in a single operation.
      transactionContext - to be set on the message batch before sending to Service Bus.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      NullPointerException - if batch, transactionContext or transactionContext.transactionId is null.
      ServiceBusException - if the message batch could not be sent.
      IllegalStateException - if sender is already disposed.
    • createMessageBatch

      public Mono<ServiceBusMessageBatch> createMessageBatch()
      Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.
      Returns:
      A ServiceBusMessageBatch that can fit as many messages as the transport allows.
      Throws:
      ServiceBusException - if the message batch could not be created.
      IllegalStateException - if sender is already disposed.
    • createMessageBatch

      public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions options)
      Creates a ServiceBusMessageBatch configured with the options specified.
      Parameters:
      options - A set of options used to configure the ServiceBusMessageBatch.
      Returns:
      A new ServiceBusMessageBatch configured with the given options.
      Throws:
      NullPointerException - if options is null.
      ServiceBusException - if the message batch could not be created.
      IllegalStateException - if sender is already disposed.
      IllegalArgumentException - if CreateMessageBatchOptions.getMaximumSizeInBytes() is larger than maximum allowed size.
    • scheduleMessage

      public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
      Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
      Parameters:
      message - Message to be sent to the Service Bus queue or topic.
      scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
      transactionContext - to be set on message before sending to Service Bus.
      Returns:
      The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
      Throws:
      NullPointerException - if message, scheduledEnqueueTime, transactionContext or transactionContext.transactionId is null.
      ServiceBusException - if the message could not be scheduled.
      IllegalStateException - if sender is already disposed.
    • scheduleMessage

      public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)
      Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
      Parameters:
      message - Message to be sent to the Service Bus queue or topic.
      scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
      Returns:
      The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
      Throws:
      NullPointerException - if message or scheduledEnqueueTime is null.
      ServiceBusException - If the message could not be scheduled.
      IllegalStateException - if sender is already disposed.
    • scheduleMessages

      public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime)
      Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
      Parameters:
      messages - Messages to be sent to the Service Bus queue or topic.
      scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
      Returns:
      Sequence numbers of the scheduled messages which can be used to cancel the messages.
      Throws:
      NullPointerException - If messages or scheduledEnqueueTime is null.
      ServiceBusException - If the messages could not be scheduled.
      IllegalStateException - if sender is already disposed.
    • scheduleMessages

      public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
      Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
      Parameters:
      messages - Messages to be sent to the Service Bus queue or topic.
      scheduledEnqueueTime - OffsetDateTime at which the messages should appear in the Service Bus queue or topic.
      transactionContext - Transaction to associate with the operation.
      Returns:
      Sequence numbers of the scheduled messages which can be used to cancel the messages.
      Throws:
      NullPointerException - If messages, scheduledEnqueueTime, transactionContext or transactionContext.transactionId is null.
      ServiceBusException - If the messages could not be scheduled or the message is larger than the maximum size of the ServiceBusMessageBatch.
      IllegalStateException - if sender is already disposed.
    • cancelScheduledMessage

      public Mono<Void> cancelScheduledMessage(long sequenceNumber)
      Cancels the enqueuing of a scheduled message, if it was not already enqueued.
      Parameters:
      sequenceNumber - of the scheduled message to cancel.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      IllegalArgumentException - if sequenceNumber is negative.
      ServiceBusException - if the message could not be cancelled.
      IllegalStateException - if sender is already disposed.
    • cancelScheduledMessages

      public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers)
      Cancels the enqueuing of already scheduled messages, if they were not already enqueued.
      Parameters:
      sequenceNumbers - of the scheduled messages to cancel.
      Returns:
      A Mono that finishes this operation on the Service Bus resource.
      Throws:
      NullPointerException - if sequenceNumbers is null.
      IllegalStateException - if sender is already disposed.
      ServiceBusException - if the scheduled messages cannot be cancelled.
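
The scheduling contract described in the entries above (scheduling yields a sequence number, the message becomes available only at its enqueue time, and cancellation only has an effect before that point) can be illustrated with an in-memory model. This is a sketch under stated assumptions, not the SDK: ToyScheduler is hypothetical, it is synchronous where the real client returns Mono and Flux, time is advanced explicitly through advanceTo, sequence numbers starting at 1 are an arbitrary choice, and returning a boolean from cancelScheduledMessage is a simplification of the real Mono<Void>.

```java
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of scheduled enqueueing; not an SDK type. */
final class ToyScheduler {
    private static final class Scheduled {
        final String body;
        final OffsetDateTime enqueueTime;

        Scheduled(String body, OffsetDateTime enqueueTime) {
            this.body = body;
            this.enqueueTime = enqueueTime;
        }
    }

    private final Map<Long, Scheduled> pending = new TreeMap<>();
    private final List<String> queue = new ArrayList<>();
    private long nextSequenceNumber = 1;

    /** Registers a message for later enqueueing and returns its sequence number. */
    long scheduleMessage(String body, OffsetDateTime scheduledEnqueueTime) {
        if (body == null || scheduledEnqueueTime == null) {
            throw new NullPointerException("body and scheduledEnqueueTime must not be null");
        }
        long sequenceNumber = nextSequenceNumber++;
        pending.put(sequenceNumber, new Scheduled(body, scheduledEnqueueTime));
        return sequenceNumber;
    }

    /** Cancels a pending message; returns false if it was already enqueued or is unknown. */
    boolean cancelScheduledMessage(long sequenceNumber) {
        if (sequenceNumber < 0) {
            throw new IllegalArgumentException("sequenceNumber must not be negative");
        }
        return pending.remove(sequenceNumber) != null;
    }

    /** Moves every pending message whose enqueue time is not after 'now' into the queue. */
    void advanceTo(OffsetDateTime now) {
        pending.entrySet().removeIf(entry -> {
            if (entry.getValue().enqueueTime.isAfter(now)) {
                return false;
            }
            queue.add(entry.getValue().body);
            return true;
        });
    }

    /** Messages that are now visible to receivers. */
    List<String> visibleMessages() {
        return queue;
    }
}
```

In this model, cancelling after the enqueue time has passed simply reports false, which mirrors the "if it was not already enqueued" condition in the method descriptions.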
    • createTransaction

      public Mono<ServiceBusTransactionContext> createTransaction()
      Starts a new transaction on Service Bus. The ServiceBusTransactionContext should be passed, along with the ServiceBusReceivedMessage where applicable, to all operations that need to be in this transaction.
      Returns:
      A new ServiceBusTransactionContext.
      Throws:
      IllegalStateException - if sender is already disposed.
      ServiceBusException - if a transaction cannot be created.
    • commitTransaction

      public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)
      Commits the transaction identified by the given ServiceBusTransactionContext. This will make a call to Service Bus.
      Parameters:
      transactionContext - to be committed.
      Returns:
      The Mono that finishes this operation on Service Bus resource.
      Throws:
      IllegalStateException - if sender is already disposed.
      NullPointerException - if transactionContext or transactionContext.transactionId is null.
      ServiceBusException - if the transaction could not be committed.
    • rollbackTransaction

      public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)
      Rolls back the transaction identified by the given ServiceBusTransactionContext. This will make a call to Service Bus.
      Parameters:
      transactionContext - Transaction to roll back.
      Returns:
      The Mono that finishes this operation on the Service Bus resource.
      Throws:
      IllegalStateException - if sender is already disposed.
      NullPointerException - if transactionContext or transactionContext.transactionId is null.
      ServiceBusException - if the transaction could not be rolled back.
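
The createTransaction, commitTransaction and rollbackTransaction lifecycle can be pictured as staging: sends made under a transaction stay invisible until the transaction is committed and are discarded if it is rolled back. The sketch below models only that visibility rule. ToyTransactionalQueue is hypothetical and not an SDK type, it is synchronous where the real client returns Mono, and using a UUID string as the transaction identifier is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical in-memory model of transactional sends; not an SDK type. */
final class ToyTransactionalQueue {
    private final List<String> committed = new ArrayList<>();
    private final Map<String, List<String>> staged = new HashMap<>();

    /** Starts a transaction and returns its identifier. */
    String createTransaction() {
        String transactionId = UUID.randomUUID().toString();
        staged.put(transactionId, new ArrayList<>());
        return transactionId;
    }

    /** Stages a message under the given transaction; it is not visible yet. */
    void sendMessage(String message, String transactionId) {
        Objects.requireNonNull(message, "message");
        Objects.requireNonNull(transactionId, "transactionId");
        List<String> messages = staged.get(transactionId);
        if (messages == null) {
            throw new IllegalStateException("Unknown or finished transaction: " + transactionId);
        }
        messages.add(message);
    }

    /** Makes every message staged under the transaction visible, in send order. */
    void commitTransaction(String transactionId) {
        committed.addAll(removeStaged(transactionId));
    }

    /** Discards every message staged under the transaction. */
    void rollbackTransaction(String transactionId) {
        removeStaged(transactionId);
    }

    /** Messages that receivers could observe. */
    List<String> visibleMessages() {
        return committed;
    }

    // Ends the transaction and returns whatever was staged under it.
    private List<String> removeStaged(String transactionId) {
        Objects.requireNonNull(transactionId, "transactionId");
        List<String> messages = staged.remove(transactionId);
        if (messages == null) {
            throw new IllegalStateException("Unknown or finished transaction: " + transactionId);
        }
        return messages;
    }
}
```

A transaction here can be finished only once: committing or rolling back the same identifier a second time raises IllegalStateException, which is a design choice of the toy model rather than documented SDK behavior.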
    • close

      public void close()
      Disposes of the ServiceBusSenderAsyncClient. If the client has a dedicated connection, the underlying connection is also closed.
      Specified by:
      close in interface AutoCloseable
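
Because the client implements AutoCloseable, it can be managed with try-with-resources, and the IllegalStateException entries above describe what happens when a disposed sender is used. The sketch below shows that contract on a hypothetical ToySender, which is not an SDK type. Throwing synchronously from sendMessage is an assumption of the model, since the real client is asynchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sender that rejects use after close(); not an SDK type. */
final class ToySender implements AutoCloseable {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final List<String> sent = new ArrayList<>();

    void sendMessage(String message) {
        if (disposed.get()) {
            throw new IllegalStateException("sender is already disposed");
        }
        sent.add(message);
    }

    int sentCount() {
        return sent.size();
    }

    boolean isDisposed() {
        return disposed.get();
    }

    @Override
    public void close() {
        // compareAndSet makes repeated close() calls harmless.
        disposed.compareAndSet(false, true);
    }
}

final class ToySenderDemo {
    /** Sends one message inside try-with-resources and returns the (now closed) sender. */
    static ToySender sendAndClose() {
        ToySender sender = new ToySender();
        try (ToySender scoped = sender) {
            scoped.sendMessage("hello");
        }
        return sender;
    }
}
```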