ServiceBusReactorSession.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static com.azure.messaging.servicebus.implementation.MessageUtils.adjustServerTimeout;
/**
* An AMQP session for Service Bus.
*/
class ServiceBusReactorSession extends ReactorSession implements ServiceBusSession {
static final Symbol SESSION_FILTER = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-filter");
static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol(AmqpConstants.VENDOR + ":locked-until-utc");
private static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
private static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-type");
private static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR
+ ":transfer-destination-address");
private final ClientLogger logger = new ClientLogger(ServiceBusReactorSession.class);
private final Duration openTimeout;
private final AmqpRetryPolicy retryPolicy;
private final TokenManagerProvider tokenManagerProvider;
private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
/**
* Creates a new AMQP session using proton-j.
*
* @param session Proton-j session for this AMQP session.
* @param sessionHandler Handler for events that occur in the session.
* @param sessionName Name of the session.
* @param provider Provides reactor instances for messages to sent with.
* @param handlerProvider Providers reactor handlers for listening to proton-j reactor events.
* @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}.
* @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing
* operations on the message broker.
* @param openTimeout Timeout to wait for the session operation to complete.
*/
ServiceBusReactorSession(Session session, SessionHandler sessionHandler, String sessionName,
ReactorProvider provider, ReactorHandlerProvider handlerProvider, Mono<ClaimsBasedSecurityNode> cbsNodeSupplier,
TokenManagerProvider tokenManagerProvider, Duration openTimeout, MessageSerializer messageSerializer,
AmqpRetryPolicy retryPolicy) {
super(session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier, tokenManagerProvider,
messageSerializer, openTimeout, retryPolicy);
this.openTimeout = openTimeout;
this.retryPolicy = retryPolicy;
this.tokenManagerProvider = tokenManagerProvider;
this.cbsNodeSupplier = cbsNodeSupplier;
}
@Override
public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) {
final Map<Symbol, Object> filter = new HashMap<>();
return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
}
@Override
public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
String sessionId) {
final Map<Symbol, Object> filter = new HashMap<>();
filter.put(SESSION_FILTER, sessionId);
return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
}
@Override
public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, String transferEntityPath) {
Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Objects.requireNonNull(timeout, "'timeout' cannot be null.");
Objects.requireNonNull(retry, "'retry' cannot be null.");
final Duration serverTimeout = adjustServerTimeout(timeout);
Map<Symbol, Object> linkProperties = new HashMap<>();
linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
if (!CoreUtils.isNullOrEmpty(transferEntityPath)) {
linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath);
logger.verbose("Get or create sender link {} for via entity path: '{}'", linkName, entityPath);
final TokenManager tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier,
transferEntityPath);
return tokenManager.authorize()
.doFinally(signalType -> tokenManager.close())
.then(createProducer(linkName, entityPath, timeout, retry, linkProperties));
} else {
logger.verbose("Get or create sender link {} for entity path: '{}'", linkName, entityPath);
return createProducer(linkName, entityPath, timeout, retry, linkProperties);
}
}
@Override
protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
return new ServiceBusReactorReceiver(entityPath, receiver, receiveLinkHandler, tokenManager,
reactorProvider, openTimeout, retryPolicy);
}
private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
Map<Symbol, Object> filter) {
Objects.requireNonNull(linkName, "'linkName' cannot be null.");
Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Objects.requireNonNull(timeout, "'timeout' cannot be null.");
Objects.requireNonNull(retry, "'retry' cannot be null.");
Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");
final Map<Symbol, Object> linkProperties = new HashMap<>();
final Duration serverTimeout = adjustServerTimeout(timeout);
linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
if (entityType != null) {
linkProperties.put(ENTITY_TYPE_PROPERTY, entityType.getValue());
}
final SenderSettleMode senderSettleMode;
final ReceiverSettleMode receiverSettleMode;
switch (receiveMode) {
case PEEK_LOCK:
senderSettleMode = SenderSettleMode.UNSETTLED;
receiverSettleMode = ReceiverSettleMode.SECOND;
break;
case RECEIVE_AND_DELETE:
senderSettleMode = SenderSettleMode.SETTLED;
receiverSettleMode = ReceiverSettleMode.FIRST;
break;
default:
return Mono.error(new RuntimeException("ReceiveMode is not supported: " + receiveMode));
}
return createConsumer(linkName, entityPath, timeout, retry, filter, linkProperties, null,
senderSettleMode, receiverSettleMode).cast(ServiceBusReceiveLink.class);
}
}