// SharedTransportClient.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.LifeCycleUtils;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.UserAgentContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * This class uses a shared RntbdTransportClient for multiple Cosmos Clients.
 * The benefit is the underlying connections can be shared if possible across multiple Cosmos client instances.
 * <p>
 * The shared instance is reference counted: every successful call to
 * {@link #getOrCreateInstance} adds one reference and every call to {@link #close()} releases one.
 * The wrapped transport client is closed when the last reference is released.
 * All changes to the shared instance and its reference counter happen while holding the
 * {@code SharedTransportClient.class} monitor.
 */
// We suppress the "try" warning here because close() is declared to throw Exception, which
// admits InterruptedException; AutoCloseable strongly advises against that
// (see: http://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html#close()).
// close() will never throw an InterruptedException but the exception remains in the
// signature for backwards compatibility purposes.
@SuppressWarnings("try")
public class SharedTransportClient extends TransportClient {
    private static final Logger logger = LoggerFactory.getLogger(SharedTransportClient.class);

    // Number of references handed out for the current shared instance.
    private static final AtomicInteger counter = new AtomicInteger(0);

    // The current shared instance, or null when none exists.
    private static SharedTransportClient sharedTransportClient;

    // Options used to build the RNTBD client; null when the HTTPS transport is used.
    private final RntbdTransportClient.Options rntbdOptions;

    // The transport client every request is delegated to.
    private final TransportClient transportClient;

    /**
     * Returns the shared transport client, creating it first if no shared instance currently exists.
     * <p>
     * {@code protocol}, {@code configs}, {@code connectionPolicy}, {@code userAgent} and
     * {@code addressResolver} are only used when a new shared instance has to be created; when an
     * instance already exists it is returned as is. Each successful call adds one reference that
     * should be released with {@link #close()}.
     *
     * @param protocol the transport protocol, {@code Protocol.TCP} or {@code Protocol.HTTPS}
     * @param configs the configuration used to build the underlying transport client
     * @param connectionPolicy the connection policy used to build the underlying transport client
     * @param userAgent the user agent used to build the underlying transport client
     * @param diagnosticsClientConfig receives the RNTBD options of the shared instance
     * @param addressResolver the address resolver handed to the RNTBD transport client
     * @return the shared transport client
     * @throws IllegalArgumentException if a new instance must be created and {@code protocol}
     *         is neither {@code Protocol.TCP} nor {@code Protocol.HTTPS}
     */
    public static TransportClient getOrCreateInstance(
        Protocol protocol,
        Configs configs,
        ConnectionPolicy connectionPolicy,
        UserAgentContainer userAgent,
        DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig,
        IAddressResolver addressResolver) {

        synchronized (SharedTransportClient.class) {
            if (sharedTransportClient == null) {
                assert counter.get() == 0;
                logger.info("creating a new shared RntbdTransportClient");
                sharedTransportClient = new SharedTransportClient(protocol, configs, connectionPolicy, userAgent, addressResolver);
            } else {
                logger.info("Reusing an instance of RntbdTransportClient");
            }

            diagnosticsClientConfig.withRntbdOptions(sharedTransportClient.rntbdOptions);

            // Count the reference last, once nothing else in this method can throw, so that a
            // failure above never records a reference that no caller actually holds.
            counter.incrementAndGet();
            return sharedTransportClient;
        }
    }

    /**
     * Builds the wrapped transport client for the given protocol.
     *
     * @throws IllegalArgumentException if {@code protocol} is neither TCP nor HTTPS
     */
    private SharedTransportClient(
        Protocol protocol,
        Configs configs,
        ConnectionPolicy connectionPolicy,
        UserAgentContainer userAgent,
        IAddressResolver addressResolver) {
        if (protocol == Protocol.TCP) {
            this.rntbdOptions =
                new RntbdTransportClient.Options.Builder(connectionPolicy).userAgent(userAgent).build();
            this.transportClient = new RntbdTransportClient(this.rntbdOptions, configs.getSslContext(), addressResolver);
        } else if (protocol == Protocol.HTTPS) {
            this.rntbdOptions = null;
            this.transportClient = new HttpTransportClient(configs, connectionPolicy, userAgent);
        } else {
            throw new IllegalArgumentException(String.format("protocol: %s", protocol));
        }
    }

    /**
     * Delegates the request to the wrapped transport client.
     *
     * @param physicalAddress the address the request is sent to
     * @param request the request to send
     * @return the response publisher produced by the wrapped transport client
     */
    @Override
    protected Mono<StoreResponse> invokeStoreAsync(Uri physicalAddress, RxDocumentServiceRequest request) {
        return transportClient.invokeStoreAsync(physicalAddress, request);
    }

    /**
     * Returns the current value of the shared reference counter.
     *
     * @return the number of references currently counted for the shared transport client
     */
    public int getReferenceCounter() {
        return counter.get();
    }

    /**
     * Releases one reference to the shared transport client and closes the wrapped transport
     * client when the last reference is released.
     * <p>
     * A call on an instance that is no longer the current shared instance, or while no reference
     * is outstanding, is ignored. This keeps a surplus {@code close()} from driving the counter
     * negative or from closing a shared instance that was created later.
     */
    @Override
    public void close() throws Exception {
        synchronized (SharedTransportClient.class) {
            if (this != sharedTransportClient || counter.get() <= 0) {
                logger.warn("Ignoring close() on a SharedTransportClient that holds no active references");
                return;
            }

            final int numberOfActiveTransportClients = counter.decrementAndGet();
            logger.info("closing one reference to the shared RntbdTransportClient, the number of remaining references is {}", numberOfActiveTransportClients);
            if (numberOfActiveTransportClients == 0) {
                logger.info("All references to shared RntbdTransportClient are closed. Closing the underlying RntbdTransportClient");
                // The guard above established that this instance is the current shared one.
                LifeCycleUtils.closeQuietly(this.transportClient);
                sharedTransportClient = null;
            }
        }
    }
}