ReactorHandlerProvider.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler;
import com.azure.core.amqp.implementation.handler.WebSocketsProxyConnectionHandler;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.reactor.Reactor;

import java.time.Duration;
import java.util.Locale;

/**
 * Provides handlers for the various types of links.
 */
public class ReactorHandlerProvider {
    private final ClientLogger logger = new ClientLogger(ReactorHandlerProvider.class);
    private final ReactorProvider provider;

    /**
     * Creates a new instance with the reactor provider to handle {@link ReactorDispatcher ReactorDispatchers} to its
     * generated handlers.
     *
     * @param provider The provider that creates and manages {@link Reactor} instances.
     */
    public ReactorHandlerProvider(ReactorProvider provider) {
        this.provider = provider;
    }

    /**
     * Creates a new connection handler with the given {@code connectionId} and {@code hostname}.
     *
     * @param connectionId Identifier associated with this connection.
     * @param hostname Host for the connection handler.
     * @param transportType Transport type used for the connection.
     * @param proxyOptions The options to use for proxy.
     * @param product The name of the product this connection handler is created for.
     * @param clientVersion The version of the client library creating the connection handler.
     * @param clientOptions provided by user.
     * @return A new {@link ConnectionHandler}.
     */
    public ConnectionHandler createConnectionHandler(String connectionId, String hostname,
            AmqpTransportType transportType, ProxyOptions proxyOptions, String product, String clientVersion,
            SslDomain.VerifyMode verifyMode, ClientOptions clientOptions) {
        switch (transportType) {
            case AMQP:
                return new ConnectionHandler(connectionId, hostname, product, clientVersion, verifyMode, clientOptions);
            case AMQP_WEB_SOCKETS:
                if (proxyOptions != null && proxyOptions.isProxyAddressConfigured()) {
                    return new WebSocketsProxyConnectionHandler(connectionId, hostname, proxyOptions, product,
                        clientVersion, verifyMode, clientOptions);
                } else if (WebSocketsProxyConnectionHandler.shouldUseProxy(hostname)) {
                    logger.info("System default proxy configured for hostname '{}'. Using proxy.", hostname);
                    return new WebSocketsProxyConnectionHandler(connectionId, hostname,
                        ProxyOptions.SYSTEM_DEFAULTS, product, clientVersion, verifyMode, clientOptions);
                } else {
                    return new WebSocketsConnectionHandler(connectionId, hostname, product, clientVersion, verifyMode,
                        clientOptions);
                }
            default:
                throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US,
                    "This transport type '%s' is not supported.", transportType)));
        }
    }

    /**
     * Creates a new session handler with the given {@code connectionId}, {@code hostname}, and {@code sessionName}.
     *
     * @param connectionId Identifier of the parent connection that created this session.
     * @param hostname Host of the parent connection.
     * @param sessionName Name of the session.
     * @param openTimeout Duration to wait for the session to open.
     * @return A new {@link SessionHandler}.
     */
    public SessionHandler createSessionHandler(String connectionId, String hostname, String sessionName,
                                               Duration openTimeout) {
        return new SessionHandler(connectionId, hostname, sessionName, provider.getReactorDispatcher(), openTimeout);
    }

    /**
     * Creates a new link handler for sending messages.
     *
     * @param connectionId Identifier of the parent connection that created this session.
     * @param fullyQualifiedNamespace Fully qualified namespace of the parent connection.
     * @param senderName Name of the send link.
     * @return A new {@link SendLinkHandler}.
     */
    public SendLinkHandler createSendLinkHandler(String connectionId, String fullyQualifiedNamespace, String senderName,
                                                 String entityPath) {
        return new SendLinkHandler(connectionId, fullyQualifiedNamespace, senderName, entityPath);
    }

    /**
     * Creates a new link handler for receiving messages.
     *
     * @param connectionId Identifier of the parent connection that created this session.
     * @param fullyQualifiedNamespace Fully qualified namespace of the parent connection.
     * @param receiverName Name of the send link.
     * @return A new {@link ReceiveLinkHandler}.
     */
    public ReceiveLinkHandler createReceiveLinkHandler(String connectionId, String fullyQualifiedNamespace,
            String receiverName, String entityPath) {
        return new ReceiveLinkHandler(connectionId, fullyQualifiedNamespace, receiverName, entityPath);
    }
}