ServiceBusReactorSession.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static com.azure.messaging.servicebus.implementation.MessageUtils.adjustServerTimeout;

/**
 * An AMQP session for Service Bus.
 */
class ServiceBusReactorSession extends ReactorSession implements ServiceBusSession {
    static final Symbol SESSION_FILTER = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-filter");
    static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol(AmqpConstants.VENDOR + ":locked-until-utc");

    private static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
    private static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-type");
    private static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR
        + ":transfer-destination-address");

    private final ClientLogger logger = new ClientLogger(ServiceBusReactorSession.class);
    private final Duration openTimeout;
    private final AmqpRetryPolicy retryPolicy;
    private final TokenManagerProvider tokenManagerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;

    /**
     * Creates a new AMQP session using proton-j.
     *
     * @param session Proton-j session for this AMQP session.
     * @param sessionHandler Handler for events that occur in the session.
     * @param sessionName Name of the session.
     * @param provider Provides reactor instances for messages to sent with.
     * @param handlerProvider Providers reactor handlers for listening to proton-j reactor events.
     * @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}.
     * @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing
     *     operations on the message broker.
     * @param openTimeout Timeout to wait for the session operation to complete.
     */
    ServiceBusReactorSession(Session session, SessionHandler sessionHandler, String sessionName,
        ReactorProvider provider, ReactorHandlerProvider handlerProvider, Mono<ClaimsBasedSecurityNode> cbsNodeSupplier,
        TokenManagerProvider tokenManagerProvider, Duration openTimeout, MessageSerializer messageSerializer,
        AmqpRetryPolicy retryPolicy) {
        super(session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier, tokenManagerProvider,
            messageSerializer, openTimeout, retryPolicy);
        this.openTimeout = openTimeout;
        this.retryPolicy = retryPolicy;
        this.tokenManagerProvider = tokenManagerProvider;
        this.cbsNodeSupplier = cbsNodeSupplier;
    }

    @Override
    public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) {
        final Map<Symbol, Object> filter = new HashMap<>();

        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
    }

    @Override
    public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
        String sessionId) {

        final Map<Symbol, Object> filter = new HashMap<>();
        filter.put(SESSION_FILTER, sessionId);

        return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter);
    }

    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, String transferEntityPath) {
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");

        final Duration serverTimeout = adjustServerTimeout(timeout);
        Map<Symbol, Object> linkProperties = new HashMap<>();

        linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));

        if (!CoreUtils.isNullOrEmpty(transferEntityPath)) {
            linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath);
            logger.verbose("Get or create sender link {} for via entity path: '{}'", linkName, entityPath);

            final TokenManager tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier,
                transferEntityPath);

            return tokenManager.authorize()
                .doFinally(signalType -> tokenManager.close())
                .then(createProducer(linkName, entityPath, timeout, retry, linkProperties));
        } else {
            logger.verbose("Get or create sender link {} for entity path: '{}'", linkName, entityPath);
            return createProducer(linkName, entityPath, timeout, retry, linkProperties);
        }
    }

    @Override
    protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
        ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ServiceBusReactorReceiver(entityPath, receiver, receiveLinkHandler, tokenManager,
            reactorProvider, openTimeout, retryPolicy);
    }

    private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
        Map<Symbol, Object> filter) {
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");
        Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");

        final Map<Symbol, Object> linkProperties = new HashMap<>();
        final Duration serverTimeout = adjustServerTimeout(timeout);
        linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
        if (entityType != null) {
            linkProperties.put(ENTITY_TYPE_PROPERTY, entityType.getValue());
        }


        final SenderSettleMode senderSettleMode;
        final ReceiverSettleMode receiverSettleMode;
        switch (receiveMode) {
            case PEEK_LOCK:
                senderSettleMode = SenderSettleMode.UNSETTLED;
                receiverSettleMode = ReceiverSettleMode.SECOND;
                break;
            case RECEIVE_AND_DELETE:
                senderSettleMode = SenderSettleMode.SETTLED;
                receiverSettleMode = ReceiverSettleMode.FIRST;
                break;
            default:
                return Mono.error(new RuntimeException("ReceiveMode is not supported: " + receiveMode));
        }

        return createConsumer(linkName, entityPath, timeout, retry, filter, linkProperties, null,
            senderSettleMode, receiverSettleMode).cast(ServiceBusReceiveLink.class);
    }
}