ReactorSession.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addErrorCondition;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY;

/**
 * Represents an AMQP session using proton-j reactor.
 */
public class ReactorSession implements AmqpSession {
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private final ConcurrentMap<String, LinkSubscription<AmqpSendLink>> openSendLinks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, LinkSubscription<AmqpReceiveLink>> openReceiveLinks = new ConcurrentHashMap<>();

    private final Scheduler timeoutScheduler = Schedulers.parallel();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Object closeLock = new Object();

    /**
     * Mono that completes when the session is completely closed, that is that the session remote
     */
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();

    private final ClientLogger logger;
    private final Flux<AmqpEndpointState> endpointStates;

    private final AmqpConnection amqpConnection;
    private final Session session;
    private final SessionHandler sessionHandler;
    private final String sessionName;
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final String activeTimeoutMessage;
    private final AmqpRetryOptions retryOptions;

    private final ReactorHandlerProvider handlerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final Disposable.Composite connectionSubscriptions;

    private final AtomicReference<TransactionCoordinator> transactionCoordinator = new AtomicReference<>();
    private final Flux<AmqpShutdownSignal> shutdownSignals;

    /**
     * Creates a new AMQP session using proton-j.
     *
     * @param session Proton-j session for this AMQP session.
     * @param sessionHandler Handler for events that occur in the session.
     * @param sessionName Name of the session.
     * @param provider Provides reactor instances for messages to sent with.
     * @param handlerProvider Providers reactor handlers for listening to proton-j reactor events.
     * @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}.
     * @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing
     *     operations on the message broker.
     * @param retryOptions for the session operations.
     */
    public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler,
        String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider,
        Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider,
        MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) {
        this.amqpConnection = amqpConnection;
        this.session = session;
        this.sessionHandler = sessionHandler;
        this.handlerProvider = handlerProvider;
        this.sessionName = sessionName;
        this.provider = provider;
        this.cbsNodeSupplier = cbsNodeSupplier;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.retryOptions = retryOptions;
        this.activeTimeoutMessage = String.format(
            "ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.",
            sessionHandler.getConnectionId(), sessionName);

        this.logger = new ClientLogger(ReactorSession.class, createContextWithConnectionId(this.sessionHandler.getConnectionId()));

        this.endpointStates = sessionHandler.getEndpointStates()
            .map(state -> {
                logger.atVerbose()
                    .addKeyValue(SESSION_NAME_KEY, sessionName)
                    .addKeyValue("state", state)
                    .log("Got endpoint state.");

                return AmqpEndpointStateUtil.getConnectionState(state);
            })
            .doOnError(error -> handleError(error))
            .doOnComplete(() -> handleClose())
            .cache(1);

        shutdownSignals = amqpConnection.getShutdownSignals();
        connectionSubscriptions = Disposables.composite(
            this.endpointStates.subscribe(),

            shutdownSignals.flatMap(signal ->  closeAsync("Shutdown signal received", null, false)).subscribe());

        session.open();
    }

    Session session() {
        return this.session;
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStates;
    }

    @Override
    public boolean isDisposed() {
        return isDisposed.get();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void dispose() {
        closeAsync().block(retryOptions.getTryTimeout());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getSessionName() {
        return sessionName;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Duration getOperationTimeout() {
        return retryOptions.getTryTimeout();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<AmqpTransaction> createTransaction() {
        return getOrCreateTransactionCoordinator()
            .flatMap(coordinator -> coordinator.declare());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<Void> commitTransaction(AmqpTransaction transaction) {
        return getOrCreateTransactionCoordinator()
            .flatMap(coordinator -> coordinator.discharge(transaction, true));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<Void> rollbackTransaction(AmqpTransaction transaction) {
        return getOrCreateTransactionCoordinator()
            .flatMap(coordinator -> coordinator.discharge(transaction, false));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return createProducer(linkName, entityPath, timeout, retry, null)
            .or(onClosedError("Connection closed while waiting for new producer link.", entityPath, linkName));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<AmqpLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return createConsumer(linkName, entityPath, timeout, retry, null, null, null,
            SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND)
            .or(onClosedError("Connection closed while waiting for new receive link.", entityPath, linkName))
            .cast(AmqpLink.class);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean removeLink(String linkName) {
        return removeLink(openSendLinks, linkName) || removeLink(openReceiveLinks, linkName);
    }

    /**
     * A Mono that completes when the session has completely closed.
     *
     * @return Mono that completes when the session has completely closed.
     */
    Mono<Void> isClosed() {
        return isClosedMono.asMono();
    }

    @Override
    public Mono<Void> closeAsync() {
        return closeAsync(null, null, true);
    }

    Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean disposeLinks) {
        if (isDisposed.getAndSet(true)) {
            return isClosedMono.asMono();
        }

        addErrorCondition(logger.atVerbose(), errorCondition)
            .addKeyValue(SESSION_NAME_KEY, sessionName)
            .log("Setting error condition and disposing session. {}", message);

        return Mono.fromRunnable(() -> {
            try {
                provider.getReactorDispatcher().invoke(() -> disposeWork(errorCondition, disposeLinks));
            } catch (IOException e) {
                logger.atInfo()
                    .addKeyValue(SESSION_NAME_KEY, sessionName)
                    .log("Error while scheduling work. Manually disposing.", e);

                disposeWork(errorCondition, disposeLinks);
            } catch (RejectedExecutionException e) {
                logger.atInfo()
                    .addKeyValue(SESSION_NAME_KEY, sessionName)
                    .log("RejectedExecutionException when scheduling work.");

                disposeWork(errorCondition, disposeLinks);
            }
        }).then(isClosedMono.asMono());
    }

    /**
     * @return {@link Mono} of {@link TransactionCoordinator}
     */
    @Override
    public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
        if (isDisposed()) {
            return Mono.error(logger.atError()
                .addKeyValue(SESSION_NAME_KEY, sessionName)
                .log(new AmqpException(true,
                        String.format("Cannot create coordinator send link %s from a closed session.", TRANSACTION_LINK_NAME),
                        sessionHandler.getErrorContext())));
        }

        final TransactionCoordinator existing = transactionCoordinator.get();
        if (existing != null) {
            logger.atVerbose()
                .addKeyValue("coordinator", TRANSACTION_LINK_NAME)
                .log("Returning existing transaction coordinator.");
            return Mono.just(existing);
        }

        return createProducer(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME, new Coordinator(), retryOptions, null,
            false)
            .map(link -> {
                final TransactionCoordinator newCoordinator = new TransactionCoordinator(link, messageSerializer);
                if (transactionCoordinator.compareAndSet(null, newCoordinator)) {
                    return newCoordinator;
                } else {
                    return transactionCoordinator.get();
                }
            })
            .or(onClosedError("Connection closed while waiting for transaction coordinator creation.", NOT_APPLICABLE, NOT_APPLICABLE));
    }

    /**
     * Creates an {@link AmqpReceiveLink} that has AMQP specific capabilities set.
     *
     * Filters can be applied to the source when receiving to inform the source to filter the items sent to the
     * consumer. See
     * <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#doc-idp326640">Filtering
     * Messages</a> and <a href="https://www.amqp.org/specification/1.0/filters">AMQP Filters</a> for more information.
     *
     * @param linkName Name of the receive link.
     * @param entityPath Address in the message broker for the link.
     * @param timeout Operation timeout when creating the link.
     * @param retry Retry policy to apply when link creation times out.
     * @param sourceFilters Add any filters to the source when creating the receive link.
     * @param receiverProperties Any properties to associate with the receive link when attaching to message
     *     broker.
     * @param receiverDesiredCapabilities Capabilities that the receiver link supports.
     * @param senderSettleMode Amqp {@link SenderSettleMode} mode for receiver.
     * @param receiverSettleMode Amqp {@link ReceiverSettleMode} mode for receiver.
     *
     * @return A new instance of an {@link AmqpReceiveLink} with the correct properties set.
     */
    protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties,
        Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode,
        ReceiverSettleMode receiverSettleMode) {

        if (isDisposed()) {
            LoggingEventBuilder logBuilder = logger.atError()
                .addKeyValue(SESSION_NAME_KEY, sessionName)
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .addKeyValue(LINK_NAME_KEY, linkName);

            // TODO(limolkova) this can be simplified with FluxUtil.monoError(LoggingEventBuilder), not using it for now
            // to allow using azure-core-amqp with stable azure-core 1.24.0 to simplify dependency management
            // we should switch to it once monoError(LoggingEventBuilder) ships in stable azure-core
            return Mono.error(logBuilder
                .log(Exceptions.propagate(new AmqpException(true, "Cannot create receive link from a closed session.", sessionHandler.getErrorContext()))));
        }

        final LinkSubscription<AmqpReceiveLink> existingLink = openReceiveLinks.get(linkName);
        if (existingLink != null) {
            logger.atInfo()
                .addKeyValue(LINK_NAME_KEY, linkName)
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .log("Returning existing receive link.");
            return Mono.just(existingLink.getLink());
        }

        final TokenManager tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath);
        return Mono.when(onActiveEndpoint(), tokenManager.authorize())
            .then(Mono.create((Consumer<MonoSink<AmqpReceiveLink>>) sink -> {
                try {
                    // This has to be executed using reactor dispatcher because it's possible to run into race
                    // conditions with proton-j.
                    provider.getReactorDispatcher().invoke(() -> {
                        final LinkSubscription<AmqpReceiveLink> computed = openReceiveLinks.compute(linkName,
                            (linkNameKey, existing) -> {
                                if (existing != null) {
                                    logger.atInfo()
                                        .addKeyValue(LINK_NAME_KEY, linkName)
                                        .log("Another receive link exists. Disposing of new one.");
                                    tokenManager.close();

                                    return existing;
                                }

                                logger.atInfo()
                                    .addKeyValue(SESSION_NAME_KEY, sessionName)
                                    .addKeyValue(LINK_NAME_KEY, linkName)
                                    .log("Creating a new receiver link.");

                                return getSubscription(linkNameKey, entityPath, sourceFilters, receiverProperties,
                                    receiverDesiredCapabilities, senderSettleMode, receiverSettleMode, tokenManager);
                            });

                        sink.success(computed.getLink());
                    });
                } catch (IOException | RejectedExecutionException e) {
                    sink.error(e);
                }
            }))
            .onErrorResume(t -> Mono.defer(() -> {
                tokenManager.close();
                return Mono.error(t);
            }));
    }

    /**
     * Given the entity path, associated receiver and link handler, creates the receive link instance.
     */
    protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
        ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ReactorReceiver(amqpConnection, entityPath, receiver, receiveLinkHandler, tokenManager,
            reactorProvider.getReactorDispatcher(), retryOptions);
    }

    /**
     * Creates an {@link AmqpLink} that has AMQP specific capabilities set.
     *
     * @param linkName Name of the receive link.
     * @param entityPath Address in the message broker for the link.
     * @param linkProperties The properties needed to be set on the link.
     * @param timeout Operation timeout when creating the link.
     * @param retry Retry policy to apply when link creation times out.
     *
     * @return A new instance of an {@link AmqpLink} with the correct properties set.
     */
    protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {

        final Target target = new Target();
        target.setAddress(entityPath);

        final AmqpRetryOptions options = retry != null
            ? new AmqpRetryOptions(retry.getRetryOptions())
            : new AmqpRetryOptions();

        if (timeout != null) {
            options.setTryTimeout(timeout);
        }

        return createProducer(linkName, entityPath, target, options, linkProperties, true)
            .cast(AmqpLink.class);
    }

    private Mono<AmqpSendLink> createProducer(String linkName, String entityPath,
        org.apache.qpid.proton.amqp.transport.Target target, AmqpRetryOptions options,
        Map<Symbol, Object> linkProperties, boolean requiresAuthorization) {

        if (isDisposed()) {
            LoggingEventBuilder logBuilder = logger.atError()
                .addKeyValue(SESSION_NAME_KEY, sessionName)
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .addKeyValue(LINK_NAME_KEY, linkName);

            // TODO(limolkova) this can be simplified with FluxUtil.monoError(LoggingEventBuilder), not using it for now
            // to allow using azure-core-amqp with stable azure-core 1.24.0 to simplify dependency management
            // we should switch to it once monoError(LoggingEventBuilder) ships in stable azure-core
            return Mono.error(logBuilder
                .log(Exceptions.propagate(new AmqpException(true, "Cannot create send link from a closed session.", sessionHandler.getErrorContext()))));
        }

        final LinkSubscription<AmqpSendLink> existing = openSendLinks.get(linkName);
        if (existing != null) {
            logger.atVerbose()
                .addKeyValue(LINK_NAME_KEY, linkName)
                .log("Returning existing send link.");
            return Mono.just(existing.getLink());
        }

        final TokenManager tokenManager;
        final Mono<Long> authorize;
        if (requiresAuthorization) {
            tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath);
            authorize = tokenManager.authorize();
        } else {
            tokenManager = null;
            authorize = Mono.empty();
        }

        return Mono.when(onActiveEndpoint(), authorize).then(Mono.create(sink -> {
            try {
                // We have to invoke this in the same thread or else proton-j will not properly link up the created
                // sender because the link names are not unique. Link name == entity path.
                provider.getReactorDispatcher().invoke(() -> {
                    final LinkSubscription<AmqpSendLink> computed = openSendLinks.compute(linkName,
                        (linkNameKey, existingLink) -> {
                            if (existingLink != null) {
                                logger.atInfo()
                                    .addKeyValue(LINK_NAME_KEY, linkName)
                                    .log("Another send link exists. Disposing of new one.");

                                if (tokenManager != null) {
                                    tokenManager.close();
                                }
                                return existingLink;
                            }

                            logger.atInfo()
                                .addKeyValue(LINK_NAME_KEY, linkName)
                                .addKeyValue(SESSION_NAME_KEY, sessionName)
                                .log("Creating a new send link.");

                            return getSubscription(linkName, entityPath, target, linkProperties, options,
                                tokenManager);
                        });

                    sink.success(computed.getLink());
                });
            } catch (IOException | RejectedExecutionException e) {
                sink.error(e);
            }
        }));
    }

    /**
     * NOTE: Ensure this is invoked using the reactor dispatcher because proton-j is not thread-safe.
     */
    private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String entityPath,
        org.apache.qpid.proton.amqp.transport.Target target, Map<Symbol, Object> linkProperties,
        AmqpRetryOptions options, TokenManager tokenManager) {

        final Sender sender = session.sender(linkName);
        sender.setTarget(target);

        final Source source = new Source();
        sender.setSource(source);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);

        if (linkProperties != null && linkProperties.size() > 0) {
            sender.setProperties(linkProperties);
        }

        final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(
            sessionHandler.getConnectionId(), sessionHandler.getHostname(), linkName, entityPath);
        BaseHandler.setHandler(sender, sendLinkHandler);

        sender.open();

        final ReactorSender reactorSender = new ReactorSender(amqpConnection, entityPath, sender, sendLinkHandler,
            provider, tokenManager, messageSerializer, options, timeoutScheduler);

        //@formatter:off
        final Disposable subscription = reactorSender.getEndpointStates().subscribe(state -> {
        }, error -> {
            if (!isDisposed.get()) {
                removeLink(openSendLinks, linkName);
            }
        }, () -> {
            if (!isDisposed.get()) {
                logger.atInfo()
                    .addKeyValue(LINK_NAME_KEY, linkName)
                    .log("Complete. Removing and disposing send link.");

                removeLink(openSendLinks, linkName);
            }
        });
        //@formatter:on

        return new LinkSubscription<>(reactorSender, subscription,
            String.format("connectionId[%s] session[%s]: Setting error on receive link.",
                sessionHandler.getConnectionId(), sessionName));
    }

    /**
     * NOTE: Ensure this is invoked using the reactor dispatcher because proton-j is not thread-safe.
     */
    private LinkSubscription<AmqpReceiveLink> getSubscription(String linkName, String entityPath,
        Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties,
        Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode,
        TokenManager tokenManager) {

        final Receiver receiver = session.receiver(linkName);
        final Source source = new Source();
        source.setAddress(entityPath);

        if (sourceFilters != null && sourceFilters.size() > 0) {
            source.setFilter(sourceFilters);
        }

        receiver.setSource(source);

        final Target target = new Target();
        receiver.setTarget(target);

        // Use explicit settlement via dispositions (not pre-settled)
        receiver.setSenderSettleMode(senderSettleMode);
        receiver.setReceiverSettleMode(receiverSettleMode);

        if (receiverProperties != null && !receiverProperties.isEmpty()) {
            receiver.setProperties(receiverProperties);
        }

        if (receiverDesiredCapabilities != null && receiverDesiredCapabilities.length > 0) {
            receiver.setDesiredCapabilities(receiverDesiredCapabilities);
        }

        final ReceiveLinkHandler receiveLinkHandler = handlerProvider.createReceiveLinkHandler(
            sessionHandler.getConnectionId(), sessionHandler.getHostname(), linkName, entityPath);
        BaseHandler.setHandler(receiver, receiveLinkHandler);

        receiver.open();

        final ReactorReceiver reactorReceiver = createConsumer(entityPath, receiver, receiveLinkHandler,
            tokenManager, provider);

        final Disposable subscription = reactorReceiver.getEndpointStates().subscribe(state -> {
        }, error -> {
            if (!isDisposed.get()) {
                removeLink(openReceiveLinks, linkName);
            }
        }, () -> {
            if (!isDisposed.get()) {
                logger.atInfo()
                    .addKeyValue(LINK_NAME_KEY, linkName)
                    .addKeyValue(ENTITY_PATH_KEY, entityPath)
                    .log("Complete. Removing receive link.");

                removeLink(openReceiveLinks, linkName);
            }
        });

        return new LinkSubscription<>(reactorReceiver, subscription,
            String.format("connectionId[%s] sessionName[%s]: Setting error on receive link.", amqpConnection.getId(),
                sessionName));
    }

    /**
     * Returns a Mono that completes when the connection handler is closed. If it does, an {@link AmqpException} is
     * returned. It indicates that a shutdown was initiated and we should stop.
     *
     * @return A Mono that completes when the shutdown signal is emitted. If it does, returns an error.
     */
    private <T> Mono<T> onClosedError(String message, String linkName, String entityPath) {
        return Mono.firstWithSignal(isClosedMono.asMono(), shutdownSignals.next())
            .then(Mono.error(new AmqpException(false,
                String.format("connectionId[%s] entityPath[%s] linkName[%s] Connection closed. %s", sessionHandler.getConnectionId(), entityPath, linkName, message),
                sessionHandler.getErrorContext())));
    }

    /**
     * Asynchronously waits for the session's active endpoint state.
     *
     * @return A mono that completes when the session is active.
     */
    private Mono<Void> onActiveEndpoint() {
        return RetryUtil.withRetry(getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE),
            retryOptions, activeTimeoutMessage)
            .then();
    }

    private void handleClose() {
        logger.atVerbose()
            .addKeyValue(SESSION_NAME_KEY, sessionName)
            .log("Disposing of active send and receive links due to session close.");

        closeAsync().subscribe();
    }

    private void handleError(Throwable error) {
        logger.atVerbose()
            .addKeyValue(SESSION_NAME_KEY, sessionName)
            .log("Disposing of active links due to error.");

        final ErrorCondition condition;
        if (error instanceof AmqpException) {
            final AmqpException exception = ((AmqpException) error);
            final String errorCondition = exception.getErrorCondition() != null
                ? exception.getErrorCondition().getErrorCondition() : "UNKNOWN";

            condition = new ErrorCondition(Symbol.getSymbol(errorCondition), exception.getMessage());

            closeAsync(exception.getMessage(), condition, true).subscribe();
        } else {
            condition = null;
        }

        closeAsync(error.getMessage(), condition, true).subscribe();
    }

    /**
     * Takes care of setting the error condition on the session, closing the children if specified and then waiting
     *
     * @param errorCondition Condition to set on the session.
     * @param disposeLinks {@code true} to dispose of children. {@code false} to ignore them, this may be the case
     *     when the {@link AmqpConnection} passes a shutdown signal.
     */
    private void disposeWork(ErrorCondition errorCondition, boolean disposeLinks) {
        if (session.getLocalState() != EndpointState.CLOSED) {
            session.close();

            if (errorCondition != null && session.getCondition() == null) {
                session.setCondition(errorCondition);
            }
        }

        final ArrayList<Mono<Void>> closingLinks = new ArrayList<>();
        if (disposeLinks) {
            synchronized (closeLock) {
                openReceiveLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }

                    closingLinks.add(link.closeAsync(errorCondition));
                });
                openSendLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }

                    closingLinks.add(link.closeAsync(errorCondition));
                });
            }
        }

        // We want to complete the session so that the parent connection isn't waiting.
        Mono<Void> closeLinksMono = Mono.when(closingLinks).timeout(retryOptions.getTryTimeout())
            .onErrorResume(error -> {
                logger.atWarning()
                    .addKeyValue(SESSION_NAME_KEY, sessionName)
                    .log("Timed out waiting for all links to close.", error);
                return Mono.empty();
            })
            .then(Mono.fromRunnable(() -> {
                isClosedMono.emitEmpty((signalType, result) -> {
                    addSignalTypeAndResult(logger.atWarning(), signalType, result)
                        .addKeyValue(SESSION_NAME_KEY, sessionName)
                        .log("Unable to emit shutdown signal.");

                    return false;
                });

                sessionHandler.close();
                connectionSubscriptions.dispose();
            }));

        connectionSubscriptions.add(closeLinksMono.subscribe());
    }

    private <T extends AmqpLink> boolean removeLink(ConcurrentMap<String, LinkSubscription<T>> openLinks, String key) {
        if (key == null) {
            return false;
        }

        synchronized (closeLock) {
            final LinkSubscription<T> removed = openLinks.remove(key);
            if (removed != null) {
                removed.closeAsync(null).subscribe();
            }

            return removed != null;
        }
    }

    private static final class LinkSubscription<T extends AmqpLink> {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final T link;
        private final Disposable subscription;
        private final String errorMessage;

        private LinkSubscription(T link, Disposable subscription, String errorMessage) {
            this.link = link;
            this.subscription = subscription;
            this.errorMessage = errorMessage;
        }

        public T getLink() {
            return link;
        }

        Mono<Void> closeAsync(ErrorCondition errorCondition) {
            if (isDisposed.getAndSet(true)) {
                return Mono.empty();
            }

            subscription.dispose();

            if (link instanceof ReactorReceiver) {
                return ((ReactorReceiver) link).closeAsync(errorMessage, errorCondition);
            } else if (link instanceof ReactorSender) {
                return ((ReactorSender) link).closeAsync(errorMessage, errorCondition);
            } else {
                link.dispose();
                return Mono.empty();
            }
        }
    }
}