Class ServiceBusSessionReceiverAsyncClient

java.lang.Object
com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient
All Implemented Interfaces:
AutoCloseable

public final class ServiceBusSessionReceiverAsyncClient extends Object implements AutoCloseable
This asynchronous session receiver client is used to acquire session locks from a queue or topic and create ServiceBusReceiverAsyncClient instances that are tied to the locked sessions.

Receive messages from a specific session

Use acceptSession(String) to acquire the lock of a session if you know the session id.

 // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sessionReceiver()
     .queueName(queueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
 // successfully locked.
 // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
 // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
 Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<< my-session-id >>"),
     receiver -> receiver.receiveMessages(),
     receiver -> Mono.fromRunnable(() -> receiver.close()));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
         message.getSequenceNumber(), message.getBody()),
     error -> System.err.print(error));
 

Receive messages from the first available session

Use acceptNextSession() to acquire the lock of the next available session without specifying the session id.

 // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sessionReceiver()
     .queueName(queueName)
     .buildAsyncClient();

 // acceptNextSession() completes successfully with a receiver when it acquires the next available session.
 // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
 // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
 Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptNextSession(),
     receiver -> receiver.receiveMessages(),
     receiver -> Mono.fromRunnable(() -> receiver.close()));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
         message.getSequenceNumber(), message.getBody()),
     error -> System.err.print(error));