ServiceBusReactorSession.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static com.azure.messaging.servicebus.implementation.MessageUtils.adjustServerTimeout;
/**
* An AMQP session for Service Bus.
*/
class ServiceBusReactorSession extends ReactorSession implements ServiceBusSession {
static final Symbol SESSION_FILTER = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-filter");
static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol(AmqpConstants.VENDOR + ":locked-until-utc");
private static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
private static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-type");
private static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR
+ ":transfer-destination-address");
private final ClientLogger logger = new ClientLogger(ServiceBusReactorSession.class);
private final AmqpRetryPolicy retryPolicy;
private final TokenManagerProvider tokenManagerProvider;
private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
private final AmqpConnection amqpConnection;
private final AmqpRetryOptions retryOptions;
private final boolean distributedTransactionsSupport;
/**
* Creates a new AMQP session using proton-j.
*
* @param session Proton-j session for this AMQP session.
* @param sessionHandler Handler for events that occur in the session.
* @param sessionName Name of the session.
* @param provider Provides reactor instances for messages to sent with.
* @param handlerProvider Providers reactor handlers for listening to proton-j reactor events.
* @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}.
* @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing
* operations on the message broker.
* @param retryOptions Retry options.
* @param createOptions the options to create {@link ServiceBusReactorSession}.
*/
ServiceBusReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler,
String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider,
Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider,
MessageSerializer messageSerializer, AmqpRetryOptions retryOptions,
ServiceBusCreateSessionOptions createOptions) {
super(amqpConnection, session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier,
tokenManagerProvider, messageSerializer, retryOptions);
this.amqpConnection = amqpConnection;
this.retryOptions = retryOptions;
this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
this.tokenManagerProvider = tokenManagerProvider;
this.cbsNodeSupplier = cbsNodeSupplier;
this.distributedTransactionsSupport = createOptions.isDistributedTransactionsSupported();
}
@Override
public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) {
final Map<Symbol, Object> filter = new HashMap<>();
return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
}
@Override
public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
String sessionId) {
final Map<Symbol, Object> filter = new HashMap<>();
filter.put(SESSION_FILTER, sessionId);
return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
}
@Override
public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, String transferEntityPath) {
Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Objects.requireNonNull(timeout, "'timeout' cannot be null.");
Objects.requireNonNull(retry, "'retry' cannot be null.");
final Duration serverTimeout = adjustServerTimeout(timeout);
Map<Symbol, Object> linkProperties = new HashMap<>();
linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
if (!CoreUtils.isNullOrEmpty(transferEntityPath)) {
linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath);
logger.verbose("Get or create sender link {} for via entity path: '{}'", linkName, entityPath);
final TokenManager tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier,
transferEntityPath);
return tokenManager.authorize()
.doFinally(signalType -> tokenManager.close())
.then(createProducer(linkName, entityPath, timeout, retry, linkProperties));
} else {
logger.verbose("Get or create sender link {} for entity path: '{}'", linkName, entityPath);
return createProducer(linkName, entityPath, timeout, retry, linkProperties);
}
}
@Override
public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
return this.createProducer(linkName, entityPath, timeout, retry, (Map<Symbol, Object>) null);
}
@Override
protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
if (distributedTransactionsSupport) {
return getOrCreateTransactionCoordinator().flatMap(coordinator -> super.createProducer(linkName, entityPath,
timeout, retry, linkProperties));
} else {
return super.createProducer(linkName, entityPath, timeout, retry, linkProperties);
}
}
@Override
protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
return new ServiceBusReactorReceiver(amqpConnection, entityPath, receiver, receiveLinkHandler, tokenManager,
reactorProvider, retryOptions.getTryTimeout(), retryPolicy);
}
private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
Map<Symbol, Object> filter) {
Objects.requireNonNull(linkName, "'linkName' cannot be null.");
Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Objects.requireNonNull(timeout, "'timeout' cannot be null.");
Objects.requireNonNull(retry, "'retry' cannot be null.");
Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");
final Map<Symbol, Object> linkProperties = new HashMap<>();
final Duration serverTimeout = adjustServerTimeout(timeout);
linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
if (entityType != null) {
linkProperties.put(ENTITY_TYPE_PROPERTY, entityType.getValue());
}
final SenderSettleMode senderSettleMode;
final ReceiverSettleMode receiverSettleMode;
switch (receiveMode) {
case PEEK_LOCK:
senderSettleMode = SenderSettleMode.UNSETTLED;
receiverSettleMode = ReceiverSettleMode.SECOND;
break;
case RECEIVE_AND_DELETE:
senderSettleMode = SenderSettleMode.SETTLED;
receiverSettleMode = ReceiverSettleMode.FIRST;
break;
default:
return Mono.error(new RuntimeException("ReceiveMode is not supported: " + receiveMode));
}
if (distributedTransactionsSupport) {
return getOrCreateTransactionCoordinator().flatMap(transactionCoordinator -> createConsumer(linkName,
entityPath, timeout, retry, filter, linkProperties, null, senderSettleMode,
receiverSettleMode)
.cast(ServiceBusReceiveLink.class));
} else {
return createConsumer(linkName, entityPath, timeout, retry, filter, linkProperties,
null, senderSettleMode, receiverSettleMode).cast(ServiceBusReceiveLink.class);
}
}
}