ServiceBusClientBuilder.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.CbsAuthorizationType;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import org.apache.qpid.proton.engine.SslDomain;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/**
 * The builder to create Service Bus clients:
 *
 *
 * <p><strong>Clients for sending messages</strong></p>
 * <ul>
 * <li>{@link ServiceBusSenderAsyncClient}</li>
 * <li>{@link ServiceBusSenderClient}</li>
 * </ul>
 *
 * <p><strong>Clients for receiving messages</strong></p>
 * <ul>
 * <li>{@link ServiceBusReceiverAsyncClient}</li>
 * <li>{@link ServiceBusReceiverClient}</li>
 * </ul>
 *
 * <p><strong>Clients for receiving messages from a session-enabled Service Bus entity</strong></p>
 * <ul>
 * <li>{@link ServiceBusSessionReceiverAsyncClient}</li>
 * <li>{@link ServiceBusSessionReceiverClient}</li>
 * </ul>
 *
 * <p><strong>Client for receiving messages using a callback-based processor</strong></p>
 * <ul>
 * <li>{@link ServiceBusProcessorClient}</li>
 * </ul>
 */
@ServiceClientBuilder(serviceClients = {ServiceBusReceiverAsyncClient.class, ServiceBusSenderAsyncClient.class,
    ServiceBusSenderClient.class, ServiceBusReceiverClient.class, ServiceBusProcessorClient.class},
    protocol = ServiceClientProtocol.AMQP)
public final class ServiceBusClientBuilder {
    private static final AmqpRetryOptions DEFAULT_RETRY =
        new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT);

    private static final String SERVICE_BUS_PROPERTIES_FILE = "azure-messaging-servicebus.properties";
    private static final String SUBSCRIPTION_ENTITY_PATH_FORMAT = "%s/subscriptions/%s";
    private static final String DEAD_LETTER_QUEUE_NAME_SUFFIX = "/$deadletterqueue";
    private static final String TRANSFER_DEAD_LETTER_QUEUE_NAME_SUFFIX = "/$Transfer/$deadletterqueue";

    // Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
    // receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
    private static final int DEFAULT_PREFETCH_COUNT = 0;
    private static final String NAME_KEY = "name";
    private static final String VERSION_KEY = "version";
    private static final String UNKNOWN = "UNKNOWN";
    private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
    private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);

    private final Object connectionLock = new Object();
    private final ClientLogger logger = new ClientLogger(ServiceBusClientBuilder.class);
    private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();
    private final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

    private ClientOptions clientOptions;
    private Configuration configuration;
    private ServiceBusConnectionProcessor sharedConnection;
    private String connectionStringEntityName;
    private TokenCredential credentials;
    private String fullyQualifiedNamespace;
    private ProxyOptions proxyOptions;
    private AmqpRetryOptions retryOptions;
    private Scheduler scheduler;
    private AmqpTransportType transport = AmqpTransportType.AMQP;
    private SslDomain.VerifyMode verifyMode;

    /**
     * Keeps track of the open clients that were created from this builder when there is a shared connection.
     */
    private final AtomicInteger openClients = new AtomicInteger();

    /**
     * Creates a new instance with the default transport {@link AmqpTransportType#AMQP}.
     */
    public ServiceBusClientBuilder() {
    }

    /**
     * Sets the {@link ClientOptions} to be sent from the client built from this builder, enabling customization of
     * certain properties, as well as support the addition of custom header information. Refer to the {@link
     * ClientOptions} documentation for more information.
     *
     * @param clientOptions to be set on the client.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions) {
        this.clientOptions = clientOptions;
        return this;
    }

    /**
     * Sets the connection string for a Service Bus namespace or a specific Service Bus resource.
     *
     * @param connectionString Connection string for a Service Bus namespace or a specific Service Bus resource.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder connectionString(String connectionString) {
        final ConnectionStringProperties properties = new ConnectionStringProperties(connectionString);
        final TokenCredential tokenCredential;
        try {
            tokenCredential = getTokenCredential(properties);
        } catch (Exception e) {
            throw logger.logExceptionAsError(
                new AzureException("Could not create the ServiceBusSharedKeyCredential.", e));
        }

        this.fullyQualifiedNamespace = properties.getEndpoint().getHost();

        if (properties.getEntityPath() != null && !properties.getEntityPath().isEmpty()) {
            logger.info("Setting 'entityName' [{}] from connectionString.", properties.getEntityPath());
            this.connectionStringEntityName = properties.getEntityPath();
        }

        return credential(properties.getEndpoint().getHost(), tokenCredential);
    }

    private TokenCredential getTokenCredential(ConnectionStringProperties properties) {
        TokenCredential tokenCredential;
        if (properties.getSharedAccessSignature() == null) {
            tokenCredential = new ServiceBusSharedKeyCredential(properties.getSharedAccessKeyName(),
                properties.getSharedAccessKey(), ServiceBusConstants.TOKEN_VALIDITY);
        } else {
            tokenCredential = new ServiceBusSharedKeyCredential(properties.getSharedAccessSignature());
        }
        return tokenCredential;
    }

    /**
     * Sets the configuration store that is used during construction of the service client.
     *
     * If not specified, the default configuration store is used to configure Service Bus clients. Use {@link
     * Configuration#NONE} to bypass using configuration settings during construction.
     *
     * @param configuration The configuration store used to configure Service Bus clients.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder configuration(Configuration configuration) {
        this.configuration = configuration;
        return this;
    }

    /**
     * Sets the credential for the Service Bus resource.
     *
     * @param fullyQualifiedNamespace for the Service Bus.
     * @param credential {@link TokenCredential} to be used for authentication.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential) {

        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
            "'fullyQualifiedNamespace' cannot be null.");
        this.credentials = Objects.requireNonNull(credential, "'credential' cannot be null.");

        if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
            throw logger.logExceptionAsError(
                new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
        }

        return this;
    }

    /**
     * Sets the proxy configuration to use for {@link ServiceBusSenderAsyncClient}. When a proxy is configured, {@link
     * AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
     *
     * @param proxyOptions The proxy configuration to use.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions) {
        this.proxyOptions = proxyOptions;
        return this;
    }

    /**
     * Package-private method that sets the verify mode for this connection.
     *
     * @param verifyMode The verification mode.
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    ServiceBusClientBuilder verifyMode(SslDomain.VerifyMode verifyMode) {
        this.verifyMode = verifyMode;
        return this;
    }

    /**
     * Sets the retry options for Service Bus clients. If not specified, the default retry options are used.
     *
     * @param retryOptions The retry options to use.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions) {
        this.retryOptions = retryOptions;
        return this;
    }

    /**
     * Sets the scheduler to use.
     *
     * @param scheduler Scheduler to be used.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    ServiceBusClientBuilder scheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    /**
     * Sets the transport type by which all the communication with Azure Service Bus occurs. Default value is {@link
     * AmqpTransportType#AMQP}.
     *
     * @param transportType The transport type to use.
     *
     * @return The updated {@link ServiceBusClientBuilder} object.
     */
    public ServiceBusClientBuilder transportType(AmqpTransportType transportType) {
        this.transport = transportType;
        return this;
    }

    /**
     * A new instance of {@link ServiceBusSenderClientBuilder} used to configure Service Bus message senders.
     *
     * @return A new instance of {@link ServiceBusSenderClientBuilder}.
     */
    public ServiceBusSenderClientBuilder sender() {
        return new ServiceBusSenderClientBuilder();
    }

    /**
     * A new instance of {@link ServiceBusReceiverClientBuilder} used to configure Service Bus message receivers.
     *
     * @return A new instance of {@link ServiceBusReceiverClientBuilder}.
     */
    public ServiceBusReceiverClientBuilder receiver() {
        return new ServiceBusReceiverClientBuilder();
    }

    /**
     * A new instance of {@link ServiceBusSessionReceiverClientBuilder} used to configure <b>session aware</b> Service
     * Bus message receivers.
     *
     * @return A new instance of {@link ServiceBusSessionReceiverClientBuilder}.
     */
    public ServiceBusSessionReceiverClientBuilder sessionReceiver() {
        return new ServiceBusSessionReceiverClientBuilder();
    }

    /**
     * A new instance of {@link ServiceBusProcessorClientBuilder} used to configure {@link ServiceBusProcessorClient}
     * instance.
     *
     * @return A new instance of {@link ServiceBusProcessorClientBuilder}.
     */
    public ServiceBusProcessorClientBuilder processor() {
        return new ServiceBusProcessorClientBuilder();
    }

    /**
     * A new instance of {@link ServiceBusSessionProcessorClientBuilder} used to configure a Service Bus processor
     * instance that processes sessions.
     * @return A new instance of {@link ServiceBusSessionProcessorClientBuilder}.
     */
    public ServiceBusSessionProcessorClientBuilder sessionProcessor() {
        return new ServiceBusSessionProcessorClientBuilder();
    }

    /**
     * Called when a child client is closed. Disposes of the shared connection if there are no more clients.
     */
    void onClientClose() {
        synchronized (connectionLock) {
            final int numberOfOpenClients = openClients.decrementAndGet();
            logger.info("Closing a dependent client. # of open clients: {}", numberOfOpenClients);

            if (numberOfOpenClients > 0) {
                return;
            }

            if (numberOfOpenClients < 0) {
                logger.warning("There should not be less than 0 clients. actual: {}", numberOfOpenClients);
            }

            logger.info("No more open clients, closing shared connection [{}].", sharedConnection);
            if (sharedConnection != null) {
                sharedConnection.dispose();
                sharedConnection = null;
            } else {
                logger.warning("Shared ServiceBusConnectionProcessor was already disposed.");
            }
        }
    }

    private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSerializer serializer) {
        if (retryOptions == null) {
            retryOptions = DEFAULT_RETRY;
        }

        if (scheduler == null) {
            scheduler = Schedulers.elastic();
        }

        synchronized (connectionLock) {
            if (sharedConnection == null) {
                final ConnectionOptions connectionOptions = getConnectionOptions();
                final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
                    connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
                    ServiceBusConstants.AZURE_ACTIVE_DIRECTORY_SCOPE);
                final ReactorProvider provider = new ReactorProvider();
                final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);

                final Map<String, String> properties = CoreUtils.getProperties(SERVICE_BUS_PROPERTIES_FILE);
                final String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
                final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);

                final Flux<ServiceBusAmqpConnection> connectionFlux = Mono.fromCallable(() -> {
                    final String connectionId = StringUtil.getRandomString("MF");

                    return (ServiceBusAmqpConnection) new ServiceBusReactorAmqpConnection(connectionId,
                        connectionOptions, provider, handlerProvider, tokenManagerProvider, serializer, product,
                        clientVersion);
                }).repeat();

                sharedConnection = connectionFlux.subscribeWith(new ServiceBusConnectionProcessor(
                    connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getRetry()));
            }
        }

        final int numberOfOpenClients = openClients.incrementAndGet();
        logger.info("# of open clients with shared connection: {}", numberOfOpenClients);

        return sharedConnection;
    }

    private ConnectionOptions getConnectionOptions() {
        configuration = configuration == null ? Configuration.getGlobalConfiguration().clone() : configuration;
        if (credentials == null) {
            throw logger.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. "
                + "They can be set using: connectionString(String), connectionString(String, String), "
                + "or credentials(String, String, TokenCredential)"
            ));
        }

        // If the proxy has been configured by the user but they have overridden the TransportType with something that
        // is not AMQP_WEB_SOCKETS.
        if (proxyOptions != null && proxyOptions.isProxyAddressConfigured()
            && transport != AmqpTransportType.AMQP_WEB_SOCKETS) {
            throw logger.logExceptionAsError(new IllegalArgumentException(
                "Cannot use a proxy when TransportType is not AMQP."));
        }

        if (proxyOptions == null) {
            proxyOptions = getDefaultProxyConfiguration(configuration);
        }

        final CbsAuthorizationType authorizationType = credentials instanceof ServiceBusSharedKeyCredential
            ? CbsAuthorizationType.SHARED_ACCESS_SIGNATURE
            : CbsAuthorizationType.JSON_WEB_TOKEN;
        final ClientOptions options = clientOptions != null ? clientOptions : new ClientOptions();

        final SslDomain.VerifyMode verificationMode = verifyMode != null
            ? verifyMode
            : SslDomain.VerifyMode.VERIFY_PEER_NAME;

        return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport, retryOptions,
            proxyOptions, scheduler, options, verificationMode);
    }

    private ProxyOptions getDefaultProxyConfiguration(Configuration configuration) {
        ProxyAuthenticationType authentication = ProxyAuthenticationType.NONE;
        if (proxyOptions != null) {
            authentication = proxyOptions.getAuthentication();
        }

        String proxyAddress = configuration.get(Configuration.PROPERTY_HTTP_PROXY);

        if (CoreUtils.isNullOrEmpty(proxyAddress)) {
            return ProxyOptions.SYSTEM_DEFAULTS;
        }

        return getProxyOptions(authentication, proxyAddress);
    }

    private ProxyOptions getProxyOptions(ProxyAuthenticationType authentication, String proxyAddress) {
        String host;
        int port;
        if (HOST_PORT_PATTERN.matcher(proxyAddress.trim()).find()) {
            final String[] hostPort = proxyAddress.split(":");
            host = hostPort[0];
            port = Integer.parseInt(hostPort[1]);
            final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port));
            final String username = configuration.get(ProxyOptions.PROXY_USERNAME);
            final String password = configuration.get(ProxyOptions.PROXY_PASSWORD);
            return new ProxyOptions(authentication, proxy, username, password);
        } else {
            com.azure.core.http.ProxyOptions coreProxyOptions = com.azure.core.http.ProxyOptions
                .fromConfiguration(configuration);
            return new ProxyOptions(authentication, new Proxy(coreProxyOptions.getType().toProxyType(),
                coreProxyOptions.getAddress()), coreProxyOptions.getUsername(), coreProxyOptions.getPassword());
        }
    }

    private static boolean isNullOrEmpty(String item) {
        return item == null || item.isEmpty();
    }

    private static MessagingEntityType validateEntityPaths(ClientLogger logger, String connectionStringEntityName,
        String topicName, String queueName) {

        final boolean hasTopicName = !isNullOrEmpty(topicName);
        final boolean hasQueueName = !isNullOrEmpty(queueName);
        final boolean hasConnectionStringEntity = !isNullOrEmpty(connectionStringEntityName);

        final MessagingEntityType entityType;

        if (!hasConnectionStringEntity && !hasQueueName && !hasTopicName) {
            throw logger.logExceptionAsError(new IllegalStateException(
                "Cannot build client without setting either a queueName or topicName."));
        } else if (hasQueueName && hasTopicName) {
            throw logger.logExceptionAsError(new IllegalStateException(String.format(
                "Cannot build client with both queueName (%s) and topicName (%s) set.", queueName, topicName)));
        } else if (hasQueueName) {
            if (hasConnectionStringEntity && !queueName.equals(connectionStringEntityName)) {
                throw logger.logExceptionAsError(new IllegalStateException(String.format(
                    "queueName (%s) is different than the connectionString's EntityPath (%s).",
                    queueName, connectionStringEntityName)));
            }

            entityType = MessagingEntityType.QUEUE;
        } else if (hasTopicName) {
            if (hasConnectionStringEntity && !topicName.equals(connectionStringEntityName)) {
                throw logger.logExceptionAsError(new IllegalStateException(String.format(
                    "topicName (%s) is different than the connectionString's EntityPath (%s).",
                    topicName, connectionStringEntityName)));
            }

            entityType = MessagingEntityType.SUBSCRIPTION;
        } else {
            // It is a connection string entity path.
            entityType = MessagingEntityType.UNKNOWN;
        }

        return entityType;
    }

    private static String getEntityPath(ClientLogger logger, MessagingEntityType entityType, String queueName,
        String topicName, String subscriptionName, SubQueue subQueue) {

        String entityPath;
        switch (entityType) {
            case QUEUE:
                entityPath = queueName;
                break;
            case SUBSCRIPTION:
                if (isNullOrEmpty(subscriptionName)) {
                    throw logger.logExceptionAsError(new IllegalStateException(String.format(
                        "topicName (%s) must have a subscriptionName associated with it.", topicName)));
                }

                entityPath = String.format(Locale.ROOT, SUBSCRIPTION_ENTITY_PATH_FORMAT, topicName,
                    subscriptionName);
                break;
            default:
                throw logger.logExceptionAsError(
                    new IllegalArgumentException("Unknown entity type: " + entityType));
        }

        if (subQueue == null) {
            return entityPath;
        }

        switch (subQueue) {
            case NONE:
                break;
            case TRANSFER_DEAD_LETTER_QUEUE:
                entityPath += TRANSFER_DEAD_LETTER_QUEUE_NAME_SUFFIX;
                break;
            case DEAD_LETTER_QUEUE:
                entityPath += DEAD_LETTER_QUEUE_NAME_SUFFIX;
                break;
            default:
                throw logger.logExceptionAsError(new IllegalArgumentException("Unsupported value of subqueue type: "
                    + subQueue));
        }

        return entityPath;
    }

    /**
     * Builder for creating {@link ServiceBusSenderClient} and {@link ServiceBusSenderAsyncClient} to publish messages
     * to Service Bus.
     *
     * @see ServiceBusSenderAsyncClient
     * @see ServiceBusSenderClient
     */
    @ServiceClientBuilder(serviceClients = {ServiceBusSenderClient.class, ServiceBusSenderAsyncClient.class})
    public final class ServiceBusSenderClientBuilder {
        private String queueName;
        private String topicName;

        private ServiceBusSenderClientBuilder() {
        }

        /**
         * Sets the name of the Service Bus queue to publish messages to.
         *
         * @param queueName Name of the queue.
         *
         * @return The modified {@link ServiceBusSenderClientBuilder} object.
         */
        public ServiceBusSenderClientBuilder queueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        /**
         * Sets the name of the Service Bus topic to publish messages to.
         *
         * @param topicName Name of the topic.
         *
         * @return The modified {@link ServiceBusSenderClientBuilder} object.
         */
        public ServiceBusSenderClientBuilder topicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        /**
         * Creates an <b>asynchronous</b> {@link ServiceBusSenderAsyncClient client} for transmitting {@link
         * ServiceBusMessage} to a Service Bus queue or topic.
         *
         * @return A new {@link ServiceBusSenderAsyncClient} for transmitting to a Service queue or topic.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}.
         * @throws IllegalArgumentException if the entity type is not a queue or a topic.
         */
        public ServiceBusSenderAsyncClient buildAsyncClient() {
            final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
            final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
                queueName);

            final String entityName;
            switch (entityType) {
                case QUEUE:
                    entityName = queueName;
                    break;
                case SUBSCRIPTION:
                    entityName = topicName;
                    break;
                case UNKNOWN:
                    entityName = connectionStringEntityName;
                    break;
                default:
                    throw logger.logExceptionAsError(
                        new IllegalArgumentException("Unknown entity type: " + entityType));
            }

            return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null);
        }

        /**
         * Creates a <b>synchronous</b> {@link ServiceBusSenderClient client} for transmitting {@link ServiceBusMessage}
         * to a Service Bus queue or topic.
         *
         * @return A new {@link ServiceBusSenderAsyncClient} for transmitting to a Service queue or topic.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}
         * @throws IllegalArgumentException if the entity type is not a queue or a topic.
         */
        public ServiceBusSenderClient buildClient() {
            return new ServiceBusSenderClient(buildAsyncClient(), MessageUtils.getTotalTimeout(retryOptions));
        }
    }

    /**
     * Builder for creating {@link ServiceBusProcessorClient} to consume messages from a session-based Service Bus
     * entity. {@link ServiceBusProcessorClient} processes messages and errors via {@link #processMessage(Consumer)}
     * and {@link #processError(Consumer)}. When the processor finishes processing a session, it tries to fetch the
     * next session to process.
     *
     * <p>
     * By default, the processor:
     * <ul>
     *     <li>Automatically settles messages. Disabled via {@link #disableAutoComplete()}</li>
     *     <li>Processes 1 session concurrently. Configured via {@link #maxConcurrentSessions(int)}</li>
     *     <li>Invokes 1 instance of {@link #processMessage(Consumer) processMessage consumer}. Configured via
     *     {@link #maxConcurrentCalls(int)}</li>
     * </ul>
     *
     * <p><strong>Instantiate a session-enabled processor client</strong></p>
     * {@codesnippet com.azure.messaging.servicebus.servicebusprocessorclient#session-instantiation}
     *
     * @see ServiceBusProcessorClient
     */
    public final class ServiceBusSessionProcessorClientBuilder {
        private final ServiceBusProcessorClientOptions processorClientOptions;
        private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder;
        private Consumer<ServiceBusReceivedMessageContext> processMessage;
        private Consumer<ServiceBusErrorContext> processError;

        private ServiceBusSessionProcessorClientBuilder() {
            sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
            processorClientOptions = new ServiceBusProcessorClientOptions()
                .setMaxConcurrentCalls(1)
                .setTracerProvider(tracerProvider);
            sessionReceiverClientBuilder.maxConcurrentSessions(1);
        }

        /**
         * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
         *
         * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
         */
        public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
            if (maxConcurrentSessions < 1) {
                throw logger.logExceptionAsError(
                    new IllegalArgumentException("'maxConcurrentSessions' cannot be less than 1"));
            }
            sessionReceiverClientBuilder.maxConcurrentSessions(maxConcurrentSessions);
            return this;
        }

        /**
         * Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
         * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0.
         *
         * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
         * and before the application starts the processor.
         * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
         * Using a non-zero prefetch risks of losing messages even though it has better performance.
         * @see <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch">Service Bus Prefetch</a>
         *
         * @param prefetchCount The prefetch count.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount) {
            sessionReceiverClientBuilder.prefetchCount(prefetchCount);
            return this;
        }

        /**
         * Sets the name of the queue to create a processor for.
         * @param queueName Name of the queue.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         */
        public ServiceBusSessionProcessorClientBuilder queueName(String queueName) {
            sessionReceiverClientBuilder.queueName(queueName);
            return this;
        }

        /**
         * Sets the receive mode for the processor.
         * @param receiveMode Mode for receiving messages.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         */
        public ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
            sessionReceiverClientBuilder.receiveMode(receiveMode);
            return this;
        }

        /**
         * Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
         * </b>
         * @param subscriptionName Name of the subscription.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         * @see #topicName A topic name should be set as well.
         */
        public ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName) {
            sessionReceiverClientBuilder.subscriptionName(subscriptionName);
            return this;
        }

        /**
         * Sets the name of the topic. <b>{@link #subscriptionName(String)} must also be set.</b>
         * @param topicName Name of the topic.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         * @see #subscriptionName A subscription name should be set as well.
         */
        public ServiceBusSessionProcessorClientBuilder topicName(String topicName) {
            sessionReceiverClientBuilder.topicName(topicName);
            return this;
        }

        /**
         * The message processing callback for the processor that will be executed when a message is received.
         * @param processMessage The message processing consumer that will be executed when a message is received.
         *
         * @return The updated {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusSessionProcessorClientBuilder processMessage(
            Consumer<ServiceBusReceivedMessageContext> processMessage) {
            this.processMessage = processMessage;
            return this;
        }

        /**
         * The error handler for the processor which will be invoked in the event of an error while receiving messages.
         * @param processError The error handler which will be executed when an error occurs.
         *
         * @return The updated {@link ServiceBusProcessorClientBuilder} object
         */
        public ServiceBusSessionProcessorClientBuilder processError(
            Consumer<ServiceBusErrorContext> processError) {
            this.processError = processError;
            return this;
        }

        /**
         * Max concurrent messages that this processor should process.
         *
         * @param maxConcurrentCalls max concurrent messages that this processor should process.
         *
         * @return The updated {@link ServiceBusSessionProcessorClientBuilder} object.
         * @throws IllegalArgumentException if {@code maxConcurrentCalls} is less than 1.
         */
        public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) {
            if (maxConcurrentCalls < 1) {
                throw logger.logExceptionAsError(
                    new IllegalArgumentException("'maxConcurrentCalls' cannot be less than 1"));
            }
            processorClientOptions.setMaxConcurrentCalls(maxConcurrentCalls);
            return this;
        }

        /**
         * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
         * {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
         * the message is processed, it is {@link ServiceBusReceivedMessageContext#abandon()
         * abandoned}.
         *
         * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
         */
        public ServiceBusSessionProcessorClientBuilder disableAutoComplete() {
            sessionReceiverClientBuilder.disableAutoComplete();
            processorClientOptions.setDisableAutoComplete(true);
            return this;
        }

        /**
         * Creates a <b>session-aware</b> Service Bus processor responsible for reading
         * {@link ServiceBusReceivedMessage messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusProcessorClient} that receives messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         * @throws NullPointerException if the {@link #processMessage(Consumer)} or {@link #processError(Consumer)}
         *     callbacks are not set.
         */
        public ServiceBusProcessorClient buildProcessorClient() {
            return new ServiceBusProcessorClient(sessionReceiverClientBuilder,
                Objects.requireNonNull(processMessage, "'processMessage' cannot be null"),
                Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions);
        }
    }

    /**
     * Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume
     * messages from a <b>session aware</b> Service Bus entity.
     *
     * @see ServiceBusReceiverAsyncClient
     * @see ServiceBusReceiverClient
     */
    @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class})
    public final class ServiceBusSessionReceiverClientBuilder {
        private boolean enableAutoComplete = true;
        private Integer maxConcurrentSessions = null;
        private int prefetchCount = DEFAULT_PREFETCH_COUNT;
        private String queueName;
        private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
        private String subscriptionName;
        private String topicName;
        private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;

        private ServiceBusSessionReceiverClientBuilder() {
        }

        /**
         * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
         * {@link ServiceBusReceiverAsyncClient#complete(ServiceBusReceivedMessage) completed}. If an error happens when
         * the message is processed, it is {@link ServiceBusReceiverAsyncClient#abandon(ServiceBusReceivedMessage)
         * abandoned}.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         */
        public ServiceBusSessionReceiverClientBuilder disableAutoComplete() {
            this.enableAutoComplete = false;
            return this;
        }

        /**
         * Sets the amount of time to continue auto-renewing the session lock. Setting {@link Duration#ZERO} or
         * {@code null} disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE}
         * mode, auto-renewal is disabled.
         *
         * @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the session lock.
         * {@link Duration#ZERO} or {@code null} indicates that auto-renewal is disabled.
         *
         * @return The updated {@link ServiceBusSessionReceiverClientBuilder} object.
         * @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
         */
        public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration) {
            validateAndThrow(maxAutoLockRenewDuration);
            this.maxAutoLockRenewDuration = maxAutoLockRenewDuration;
            return this;
        }

        /**
         * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
         *
         * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
         */
        ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
            if (maxConcurrentSessions < 1) {
                throw logger.logExceptionAsError(new IllegalArgumentException(
                    "maxConcurrentSessions cannot be less than 1."));
            }

            this.maxConcurrentSessions = maxConcurrentSessions;
            return this;
        }

        /**
         * Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
         * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
         *
         * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
         * and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
         * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch
         * off.
         *
         * @param prefetchCount The prefetch count.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         * @throws IllegalArgumentException If {code prefetchCount} is negative.
         */
        public ServiceBusSessionReceiverClientBuilder prefetchCount(int prefetchCount) {
            validateAndThrow(prefetchCount);
            this.prefetchCount = prefetchCount;
            return this;
        }

        /**
         * Sets the name of the queue to create a receiver for.
         *
         * @param queueName Name of the queue.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         */
        public ServiceBusSessionReceiverClientBuilder queueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        /**
         * Sets the receive mode for the receiver.
         *
         * @param receiveMode Mode for receiving messages.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         */
        public ServiceBusSessionReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
            this.receiveMode = receiveMode;
            return this;
        }

        /**
         * Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
         * </b>
         *
         * @param subscriptionName Name of the subscription.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         * @see #topicName A topic name should be set as well.
         */
        public ServiceBusSessionReceiverClientBuilder subscriptionName(String subscriptionName) {
            this.subscriptionName = subscriptionName;
            return this;
        }

        /**
         * Sets the name of the topic. <b>{@link #subscriptionName(String)} must also be set.</b>
         *
         * @param topicName Name of the topic.
         *
         * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
         * @see #subscriptionName A subscription name should be set as well.
         */
        public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        /**
         * Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
         * ServiceBusMessage messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusReceiverAsyncClient} that receives messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         */
        ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
            final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
                queueName);
            final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
                SubQueue.NONE);

            if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
                enableAutoComplete = false;
            }

            if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                maxAutoLockRenewDuration = Duration.ZERO;
            }

            final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
            final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
                maxAutoLockRenewDuration, enableAutoComplete, null,
                maxConcurrentSessions);

            final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
                connectionProcessor, tracerProvider, messageSerializer, receiverOptions);

            return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
                entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
        }

        /**
         * Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
         * ServiceBusMessage messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or
         *      subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         */
        public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
            return buildAsyncClient(true);
        }

        /**
         * Creates a <b>synchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
         * ServiceBusMessage messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         */
        public ServiceBusSessionReceiverClient buildClient() {
            return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
                MessageUtils.getTotalTimeout(retryOptions));
        }

        private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
            final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
                queueName);
            final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
                SubQueue.NONE);

            if (!isAutoCompleteAllowed && enableAutoComplete) {
                logger.warning(
                    "'enableAutoComplete' is not supported in synchronous client except through callback receive.");
                enableAutoComplete = false;
            } else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
                enableAutoComplete = false;
            }

            if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                maxAutoLockRenewDuration = Duration.ZERO;
            }

            final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
            final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
                maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);

            return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
                entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
                ServiceBusClientBuilder.this::onClientClose);
        }
    }

    /**
     * Builder for creating {@link ServiceBusProcessorClient} to consume messages from a Service Bus entity.
     * {@link ServiceBusProcessorClient ServiceBusProcessorClients} provides a push-based mechanism that notifies
     * the message processing callback when a message is received or the error handle when an error is observed. To
     * create an instance, therefore, configuring the two callbacks - {@link #processMessage(Consumer)} and
     * {@link #processError(Consumer)} are necessary. By default, a {@link ServiceBusProcessorClient} is configured
     * with auto-completion and auto-lock renewal capabilities.
     *
     * <p><strong>Sample code to instantiate a processor client</strong></p>
     * {@codesnippet com.azure.messaging.servicebus.servicebusprocessorclient#instantiation}
     *
     * @see ServiceBusProcessorClient
     */
    public final class ServiceBusProcessorClientBuilder {
        private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder;
        private final ServiceBusProcessorClientOptions processorClientOptions;
        private Consumer<ServiceBusReceivedMessageContext> processMessage;
        private Consumer<ServiceBusErrorContext> processError;

        private ServiceBusProcessorClientBuilder() {
            serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
            processorClientOptions = new ServiceBusProcessorClientOptions()
                .setMaxConcurrentCalls(1)
                .setTracerProvider(tracerProvider);
        }

        /**
         * Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
         * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
         *
         * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
         * and before the application starts the processor.
         * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
         *
         * @param prefetchCount The prefetch count.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount) {
            serviceBusReceiverClientBuilder.prefetchCount(prefetchCount);
            return this;
        }

        /**
         * Sets the name of the queue to create a processor for.
         * @param queueName Name of the queue.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusProcessorClientBuilder queueName(String queueName) {
            serviceBusReceiverClientBuilder.queueName(queueName);
            return this;
        }

        /**
         * Sets the receive mode for the processor.
         * @param receiveMode Mode for receiving messages.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
            serviceBusReceiverClientBuilder.receiveMode(receiveMode);
            return this;
        }

        /**
         * Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
         * </b>
         * @param subscriptionName Name of the subscription.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         * @see #topicName A topic name should be set as well.
         */
        public ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName) {
            serviceBusReceiverClientBuilder.subscriptionName(subscriptionName);
            return this;
        }

        /**
         * Sets the name of the topic. <b>{@link #subscriptionName(String)} must also be set.</b>
         * @param topicName Name of the topic.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         * @see #subscriptionName A subscription name should be set as well.
         */
        public ServiceBusProcessorClientBuilder topicName(String topicName) {
            serviceBusReceiverClientBuilder.topicName(topicName);
            return this;
        }

        /**
         * The message processing callback for the processor which will be executed when a message is received.
         * @param processMessage The message processing consumer that will be executed when a message is received.
         *
         * @return The updated {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusProcessorClientBuilder processMessage(
            Consumer<ServiceBusReceivedMessageContext> processMessage) {
            this.processMessage = processMessage;
            return this;
        }

        /**
         * The error handler for the processor which will be invoked in the event of an error while receiving messages.
         * @param processError The error handler which will be executed when an error occurs.
         *
         * @return The updated {@link ServiceBusProcessorClientBuilder} object
         */
        public ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError) {
            this.processError = processError;
            return this;
        }

        /**
         * Max concurrent messages that this processor should process. By default, this is set to 1.
         *
         * @param maxConcurrentCalls max concurrent messages that this processor should process.
         * @return The updated {@link ServiceBusProcessorClientBuilder} object.
         * @throws IllegalArgumentException if the {@code maxConcurrentCalls} is set to a value less than 1.
         */
        public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) {
            if (maxConcurrentCalls < 1) {
                throw logger.logExceptionAsError(
                    new IllegalArgumentException("'maxConcurrentCalls' cannot be less than 1"));
            }
            processorClientOptions.setMaxConcurrentCalls(maxConcurrentCalls);
            return this;
        }

        /**
         * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
         * {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
         * the message is processed, it is {@link ServiceBusReceivedMessageContext#abandon()
         * abandoned}.
         *
         * @return The modified {@link ServiceBusProcessorClientBuilder} object.
         */
        public ServiceBusProcessorClientBuilder disableAutoComplete() {
            serviceBusReceiverClientBuilder.disableAutoComplete();
            processorClientOptions.setDisableAutoComplete(true);
            return this;
        }

        /**
         * Creates Service Bus message processor responsible for reading {@link ServiceBusReceivedMessage
         * messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusProcessorClient} that processes messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         * @throws NullPointerException if the {@link #processMessage(Consumer)} or {@link #processError(Consumer)}
         *     callbacks are not set.
         */
        public ServiceBusProcessorClient buildProcessorClient() {
            return new ServiceBusProcessorClient(serviceBusReceiverClientBuilder,
                Objects.requireNonNull(processMessage, "'processMessage' cannot be null"),
                Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions);
        }
    }

    /**
     * Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume
     * messages from Service Bus.
     *
     * @see ServiceBusReceiverAsyncClient
     * @see ServiceBusReceiverClient
     */
    @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class})
    public final class ServiceBusReceiverClientBuilder {
        private boolean enableAutoComplete = true;
        private int prefetchCount = DEFAULT_PREFETCH_COUNT;
        private String queueName;
        private SubQueue subQueue;
        private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
        private String subscriptionName;
        private String topicName;
        private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;

        private ServiceBusReceiverClientBuilder() {
        }

        /**
         * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
         * {@link ServiceBusReceiverAsyncClient#complete(ServiceBusReceivedMessage) completed}. If an error happens when
         * the message is processed, it is {@link ServiceBusReceiverAsyncClient#abandon(ServiceBusReceivedMessage)
         * abandoned}.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         */
        public ServiceBusReceiverClientBuilder disableAutoComplete() {
            this.enableAutoComplete = false;
            return this;
        }

        /**
         * Sets the amount of time to continue auto-renewing the lock. Setting {@link Duration#ZERO} or {@code null}
         * disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode,
         * auto-renewal is disabled.
         *
         * @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the lock. {@link Duration#ZERO}
         * or {@code null} indicates that auto-renewal is disabled.
         *
         * @return The updated {@link ServiceBusReceiverClientBuilder} object.
         * @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
         */
        public ServiceBusReceiverClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration) {
            validateAndThrow(maxAutoLockRenewDuration);
            this.maxAutoLockRenewDuration = maxAutoLockRenewDuration;
            return this;
        }

        /**
         * Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
         * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
         *
         * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
         * and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
         * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch
         * off.
         *
         * @param prefetchCount The prefetch count.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         * @throws IllegalArgumentException If {code prefetchCount} is negative.
         */
        public ServiceBusReceiverClientBuilder prefetchCount(int prefetchCount) {
            validateAndThrow(prefetchCount);
            this.prefetchCount = prefetchCount;
            return this;
        }

        /**
         * Sets the name of the queue to create a receiver for.
         *
         * @param queueName Name of the queue.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         */
        public ServiceBusReceiverClientBuilder queueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        /**
         * Sets the receive mode for the receiver.
         *
         * @param receiveMode Mode for receiving messages.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         */
        public ServiceBusReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
            this.receiveMode = receiveMode;
            return this;
        }

        /**
         * Sets the type of the {@link SubQueue} to connect to.
         *
         * @param subQueue The type of the sub queue.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         * @see #queueName A queuename or #topicName A topic name should be set as well.
         */
        public ServiceBusReceiverClientBuilder subQueue(SubQueue subQueue) {
            this.subQueue = subQueue;
            return this;
        }

        /**
         * Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
         * </b>
         *
         * @param subscriptionName Name of the subscription.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         * @see #topicName A topic name should be set as well.
         */
        public ServiceBusReceiverClientBuilder subscriptionName(String subscriptionName) {
            this.subscriptionName = subscriptionName;
            return this;
        }

        /**
         * Sets the name of the topic. <b>{@link #subscriptionName(String)} must also be set.</b>
         *
         * @param topicName Name of the topic.
         *
         * @return The modified {@link ServiceBusReceiverClientBuilder} object.
         * @see #subscriptionName A subscription name should be set as well.
         */
        public ServiceBusReceiverClientBuilder topicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        /**
         * Creates an <b>asynchronous</b> Service Bus receiver responsible for reading {@link ServiceBusMessage
         * messages} from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusReceiverAsyncClient} that receives messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         */
        public ServiceBusReceiverAsyncClient buildAsyncClient() {
            return buildAsyncClient(true);
        }

        /**
         * Creates <b>synchronous</b> Service Bus receiver responsible for reading {@link ServiceBusMessage messages}
         * from a specific queue or subscription.
         *
         * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or subscription.
         * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
         *     topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
         *     #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
         *     {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
         *     #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
         * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
         *     queueName()} or {@link #topicName(String) topicName()}, respectively.
         */
        public ServiceBusReceiverClient buildClient() {
            return new ServiceBusReceiverClient(buildAsyncClient(false),
                MessageUtils.getTotalTimeout(retryOptions));
        }

        ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
            final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
                queueName);
            final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
                subQueue);

            if (!isAutoCompleteAllowed && enableAutoComplete) {
                logger.warning(
                    "'enableAutoComplete' is not supported in synchronous client except through callback receive.");
                enableAutoComplete = false;
            } else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
                enableAutoComplete = false;
            }

            if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
                maxAutoLockRenewDuration = Duration.ZERO;
            }

            final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
            final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
                maxAutoLockRenewDuration, enableAutoComplete);

            return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
                entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
                tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose);
        }
    }

    private void validateAndThrow(int prefetchCount) {
        if (prefetchCount < 0) {
            throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
                "prefetchCount (%s) cannot be less than 0.", prefetchCount)));
        }
    }

    private void validateAndThrow(Duration maxLockRenewalDuration) {
        if (maxLockRenewalDuration != null && maxLockRenewalDuration.isNegative()) {
            throw logger.logExceptionAsError(new IllegalArgumentException(
                "'maxLockRenewalDuration' cannot be negative."));
        }
    }
}