// ServiceBusSessionReceiverClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import java.time.Duration;
import java.util.Objects;
/**
* This <b>synchronous</b> session receiver client is used to acquire session locks from a queue or topic and create
* {@link ServiceBusReceiverClient} instances that are tied to the locked sessions.
*
* <p><strong>Receive messages from a specific session</strong></p>
* <p>Use {@link #acceptSession(String)} to acquire the lock of a session if you know the session id.</p>
* <!-- src_embed com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId -->
* <pre>
* // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
* // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
* ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
* .connectionString(connectionString)
* .sessionReceiver()
* .queueName(sessionQueueName)
* .buildClient();
* ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<< my-session-id >>");
*
* // Use the receiver and finally close it along with the sessionReceiver.
* receiver.close();
* sessionReceiver.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId -->
*
* <p><strong>Receive messages from the first available session</strong></p>
* <p>Use {@link #acceptNextSession()} to acquire the lock of the next available session without specifying the session
* id.</p>
* <!-- src_embed com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession -->
* <pre>
* // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
* // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
* ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
* .connectionString(
* "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}")
* .sessionReceiver()
* .queueName("<< QUEUE NAME >>")
* .buildClient();
* ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();
*
* // Use the receiver and finally close it along with the sessionReceiver.
* receiver.close();
* sessionReceiver.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession -->
*/
@ServiceClient(builder = ServiceBusClientBuilder.class)
public final class ServiceBusSessionReceiverClient implements AutoCloseable {
    private final ServiceBusSessionReceiverAsyncClient sessionAsyncClient;
    private final Duration operationTimeout;

    /**
     * Creates a synchronous session receiver client that delegates to the given asynchronous client.
     *
     * @param asyncClient Asynchronous session receiver client that every operation is forwarded to.
     * @param operationTimeout Maximum time the accept methods block while waiting for a session receiver. The same
     *     value is passed to every {@link ServiceBusReceiverClient} created by this client.
     *
     * @throws NullPointerException if {@code asyncClient} or {@code operationTimeout} is null.
     */
    ServiceBusSessionReceiverClient(ServiceBusSessionReceiverAsyncClient asyncClient, Duration operationTimeout) {
        this.sessionAsyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        // Fail fast: a null timeout would otherwise only be noticed later, when an accept call tries to block
        // with it after the asynchronous accept operation has already been started.
        this.operationTimeout = Objects.requireNonNull(operationTimeout, "'operationTimeout' cannot be null.");
    }

    /**
     * Acquires a session lock for the next available session and creates a {@link ServiceBusReceiverClient}
     * to receive messages from the session. It will wait until a session is available if none is available
     * immediately.
     *
     * @return A {@link ServiceBusReceiverClient} that is tied to the available session.
     *
     * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
     * @throws AmqpException if the operation times out. The timeout duration is the {@code tryTimeout}
     *     configured through {@link ServiceBusClientBuilder#retryOptions(AmqpRetryOptions)} when this client
     *     was built.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public ServiceBusReceiverClient acceptNextSession() {
        return sessionAsyncClient.acceptNextSession()
            .map(this::toSynchronousReceiver)
            .block(operationTimeout);
    }

    /**
     * Acquires a session lock for {@code sessionId} and creates a {@link ServiceBusReceiverClient}
     * to receive messages from the session. If the session is already locked by another client, an
     * {@link AmqpException} is thrown immediately.
     *
     * @param sessionId The session id.
     *
     * @return A {@link ServiceBusReceiverClient} that is tied to the specified session.
     *
     * @throws NullPointerException if {@code sessionId} is null.
     * @throws IllegalArgumentException if {@code sessionId} is empty.
     * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
     * @throws ServiceBusException if the lock cannot be acquired.
     * @throws AmqpException if the operation times out. The timeout duration is the {@code tryTimeout}
     *     configured through {@link ServiceBusClientBuilder#retryOptions(AmqpRetryOptions)} when this client
     *     was built.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public ServiceBusReceiverClient acceptSession(String sessionId) {
        return sessionAsyncClient.acceptSession(sessionId)
            .map(this::toSynchronousReceiver)
            .block(operationTimeout);
    }

    /**
     * Closes this client by closing the asynchronous session receiver client it delegates to.
     */
    @Override
    public void close() {
        sessionAsyncClient.close();
    }

    /**
     * Wraps an asynchronous session-bound receiver in its synchronous counterpart, reusing this client's
     * operation timeout.
     */
    private ServiceBusReceiverClient toSynchronousReceiver(ServiceBusReceiverAsyncClient asyncReceiver) {
        return new ServiceBusReceiverClient(asyncReceiver, operationTimeout);
    }
}