EventHubReactorAmqpConnection.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.util.HashMap;
import java.util.Map;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY;

/**
 * A proton-j AMQP connection to an Azure Event Hub instance. Adds additional support for management operations.
 */
public class EventHubReactorAmqpConnection extends ReactorConnection implements EventHubAmqpConnection {
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private static final String MANAGEMENT_ADDRESS = "$management";

    private final ClientLogger logger;
    private final TokenCredential tokenCredential;
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final String eventHubName;

    private volatile ManagementChannel managementChannel;

    /**
     * Creates a new AMQP connection that uses proton-j.
     *
     * @param connectionId Identifier for the connection.
     * @param connectionOptions A set of options used to create the AMQP connection.
     * @param reactorProvider Provides proton-j reactor instances.
     * @param handlerProvider Provides {@link BaseHandler} to listen to proton-j reactor events.
     * @param tokenManagerProvider Provides a token manager for authorizing with CBS node.
     * @param messageSerializer Serializes and deserializes proton-j messages.
     */
    public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions, String eventHubName,
        ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider,
        TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) {
        super(connectionId, connectionOptions, reactorProvider, handlerProvider, tokenManagerProvider,
            messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND);
        this.connectionId = connectionId;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = handlerProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.eventHubName = eventHubName;
        this.retryOptions = connectionOptions.getRetry();
        this.tokenCredential = connectionOptions.getTokenCredential();
        this.scheduler = connectionOptions.getScheduler();

        Map<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put(CONNECTION_ID_KEY, connectionId);
        this.logger = new ClientLogger(EventHubReactorAmqpConnection.class, loggingContext);
    }

    @Override
    public Mono<EventHubManagementNode> getManagementNode() {
        if (isDisposed()) {
            return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
                "connectionId[%s]: Connection is disposed. Cannot get management instance", connectionId))));
        }

        return getReactorConnection().then(Mono.fromCallable(this::getOrCreateManagementChannel));
    }

    /**
     * Creates or gets a send link. The same link is returned if there is an existing send link with the same {@code
     * linkName}. Otherwise, a new link is created and returned.
     *
     * @param linkName The name of the link.
     * @param entityPath The remote address to connect to for the message broker.
     * @param retryOptions Options to use when creating the link.
     * @return A new or existing send link that is connected to the given {@code entityPath}.
     */
    @Override
    public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions) {
        return createSession(entityPath).flatMap(session -> {
            logger.atVerbose()
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .log("Get or create producer.");
            final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);

            return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy)
                .cast(AmqpSendLink.class);
        });
    }

    /**
     * Creates or gets an existing receive link. The same link is returned if there is an existing receive link with the
     * same {@code linkName}. Otherwise, a new link is created and returned.
     *
     * @param linkName The name of the link.
     * @param entityPath The remote address to connect to for the message broker.
     * @param eventPosition Position to set the receive link to.
     * @param options Consumer options to use when creating the link.
     * @return A new or existing receive link that is connected to the given {@code entityPath}.
     */
    @Override
    public Mono<AmqpReceiveLink> createReceiveLink(String linkName, String entityPath, EventPosition eventPosition,
        ReceiveOptions options) {
        return createSession(entityPath).cast(EventHubSession.class)
            .flatMap(session -> {
                logger.atVerbose()
                    .addKeyValue(ENTITY_PATH_KEY, entityPath)
                    .log("Get or create consumer.");
                final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);

                return session.createConsumer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy,
                    eventPosition, options);
            });
    }

    @Override
    public void dispose() {
        if (isDisposed()) {
            return;
        }

        if (managementChannel != null) {
            managementChannel.close();
        }

        super.dispose();
    }

    @Override
    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new EventHubReactorSession(this, session, handler, sessionName, reactorProvider,
            handlerProvider, getClaimsBasedSecurityNode(), tokenManagerProvider, retryOptions, messageSerializer);
    }

    private synchronized ManagementChannel getOrCreateManagementChannel() {
        if (managementChannel == null) {
            managementChannel = new ManagementChannel(
                createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS),
                eventHubName, tokenCredential, tokenManagerProvider, this.messageSerializer, scheduler);
        }

        return managementChannel;
    }
}