RxDocumentClientImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.SimpleTokenCache;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.ApiType;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
import com.azure.cosmos.implementation.caches.SizeLimitingLRUCache;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.cpu.CpuMemoryListener;
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.GlobalAddressResolver;
import com.azure.cosmos.implementation.directconnectivity.ServerStoreModel;
import com.azure.cosmos.implementation.directconnectivity.StoreClient;
import com.azure.cosmos.implementation.directconnectivity.StoreClientFactory;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.SharedGatewayHttpClient;
import com.azure.cosmos.implementation.patch.PatchUtil;
import com.azure.cosmos.implementation.query.DocumentQueryExecutionContextFactory;
import com.azure.cosmos.implementation.query.IDocumentQueryClient;
import com.azure.cosmos.implementation.query.IDocumentQueryExecutionContext;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.query.PartitionedQueryExecutionInfo;
import com.azure.cosmos.implementation.query.PipelinedDocumentQueryExecutionContext;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyAndResourceTokenPair;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.spark.OperationContext;
import com.azure.cosmos.implementation.spark.OperationListener;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.models.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.BridgeInternal.documentFromObject;
import static com.azure.cosmos.BridgeInternal.getAltLink;
import static com.azure.cosmos.BridgeInternal.toFeedResponsePage;
import static com.azure.cosmos.BridgeInternal.toResourceResponse;
import static com.azure.cosmos.BridgeInternal.toStoredProcedureResponse;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.models.ModelBridgeInternal.serializeJsonToByteBuffer;

/**
 * While this class is public, it is not part of our published public APIs.
 * This is meant to be internally used only by our sdk.
 */
public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorizationTokenProvider, CpuMemoryListener,
    DiagnosticsClientContext {
    private static final AtomicInteger activeClientsCnt = new AtomicInteger(0);
    private static final AtomicInteger clientIdGenerator = new AtomicInteger(0);
    private static final Range<String> RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES = new Range<>(
        PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey,
        PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey, true, false);

    private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " +
        "ParallelDocumentQueryExecutioncontext, but not used";
    private final static ObjectMapper mapper = Utils.getSimpleObjectMapper();
    private final ItemDeserializer itemDeserializer = new ItemDeserializer.JsonDeserializer();
    private final Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
    private final String masterKeyOrResourceToken;
    private final URI serviceEndpoint;
    private final ConnectionPolicy connectionPolicy;
    private final ConsistencyLevel consistencyLevel;
    private final BaseAuthorizationTokenProvider authorizationTokenProvider;
    private final UserAgentContainer userAgentContainer;
    private final boolean hasAuthKeyResourceToken;
    private final Configs configs;
    private final boolean connectionSharingAcrossClientsEnabled;
    private AzureKeyCredential credential;
    private final TokenCredential tokenCredential;
    private String[] tokenCredentialScopes;
    private SimpleTokenCache tokenCredentialCache;
    private CosmosAuthorizationTokenResolver cosmosAuthorizationTokenResolver;
    AuthorizationTokenType authorizationTokenType;
    private SessionContainer sessionContainer;
    private String firstResourceTokenFromPermissionFeed = StringUtils.EMPTY;
    private RxClientCollectionCache collectionCache;
    private RxStoreModel gatewayProxy;
    private RxStoreModel storeModel;
    private GlobalAddressResolver addressResolver;
    private RxPartitionKeyRangeCache partitionKeyRangeCache;
    private Map<String, List<PartitionKeyAndResourceTokenPair>> resourceTokensMap;
    private final boolean contentResponseOnWriteEnabled;
    private Map<String, PartitionedQueryExecutionInfo> queryPlanCache;

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int clientId;
    private ClientTelemetry clientTelemetry;
    private ApiType apiType;

    // RetryPolicy retries a request when it encounters session unavailable (see ClientRetryPolicy).
    // Once it exhausts all write regions it clears the session container, then it uses RxClientCollectionCache
    // to resolves the request's collection name. If it differs from the session container's resource id it
    // explains the session unavailable exception: somebody removed and recreated the collection. In this
    // case we retry once again (with empty session token) otherwise we return the error to the client
    // (see RenameCollectionAwareClientRetryPolicy)
    private IRetryPolicyFactory resetSessionTokenRetryPolicy;
    /**
     * Compatibility mode: Allows to specify compatibility mode used by client when
     * making query requests. Should be removed when application/sql is no longer
     * supported.
     */
    private final QueryCompatibilityMode queryCompatibilityMode = QueryCompatibilityMode.Default;
    private final GlobalEndpointManager globalEndpointManager;
    private final RetryPolicy retryPolicy;
    private HttpClient reactorHttpClient;
    private Function<HttpClient, HttpClient> httpClientInterceptor;
    private volatile boolean useMultipleWriteLocations;

    // creator of TransportClient is responsible for disposing it.
    private StoreClientFactory storeClientFactory;

    private GatewayServiceConfigurationReader gatewayConfigurationReader;
    private final DiagnosticsClientConfig diagnosticsClientConfig;

    private final AtomicBoolean throughputControlEnabled;
    private ThroughputControlStore throughputControlStore;

    public RxDocumentClientImpl(URI serviceEndpoint,
                                String masterKeyOrResourceToken,
                                List<Permission> permissionFeed,
                                ConnectionPolicy connectionPolicy,
                                ConsistencyLevel consistencyLevel,
                                Configs configs,
                                CosmosAuthorizationTokenResolver cosmosAuthorizationTokenResolver,
                                AzureKeyCredential credential,
                                boolean sessionCapturingOverride,
                                boolean connectionSharingAcrossClientsEnabled,
                                boolean contentResponseOnWriteEnabled,
                                CosmosClientMetadataCachesSnapshot metadataCachesSnapshot,
                                ApiType apiType) {
        this(serviceEndpoint, masterKeyOrResourceToken, permissionFeed, connectionPolicy, consistencyLevel, configs,
            credential, null, sessionCapturingOverride, connectionSharingAcrossClientsEnabled, contentResponseOnWriteEnabled, metadataCachesSnapshot, apiType);
        this.cosmosAuthorizationTokenResolver = cosmosAuthorizationTokenResolver;
    }

    public RxDocumentClientImpl(URI serviceEndpoint,
                                String masterKeyOrResourceToken,
                                List<Permission> permissionFeed,
                                ConnectionPolicy connectionPolicy,
                                ConsistencyLevel consistencyLevel,
                                Configs configs,
                                CosmosAuthorizationTokenResolver cosmosAuthorizationTokenResolver,
                                AzureKeyCredential credential,
                                TokenCredential tokenCredential,
                                boolean sessionCapturingOverride,
                                boolean connectionSharingAcrossClientsEnabled,
                                boolean contentResponseOnWriteEnabled,
                                CosmosClientMetadataCachesSnapshot metadataCachesSnapshot,
                                ApiType apiType) {
        this(serviceEndpoint, masterKeyOrResourceToken, permissionFeed, connectionPolicy, consistencyLevel, configs,
            credential, tokenCredential, sessionCapturingOverride, connectionSharingAcrossClientsEnabled, contentResponseOnWriteEnabled, metadataCachesSnapshot, apiType);
        this.cosmosAuthorizationTokenResolver = cosmosAuthorizationTokenResolver;
    }

    private RxDocumentClientImpl(URI serviceEndpoint,
                                String masterKeyOrResourceToken,
                                List<Permission> permissionFeed,
                                ConnectionPolicy connectionPolicy,
                                ConsistencyLevel consistencyLevel,
                                Configs configs,
                                AzureKeyCredential credential,
                                TokenCredential tokenCredential,
                                boolean sessionCapturingOverrideEnabled,
                                boolean connectionSharingAcrossClientsEnabled,
                                boolean contentResponseOnWriteEnabled,
                                CosmosClientMetadataCachesSnapshot metadataCachesSnapshot,
                                ApiType apiType) {
        this(serviceEndpoint, masterKeyOrResourceToken, connectionPolicy, consistencyLevel, configs,
            credential, tokenCredential, sessionCapturingOverrideEnabled, connectionSharingAcrossClientsEnabled, contentResponseOnWriteEnabled, metadataCachesSnapshot, apiType);

        if (permissionFeed != null && permissionFeed.size() > 0) {
            this.resourceTokensMap = new HashMap<>();
            for (Permission permission : permissionFeed) {
                String[] segments = StringUtils.split(permission.getResourceLink(),
                        Constants.Properties.PATH_SEPARATOR.charAt(0));

                if (segments.length <= 0) {
                    throw new IllegalArgumentException("resourceLink");
                }

                List<PartitionKeyAndResourceTokenPair> partitionKeyAndResourceTokenPairs = null;
                PathInfo pathInfo = new PathInfo(false, StringUtils.EMPTY, StringUtils.EMPTY, false);
                if (!PathsHelper.tryParsePathSegments(permission.getResourceLink(), pathInfo, null)) {
                    throw new IllegalArgumentException(permission.getResourceLink());
                }

                partitionKeyAndResourceTokenPairs = resourceTokensMap.get(pathInfo.resourceIdOrFullName);
                if (partitionKeyAndResourceTokenPairs == null) {
                    partitionKeyAndResourceTokenPairs = new ArrayList<>();
                    this.resourceTokensMap.put(pathInfo.resourceIdOrFullName, partitionKeyAndResourceTokenPairs);
                }

                PartitionKey partitionKey = permission.getResourcePartitionKey();
                partitionKeyAndResourceTokenPairs.add(new PartitionKeyAndResourceTokenPair(
                        partitionKey != null ? BridgeInternal.getPartitionKeyInternal(partitionKey) : PartitionKeyInternal.Empty,
                        permission.getToken()));
                logger.debug("Initializing resource token map  , with map key [{}] , partition key [{}] and resource token [{}]",
                        pathInfo.resourceIdOrFullName, partitionKey != null ? partitionKey.toString() : null, permission.getToken());

            }

            if(this.resourceTokensMap.isEmpty()) {
                throw new IllegalArgumentException("permissionFeed");
            }

            String firstToken = permissionFeed.get(0).getToken();
            if(ResourceTokenAuthorizationHelper.isResourceToken(firstToken)) {
                this.firstResourceTokenFromPermissionFeed = firstToken;
            }
        }
    }

    RxDocumentClientImpl(URI serviceEndpoint,
                         String masterKeyOrResourceToken,
                         ConnectionPolicy connectionPolicy,
                         ConsistencyLevel consistencyLevel,
                         Configs configs,
                         AzureKeyCredential credential,
                         TokenCredential tokenCredential,
                         boolean sessionCapturingOverrideEnabled,
                         boolean connectionSharingAcrossClientsEnabled,
                         boolean contentResponseOnWriteEnabled,
                         CosmosClientMetadataCachesSnapshot metadataCachesSnapshot,
                         ApiType apiType) {

        activeClientsCnt.incrementAndGet();
        this.clientId = clientIdGenerator.getAndDecrement();
        this.diagnosticsClientConfig = new DiagnosticsClientConfig();
        this.diagnosticsClientConfig.withClientId(this.clientId);
        this.diagnosticsClientConfig.withActiveClientCounter(activeClientsCnt);

        this.diagnosticsClientConfig.withConnectionSharingAcrossClientsEnabled(connectionSharingAcrossClientsEnabled);
        this.diagnosticsClientConfig.withConsistency(consistencyLevel);
        this.throughputControlEnabled = new AtomicBoolean(false);

        logger.info(
            "Initializing DocumentClient [{}] with"
                + " serviceEndpoint [{}], connectionPolicy [{}], consistencyLevel [{}], directModeProtocol [{}]",
            this.clientId, serviceEndpoint, connectionPolicy, consistencyLevel, configs.getProtocol());

        try {
            this.connectionSharingAcrossClientsEnabled = connectionSharingAcrossClientsEnabled;
            this.configs = configs;
            this.masterKeyOrResourceToken = masterKeyOrResourceToken;
            this.serviceEndpoint = serviceEndpoint;
            this.credential = credential;
            this.tokenCredential = tokenCredential;
            this.contentResponseOnWriteEnabled = contentResponseOnWriteEnabled;
            this.authorizationTokenType = AuthorizationTokenType.Invalid;

            if (this.credential != null) {
                hasAuthKeyResourceToken = false;
                this.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey;
                this.authorizationTokenProvider = new BaseAuthorizationTokenProvider(this.credential);
            } else if (masterKeyOrResourceToken != null && ResourceTokenAuthorizationHelper.isResourceToken(masterKeyOrResourceToken)) {
                this.authorizationTokenProvider = null;
                hasAuthKeyResourceToken = true;
                this.authorizationTokenType = AuthorizationTokenType.ResourceToken;
            } else if(masterKeyOrResourceToken != null && !ResourceTokenAuthorizationHelper.isResourceToken(masterKeyOrResourceToken)) {
                this.credential = new AzureKeyCredential(this.masterKeyOrResourceToken);
                hasAuthKeyResourceToken = false;
                this.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey;
                this.authorizationTokenProvider = new BaseAuthorizationTokenProvider(this.credential);
            } else {
                hasAuthKeyResourceToken = false;
                this.authorizationTokenProvider = null;
                if (tokenCredential != null) {
                    this.tokenCredentialScopes = new String[] {
//                    AadTokenAuthorizationHelper.AAD_AUTH_TOKEN_COSMOS_WINDOWS_SCOPE,
                        serviceEndpoint.getScheme() + "://" + serviceEndpoint.getHost() + "/.default"
                    };
                    this.tokenCredentialCache = new SimpleTokenCache(() -> this.tokenCredential
                        .getToken(new TokenRequestContext().addScopes(this.tokenCredentialScopes)));
                    this.authorizationTokenType = AuthorizationTokenType.AadToken;
                }
            }

            if (connectionPolicy != null) {
                this.connectionPolicy = connectionPolicy;
            } else {
                this.connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
            }

            this.diagnosticsClientConfig.withConnectionMode(this.getConnectionPolicy().getConnectionMode());
            this.diagnosticsClientConfig.withMultipleWriteRegionsEnabled(this.connectionPolicy.isMultipleWriteRegionsEnabled());
            this.diagnosticsClientConfig.withEndpointDiscoveryEnabled(this.connectionPolicy.isEndpointDiscoveryEnabled());
            this.diagnosticsClientConfig.withPreferredRegions(this.connectionPolicy.getPreferredRegions());

            boolean disableSessionCapturing = (ConsistencyLevel.SESSION != consistencyLevel && !sessionCapturingOverrideEnabled);

            this.sessionContainer = new SessionContainer(this.serviceEndpoint.getHost(), disableSessionCapturing);
            this.consistencyLevel = consistencyLevel;

            this.userAgentContainer = new UserAgentContainer();

            String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
            if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
                userAgentContainer.setSuffix(userAgentSuffix);
            }

            this.httpClientInterceptor = null;
            this.reactorHttpClient = httpClient();

            this.globalEndpointManager = new GlobalEndpointManager(asDatabaseAccountManagerInternal(), this.connectionPolicy, /**/configs);
            this.retryPolicy = new RetryPolicy(this, this.globalEndpointManager, this.connectionPolicy);
            this.resetSessionTokenRetryPolicy = retryPolicy;
            CpuMemoryMonitor.register(this);
            this.queryPlanCache = Collections.synchronizedMap(new SizeLimitingLRUCache(Constants.QUERYPLAN_CACHE_SIZE));
            this.apiType = apiType;
        } catch (RuntimeException e) {
            logger.error("unexpected failure in initializing client.", e);
            close();
            throw e;
        }
    }

    @Override
    public DiagnosticsClientConfig getConfig() {
        return diagnosticsClientConfig;
    }

    @Override
    public CosmosDiagnostics createDiagnostics() {
       return BridgeInternal.createCosmosDiagnostics(this, this.globalEndpointManager);
    }

    private void initializeGatewayConfigurationReader() {
        this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.globalEndpointManager);
        DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount();
        //Database account should not be null here,
        // this.globalEndpointManager.init() must have been already called
        // hence asserting it
        if (databaseAccount == null) {
            logger.error("Client initialization failed."
                + " Check if the endpoint is reachable and if your auth token is valid. More info: https://aka.ms/cosmosdb-tsg-service-unavailable-java");
            throw new RuntimeException("Client initialization failed."
                + " Check if the endpoint is reachable and if your auth token is valid. More info: https://aka.ms/cosmosdb-tsg-service-unavailable-java");
        }

        this.useMultipleWriteLocations = this.connectionPolicy.isMultipleWriteRegionsEnabled() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount);

        // TODO: add support for openAsync
        // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
    }

    private void updateGatewayProxy() {
        ((RxGatewayStoreModel)this.gatewayProxy).setGatewayServiceConfigurationReader(this.gatewayConfigurationReader);
        ((RxGatewayStoreModel)this.gatewayProxy).setCollectionCache(this.collectionCache);
        ((RxGatewayStoreModel)this.gatewayProxy).setPartitionKeyRangeCache(this.partitionKeyRangeCache);
        ((RxGatewayStoreModel)this.gatewayProxy).setUseMultipleWriteLocations(this.useMultipleWriteLocations);
    }

    public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Function<HttpClient, HttpClient> httpClientInterceptor) {
        try {
            // TODO: add support for openAsync
            // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589

            this.httpClientInterceptor = httpClientInterceptor;
            if (httpClientInterceptor != null) {
                this.reactorHttpClient = httpClientInterceptor.apply(httpClient());
            }

            this.gatewayProxy = createRxGatewayProxy(this.sessionContainer,
                this.consistencyLevel,
                this.queryCompatibilityMode,
                this.userAgentContainer,
                this.globalEndpointManager,
                this.reactorHttpClient,
                this.apiType);
            this.globalEndpointManager.init();
            this.initializeGatewayConfigurationReader();

            if (metadataCachesSnapshot != null) {
                this.collectionCache = new RxClientCollectionCache(this,
                    this.sessionContainer,
                    this.gatewayProxy,
                    this,
                    this.retryPolicy,
                    metadataCachesSnapshot.getCollectionInfoByNameCache(),
                    metadataCachesSnapshot.getCollectionInfoByIdCache()
                );
            } else {
                this.collectionCache = new RxClientCollectionCache(this,
                    this.sessionContainer,
                    this.gatewayProxy,
                    this,
                    this.retryPolicy);
            }
            this.resetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

            this.partitionKeyRangeCache = new RxPartitionKeyRangeCache(RxDocumentClientImpl.this,
                collectionCache);

            updateGatewayProxy();
            if (this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY) {
                this.storeModel = this.gatewayProxy;
            } else {
                this.initializeDirectConnectivity();
            }
            clientTelemetry = new ClientTelemetry(null, UUID.randomUUID().toString(),
                ManagementFactory.getRuntimeMXBean().getName(), userAgentContainer.getUserAgent(),
                connectionPolicy.getConnectionMode(), globalEndpointManager.getLatestDatabaseAccount().getId(),
                null, null, this.reactorHttpClient, connectionPolicy.isClientTelemetryEnabled(), this, this.connectionPolicy.getPreferredRegions());
            clientTelemetry.init();
            this.queryPlanCache = new ConcurrentHashMap<>();
            this.retryPolicy.setRxCollectionCache(this.collectionCache);
        } catch (Exception e) {
            logger.error("unexpected failure in initializing client.", e);
            close();
            throw e;
        }
    }

    public void serialize(CosmosClientMetadataCachesSnapshot state) {
        RxCollectionCache.serialize(state, this.collectionCache);
    }

    private void initializeDirectConnectivity() {

        this.addressResolver = new GlobalAddressResolver(this,
            this.reactorHttpClient,
            this.globalEndpointManager,
            this.configs.getProtocol(),
            this,
            this.collectionCache,
            this.partitionKeyRangeCache,
            userAgentContainer,
            // TODO: GATEWAY Configuration Reader
            //     this.gatewayConfigurationReader,
            null,
            this.connectionPolicy,
            this.apiType);

        this.storeClientFactory = new StoreClientFactory(
            this.addressResolver,
            this.diagnosticsClientConfig,
            this.configs,
            this.connectionPolicy,
            // this.maxConcurrentConnectionOpenRequests,
            this.userAgentContainer,
            this.connectionSharingAcrossClientsEnabled
        );

        this.createStoreModel(true);
    }

    DatabaseAccountManagerInternal asDatabaseAccountManagerInternal() {
        return new DatabaseAccountManagerInternal() {

            @Override
            public URI getServiceEndpoint() {
                return RxDocumentClientImpl.this.getServiceEndpoint();
            }

            @Override
            public Flux<DatabaseAccount> getDatabaseAccountFromEndpoint(URI endpoint) {
                logger.info("Getting database account endpoint from {}", endpoint);
                return RxDocumentClientImpl.this.getDatabaseAccountFromEndpoint(endpoint);
            }

            @Override
            public ConnectionPolicy getConnectionPolicy() {
                return RxDocumentClientImpl.this.getConnectionPolicy();
            }
        };
    }

    RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer,
                                             ConsistencyLevel consistencyLevel,
                                             QueryCompatibilityMode queryCompatibilityMode,
                                             UserAgentContainer userAgentContainer,
                                             GlobalEndpointManager globalEndpointManager,
                                             HttpClient httpClient,
                                             ApiType apiType) {
        return new RxGatewayStoreModel(
                this,
                sessionContainer,
                consistencyLevel,
                queryCompatibilityMode,
                userAgentContainer,
                globalEndpointManager,
                httpClient,
                apiType);
    }

    private HttpClient httpClient() {
        HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs)
            .withMaxIdleConnectionTimeout(this.connectionPolicy.getIdleHttpConnectionTimeout())
            .withPoolSize(this.connectionPolicy.getMaxConnectionPoolSize())
            .withProxy(this.connectionPolicy.getProxy())
            .withNetworkRequestTimeout(this.connectionPolicy.getHttpNetworkRequestTimeout());

        if (connectionSharingAcrossClientsEnabled) {
            return SharedGatewayHttpClient.getOrCreateInstance(httpClientConfig, diagnosticsClientConfig);
        } else {
            diagnosticsClientConfig.withGatewayHttpClientConfig(httpClientConfig);
            return HttpClient.createFixed(httpClientConfig);
        }
    }

    private void createStoreModel(boolean subscribeRntbdStatus) {
        // EnableReadRequestsFallback, if not explicitly set on the connection policy,
        // is false if the account's consistency is bounded staleness,
        // and true otherwise.

        StoreClient storeClient = this.storeClientFactory.createStoreClient(this,
                this.addressResolver,
                this.sessionContainer,
                this.gatewayConfigurationReader,
                this,
                this.useMultipleWriteLocations
        );

        this.storeModel = new ServerStoreModel(storeClient);
    }


    @Override
    public URI getServiceEndpoint() {
        return this.serviceEndpoint;
    }

    @Override
    public URI getWriteEndpoint() {
        return globalEndpointManager.getWriteEndpoints().stream().findFirst().orElse(null);
    }

    @Override
    public URI getReadEndpoint() {
        return globalEndpointManager.getReadEndpoints().stream().findFirst().orElse(null);
    }

    @Override
    public ConnectionPolicy getConnectionPolicy() {
        return this.connectionPolicy;
    }

    @Override
    public boolean isContentResponseOnWriteEnabled() {
        return contentResponseOnWriteEnabled;
    }

    @Override
    public ConsistencyLevel getConsistencyLevel() {
        return consistencyLevel;
    }

    @Override
    public ClientTelemetry getClientTelemetry() {
        return this.clientTelemetry;
    }

    @Override
    public Mono<ResourceResponse<Database>> createDatabase(Database database, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createDatabaseInternal(database, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Database>> createDatabaseInternal(Database database, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {

            if (database == null) {
                throw new IllegalArgumentException("Database");
            }

            logger.debug("Creating a Database. id: [{}]", database.getId());
            validateResource(database);

            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Database, OperationType.Create);
            Instant serializationStartTimeUTC = Instant.now();
            ByteBuffer byteBuffer = ModelBridgeInternal.serializeJsonToByteBuffer(database);
            Instant serializationEndTimeUTC = Instant.now();
            SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
                serializationStartTimeUTC,
                serializationEndTimeUTC,
                SerializationDiagnosticsContext.SerializationType.DATABASE_SERIALIZATION);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Create, ResourceType.Database, Paths.DATABASES_ROOT, byteBuffer, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
            if (serializationDiagnosticsContext != null) {
                serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
            }

            return this.create(request, retryPolicyInstance, getOperationContextAndListenerTuple(options))
                       .map(response -> toResourceResponse(response, Database.class));
        } catch (Exception e) {
            logger.debug("Failure in creating a database. due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Database>> deleteDatabase(String databaseLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDatabaseInternal(databaseLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Database>> deleteDatabaseInternal(String databaseLink, RequestOptions options,
                                                                    DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(databaseLink)) {
                throw new IllegalArgumentException("databaseLink");
            }

            logger.debug("Deleting a Database. databaseLink: [{}]", databaseLink);
            String path = Utils.joinPath(databaseLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Database, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.Database, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Database.class));
        } catch (Exception e) {
            logger.debug("Failure in deleting a database. due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Database>> readDatabase(String databaseLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readDatabaseInternal(databaseLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Database>> readDatabaseInternal(String databaseLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(databaseLink)) {
                throw new IllegalArgumentException("databaseLink");
            }

            logger.debug("Reading a Database. databaseLink: [{}]", databaseLink);
            String path = Utils.joinPath(databaseLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Database, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Database, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }
            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, Database.class));
        } catch (Exception e) {
            logger.debug("Failure in reading a database. due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Database>> readDatabases(CosmosQueryRequestOptions options) {
        return readFeed(options, ResourceType.Database, Database.class, Paths.DATABASES_ROOT);
    }

    private String parentResourceLinkToQueryLink(String parentResourceLink, ResourceType resourceTypeEnum) {
        switch (resourceTypeEnum) {
            case Database:
                return Paths.DATABASES_ROOT;

            case DocumentCollection:
                return Utils.joinPath(parentResourceLink, Paths.COLLECTIONS_PATH_SEGMENT);

            case Document:
                return Utils.joinPath(parentResourceLink, Paths.DOCUMENTS_PATH_SEGMENT);

            case Offer:
                return Paths.OFFERS_ROOT;

            case User:
                return Utils.joinPath(parentResourceLink, Paths.USERS_PATH_SEGMENT);

            case ClientEncryptionKey:
                return Utils.joinPath(parentResourceLink, Paths.CLIENT_ENCRYPTION_KEY_PATH_SEGMENT);

            case Permission:
                return Utils.joinPath(parentResourceLink, Paths.PERMISSIONS_PATH_SEGMENT);

            case Attachment:
                return Utils.joinPath(parentResourceLink, Paths.ATTACHMENTS_PATH_SEGMENT);

            case StoredProcedure:
                return Utils.joinPath(parentResourceLink, Paths.STORED_PROCEDURES_PATH_SEGMENT);

            case Trigger:
                return Utils.joinPath(parentResourceLink, Paths.TRIGGERS_PATH_SEGMENT);

            case UserDefinedFunction:
                return Utils.joinPath(parentResourceLink, Paths.USER_DEFINED_FUNCTIONS_PATH_SEGMENT);

            case Conflict:
                return Utils.joinPath(parentResourceLink, Paths.CONFLICTS_PATH_SEGMENT);

            default:
                throw new IllegalArgumentException("resource type not supported");
        }
    }

    private OperationContextAndListenerTuple getOperationContextAndListenerTuple(CosmosQueryRequestOptions options) {
        if (options == null) {
            return null;
        }
        return ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.getCosmosQueryRequestOptionsAccessor().getOperationContext(options);
    }

    private OperationContextAndListenerTuple getOperationContextAndListenerTuple(RequestOptions options) {
        if (options == null) {
            return null;
        }
        return options.getOperationContextAndListenerTuple();
    }

    private <T extends Resource> Flux<FeedResponse<T>> createQuery(
        String parentResourceLink,
        SqlQuerySpec sqlQuery,
        CosmosQueryRequestOptions options,
        Class<T> klass,
        ResourceType resourceTypeEnum) {

        String resourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum);
        UUID activityId = Utils.randomUUID();
        IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this,
            getOperationContextAndListenerTuple(options));

        // Trying to put this logic as low as the query pipeline
        // Since for parallelQuery, each partition will have its own request, so at this point, there will be no request associate with this retry policy.
        // For default document context, it already wired up InvalidPartitionExceptionRetry, but there is no harm to wire it again here
        InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
            this.collectionCache,
            null,
            resourceLink,
            ModelBridgeInternal.getPropertiesFromQueryRequestOptions(options));

        return ObservableHelper.fluxInlineIfPossibleAsObs(
            () -> createQueryInternal(resourceLink, sqlQuery, options, klass, resourceTypeEnum, queryClient, activityId),
            invalidPartitionExceptionRetryPolicy);
    }

    private <T extends Resource> Flux<FeedResponse<T>> createQueryInternal(
            String resourceLink,
            SqlQuerySpec sqlQuery,
            CosmosQueryRequestOptions options,
            Class<T> klass,
            ResourceType resourceTypeEnum,
            IDocumentQueryClient queryClient,
            UUID activityId) {

        Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
            DocumentQueryExecutionContextFactory
                .createDocumentQueryExecutionContextAsync(this, queryClient, resourceTypeEnum, klass, sqlQuery,
                                                          options, resourceLink, false, activityId,
                                                          Configs.isQueryPlanCachingEnabled(), queryPlanCache);

        AtomicBoolean isFirstResponse = new AtomicBoolean(true);
        return executionContext.flatMap(iDocumentQueryExecutionContext -> {
            QueryInfo queryInfo = null;
            if (iDocumentQueryExecutionContext instanceof PipelinedDocumentQueryExecutionContext) {
                queryInfo = ((PipelinedDocumentQueryExecutionContext<T>) iDocumentQueryExecutionContext).getQueryInfo();
            }

            QueryInfo finalQueryInfo = queryInfo;
            return iDocumentQueryExecutionContext.executeAsync()
                .map(tFeedResponse -> {
                    if (finalQueryInfo != null) {
                        if (finalQueryInfo.hasSelectValue()) {
                            ModelBridgeInternal
                                .addQueryInfoToFeedResponse(tFeedResponse, finalQueryInfo);
                        }

                        if (isFirstResponse.compareAndSet(true, false)) {
                            ModelBridgeInternal.addQueryPlanDiagnosticsContextToFeedResponse(tFeedResponse,
                                finalQueryInfo.getQueryPlanDiagnosticsContext());
                        }
                    }
                    return tFeedResponse;
                });
        });
    }

    @Override
    public Flux<FeedResponse<Database>> queryDatabases(String query, CosmosQueryRequestOptions options) {
        return queryDatabases(new SqlQuerySpec(query), options);
    }


    @Override
    public Flux<FeedResponse<Database>> queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(Paths.DATABASES_ROOT, querySpec, options, Database.class, ResourceType.Database);
    }

    @Override
    public Mono<ResourceResponse<DocumentCollection>> createCollection(String databaseLink,
                                                                       DocumentCollection collection, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> this.createCollectionInternal(databaseLink, collection, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<DocumentCollection>> createCollectionInternal(String databaseLink,
                                                                                DocumentCollection collection, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(databaseLink)) {
                throw new IllegalArgumentException("databaseLink");
            }
            if (collection == null) {
                throw new IllegalArgumentException("collection");
            }

            logger.debug("Creating a Collection. databaseLink: [{}], Collection id: [{}]", databaseLink,
                collection.getId());
            validateResource(collection);

            String path = Utils.joinPath(databaseLink, Paths.COLLECTIONS_PATH_SEGMENT);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.DocumentCollection, OperationType.Create);

            Instant serializationStartTimeUTC = Instant.now();
            ByteBuffer byteBuffer = ModelBridgeInternal.serializeJsonToByteBuffer(collection);
            Instant serializationEndTimeUTC = Instant.now();
            SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
                serializationStartTimeUTC,
                serializationEndTimeUTC,
                SerializationDiagnosticsContext.SerializationType.CONTAINER_SERIALIZATION);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Create, ResourceType.DocumentCollection, path, byteBuffer, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
            if (serializationDiagnosticsContext != null) {
                serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
            }

            return this.create(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, DocumentCollection.class))
                       .doOnNext(resourceResponse -> {
                    // set the session token
                    this.sessionContainer.setSessionToken(resourceResponse.getResource().getResourceId(),
                        getAltLink(resourceResponse.getResource()),
                        resourceResponse.getResponseHeaders());
                });
        } catch (Exception e) {
            logger.debug("Failure in creating a collection. due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<DocumentCollection>> replaceCollection(DocumentCollection collection,
                                                                        RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceCollectionInternal(collection, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<DocumentCollection>> replaceCollectionInternal(DocumentCollection collection,
                                                                                 RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (collection == null) {
                throw new IllegalArgumentException("collection");
            }

            logger.debug("Replacing a Collection. id: [{}]", collection.getId());
            validateResource(collection);

            String path = Utils.joinPath(collection.getSelfLink(), null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.DocumentCollection, OperationType.Replace);
            Instant serializationStartTimeUTC = Instant.now();
            ByteBuffer byteBuffer = ModelBridgeInternal.serializeJsonToByteBuffer(collection);
            Instant serializationEndTimeUTC = Instant.now();
            SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
                serializationStartTimeUTC,
                serializationEndTimeUTC,
                SerializationDiagnosticsContext.SerializationType.CONTAINER_SERIALIZATION);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.DocumentCollection, path, byteBuffer, requestHeaders, options);

            // TODO: .Net has some logic for updating session token which we don't
            // have here
            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
            if (serializationDiagnosticsContext != null) {
                serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, DocumentCollection.class))
                .doOnNext(resourceResponse -> {
                    if (resourceResponse.getResource() != null) {
                        // set the session token
                        this.sessionContainer.setSessionToken(resourceResponse.getResource().getResourceId(),
                            getAltLink(resourceResponse.getResource()),
                            resourceResponse.getResponseHeaders());
                    }
                });

        } catch (Exception e) {
            logger.debug("Failure in replacing a collection. due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<DocumentCollection>> deleteCollection(String collectionLink,
                                                                       RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteCollectionInternal(collectionLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<DocumentCollection>> deleteCollectionInternal(String collectionLink,
                                                                                RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(collectionLink)) {
                throw new IllegalArgumentException("collectionLink");
            }

            logger.debug("Deleting a Collection. collectionLink: [{}]", collectionLink);
            String path = Utils.joinPath(collectionLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.DocumentCollection, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.DocumentCollection, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, DocumentCollection.class));

        } catch (Exception e) {
            logger.debug("Failure in deleting a collection, due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private Mono<RxDocumentServiceResponse> delete(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy, OperationContextAndListenerTuple operationContextAndListenerTuple) {
        return populateHeaders(request, RequestVerb.DELETE)
            .flatMap(requestPopulated -> {
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return getStoreProxy(requestPopulated).processMessage(requestPopulated, operationContextAndListenerTuple);
            });
    }

    private Mono<RxDocumentServiceResponse> deleteAllItemsByPartitionKey(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy, OperationContextAndListenerTuple operationContextAndListenerTuple) {
        return populateHeaders(request, RequestVerb.POST)
            .flatMap(requestPopulated -> {
                RxStoreModel storeProxy = this.getStoreProxy(requestPopulated);
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return storeProxy.processMessage(requestPopulated, operationContextAndListenerTuple);
            });
    }

    private Mono<RxDocumentServiceResponse> read(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy) {
        return populateHeaders(request, RequestVerb.GET)
            .flatMap(requestPopulated -> {
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return getStoreProxy(requestPopulated).processMessage(requestPopulated);
                });
    }

    Mono<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest request) {
        return populateHeaders(request, RequestVerb.GET)
            .flatMap(requestPopulated -> getStoreProxy(requestPopulated).processMessage(requestPopulated));
    }

    private Mono<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
        return populateHeaders(request, RequestVerb.POST)
            .flatMap(requestPopulated ->
                this.getStoreProxy(requestPopulated).processMessage(requestPopulated)
                    .map(response -> {
                            this.captureSessionToken(requestPopulated, response);
                            return response;
                        }
                    ));
    }

    @Override
    public Mono<ResourceResponse<DocumentCollection>> readCollection(String collectionLink,
                                                                     RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readCollectionInternal(collectionLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<DocumentCollection>> readCollectionInternal(String collectionLink,
                                                                              RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {

        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            if (StringUtils.isEmpty(collectionLink)) {
                throw new IllegalArgumentException("collectionLink");
            }

            logger.debug("Reading a Collection. collectionLink: [{}]", collectionLink);
            String path = Utils.joinPath(collectionLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.DocumentCollection, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.DocumentCollection, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }
            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, DocumentCollection.class));
        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in reading a collection, due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<DocumentCollection>> readCollections(String databaseLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(databaseLink)) {
            throw new IllegalArgumentException("databaseLink");
        }

        return readFeed(options, ResourceType.DocumentCollection, DocumentCollection.class,
                Utils.joinPath(databaseLink, Paths.COLLECTIONS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<DocumentCollection>> queryCollections(String databaseLink, String query,
                                                                   CosmosQueryRequestOptions options) {
        return createQuery(databaseLink, new SqlQuerySpec(query), options, DocumentCollection.class, ResourceType.DocumentCollection);
    }

    @Override
    public Flux<FeedResponse<DocumentCollection>> queryCollections(String databaseLink,
                                                                         SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(databaseLink, querySpec, options, DocumentCollection.class, ResourceType.DocumentCollection);
    }

    private static String serializeProcedureParams(List<Object> objectArray) {
        String[] stringArray = new String[objectArray.size()];

        for (int i = 0; i < objectArray.size(); ++i) {
            Object object = objectArray.get(i);
            if (object instanceof JsonSerializable) {
                stringArray[i] = ModelBridgeInternal.toJsonFromJsonSerializable((JsonSerializable) object);
            } else {

                // POJO, ObjectNode, number, STRING or Boolean
                try {
                    stringArray[i] = mapper.writeValueAsString(object);
                } catch (IOException e) {
                    throw new IllegalArgumentException("Can't serialize the object into the json string", e);
                }
            }
        }

        return String.format("[%s]", StringUtils.join(stringArray, ","));
    }

    private static void validateResource(Resource resource) {
        if (!StringUtils.isEmpty(resource.getId())) {
            if (resource.getId().indexOf('/') != -1 || resource.getId().indexOf('\\') != -1 ||
                    resource.getId().indexOf('?') != -1 || resource.getId().indexOf('#') != -1) {
                throw new IllegalArgumentException("Id contains illegal chars.");
            }

            if (resource.getId().endsWith(" ")) {
                throw new IllegalArgumentException("Id ends with a space.");
            }
        }
    }

    private Map<String, String> getRequestHeaders(RequestOptions options, ResourceType resourceType, OperationType operationType) {
        Map<String, String> headers = new HashMap<>();

        if (this.useMultipleWriteLocations) {
            headers.put(HttpConstants.HttpHeaders.ALLOW_TENTATIVE_WRITES, Boolean.TRUE.toString());
        }

        if (consistencyLevel != null) {
            headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, consistencyLevel.toString());
        }

        if (options == null) {
            //  Corner case, if options are null, then just check this flag from CosmosClientBuilder
            //  If content response on write is not enabled, and operation is document write - then add minimal prefer header
            //  Otherwise, don't add this header, which means return the full response
            if (!this.contentResponseOnWriteEnabled && resourceType.equals(ResourceType.Document) && operationType.isWriteOperation()) {
                headers.put(HttpConstants.HttpHeaders.PREFER, HttpConstants.HeaderValues.PREFER_RETURN_MINIMAL);
            }
            return headers;
        }

        Map<String, String> customOptions = options.getHeaders();
        if (customOptions != null) {
            headers.putAll(customOptions);
        }

        boolean contentResponseOnWriteEnabled = this.contentResponseOnWriteEnabled;
        //  If options has contentResponseOnWriteEnabled set to true / false, override the value from CosmosClientBuilder
        if (options.isContentResponseOnWriteEnabled() != null) {
            contentResponseOnWriteEnabled = options.isContentResponseOnWriteEnabled();
        }

        //  If content response on write is not enabled, and operation is document write - then add minimal prefer header
        //  Otherwise, don't add this header, which means return the full response
        if (!contentResponseOnWriteEnabled && resourceType.equals(ResourceType.Document) && operationType.isWriteOperation()) {
            headers.put(HttpConstants.HttpHeaders.PREFER, HttpConstants.HeaderValues.PREFER_RETURN_MINIMAL);
        }

        if (options.getIfMatchETag() != null) {
                headers.put(HttpConstants.HttpHeaders.IF_MATCH, options.getIfMatchETag());
        }

        if(options.getIfNoneMatchETag() != null) {
            headers.put(HttpConstants.HttpHeaders.IF_NONE_MATCH, options.getIfNoneMatchETag());
        }

        if (options.getConsistencyLevel() != null) {
            headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, options.getConsistencyLevel().toString());
        }

        if (options.getIndexingDirective() != null) {
            headers.put(HttpConstants.HttpHeaders.INDEXING_DIRECTIVE, options.getIndexingDirective().toString());
        }

        if (options.getPostTriggerInclude() != null && options.getPostTriggerInclude().size() > 0) {
            String postTriggerInclude = StringUtils.join(options.getPostTriggerInclude(), ",");
            headers.put(HttpConstants.HttpHeaders.POST_TRIGGER_INCLUDE, postTriggerInclude);
        }

        if (options.getPreTriggerInclude() != null && options.getPreTriggerInclude().size() > 0) {
            String preTriggerInclude = StringUtils.join(options.getPreTriggerInclude(), ",");
            headers.put(HttpConstants.HttpHeaders.PRE_TRIGGER_INCLUDE, preTriggerInclude);
        }

        if (!Strings.isNullOrEmpty(options.getSessionToken())) {
            headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, options.getSessionToken());
        }

        if (options.getResourceTokenExpirySeconds() != null) {
            headers.put(HttpConstants.HttpHeaders.RESOURCE_TOKEN_EXPIRY,
                    String.valueOf(options.getResourceTokenExpirySeconds()));
        }

        if (options.getOfferThroughput() != null && options.getOfferThroughput() >= 0) {
            headers.put(HttpConstants.HttpHeaders.OFFER_THROUGHPUT, options.getOfferThroughput().toString());
        } else if (options.getOfferType() != null) {
            headers.put(HttpConstants.HttpHeaders.OFFER_TYPE, options.getOfferType());
        }

        if (options.getOfferThroughput() == null) {
            if (options.getThroughputProperties() != null) {
                Offer offer = ModelBridgeInternal.getOfferFromThroughputProperties(options.getThroughputProperties());
                final OfferAutoscaleSettings offerAutoscaleSettings = offer.getOfferAutoScaleSettings();
                OfferAutoscaleAutoUpgradeProperties autoscaleAutoUpgradeProperties = null;
                if (offerAutoscaleSettings != null) {
                     autoscaleAutoUpgradeProperties
                        = offer.getOfferAutoScaleSettings().getAutoscaleAutoUpgradeProperties();
                }
                if (offer.hasOfferThroughput() &&
                        (offerAutoscaleSettings != null && offerAutoscaleSettings.getMaxThroughput() >= 0 ||
                             autoscaleAutoUpgradeProperties != null &&
                                 autoscaleAutoUpgradeProperties
                                     .getAutoscaleThroughputProperties()
                                     .getIncrementPercent() >= 0)) {
                    throw new IllegalArgumentException("Autoscale provisioned throughput can not be configured with "
                                                           + "fixed offer");
                }

                if (offer.hasOfferThroughput()) {
                    headers.put(HttpConstants.HttpHeaders.OFFER_THROUGHPUT, String.valueOf(offer.getThroughput()));
                } else if (offer.getOfferAutoScaleSettings() != null) {
                    headers.put(HttpConstants.HttpHeaders.OFFER_AUTOPILOT_SETTINGS,
                                ModelBridgeInternal.toJsonFromJsonSerializable(offer.getOfferAutoScaleSettings()));
                }
            }
        }

        if (options.isQuotaInfoEnabled()) {
            headers.put(HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, String.valueOf(true));
        }

        if (options.isScriptLoggingEnabled()) {
            headers.put(HttpConstants.HttpHeaders.SCRIPT_ENABLE_LOGGING, String.valueOf(true));
        }

        if (options.getDedicatedGatewayRequestOptions() != null &&
            options.getDedicatedGatewayRequestOptions().getMaxIntegratedCacheStaleness() != null) {
            headers.put(HttpConstants.HttpHeaders.DEDICATED_GATEWAY_PER_REQUEST_CACHE_STALENESS,
                String.valueOf(Utils.getMaxIntegratedCacheStalenessInMillis(options.getDedicatedGatewayRequestOptions())));
        }

        return headers;
    }

    public IRetryPolicyFactory getResetSessionTokenRetryPolicy() {
        return this.resetSessionTokenRetryPolicy;
    }

    private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request,
                                                                      ByteBuffer contentAsByteBuffer,
                                                                      Document document,
                                                                      RequestOptions options) {

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);
        return collectionObs
                .map(collectionValueHolder -> {
                    addPartitionKeyInformation(request, contentAsByteBuffer, document, options, collectionValueHolder.v);
                    return request;
                });
    }

    private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request,
                                                                      ByteBuffer contentAsByteBuffer,
                                                                      Object document,
                                                                      RequestOptions options,
                                                                      Mono<Utils.ValueHolder<DocumentCollection>> collectionObs) {

        return collectionObs.map(collectionValueHolder -> {
            addPartitionKeyInformation(request, contentAsByteBuffer, document, options, collectionValueHolder.v);
            return request;
        });
    }

    private void addPartitionKeyInformation(RxDocumentServiceRequest request,
                                            ByteBuffer contentAsByteBuffer,
                                            Object objectDoc, RequestOptions options,
                                            DocumentCollection collection) {
        PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();

        PartitionKeyInternal partitionKeyInternal = null;
        if (options != null && options.getPartitionKey() != null && options.getPartitionKey().equals(PartitionKey.NONE)){
            partitionKeyInternal = ModelBridgeInternal.getNonePartitionKey(partitionKeyDefinition);
        } else if (options != null && options.getPartitionKey() != null) {
            partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(options.getPartitionKey());
        } else if (partitionKeyDefinition == null || partitionKeyDefinition.getPaths().size() == 0) {
            // For backward compatibility, if collection doesn't have partition key defined, we assume all documents
            // have empty value for it and user doesn't need to specify it explicitly.
            partitionKeyInternal = PartitionKeyInternal.getEmpty();
        } else if (contentAsByteBuffer != null || objectDoc != null) {
            InternalObjectNode internalObjectNode;
            if (objectDoc instanceof InternalObjectNode) {
                internalObjectNode = (InternalObjectNode) objectDoc;
            } else if (objectDoc instanceof ObjectNode) {
                internalObjectNode = new InternalObjectNode((ObjectNode)objectDoc);
            } else if (contentAsByteBuffer != null) {
                contentAsByteBuffer.rewind();
                internalObjectNode = new InternalObjectNode(contentAsByteBuffer);
            } else {
                //  This is a safety check, this should not happen ever.
                //  If it does, it is a SDK bug
                throw new IllegalStateException("ContentAsByteBuffer and objectDoc are null");
            }

            Instant serializationStartTime = Instant.now();
            partitionKeyInternal =  extractPartitionKeyValueFromDocument(internalObjectNode, partitionKeyDefinition);
            Instant serializationEndTime = Instant.now();
            SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
                serializationStartTime,
                serializationEndTime,
                SerializationDiagnosticsContext.SerializationType.PARTITION_KEY_FETCH_SERIALIZATION
            );
            SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
            if (serializationDiagnosticsContext != null) {
                serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
            }

        } else {
            throw new UnsupportedOperationException("PartitionKey value must be supplied for this operation.");
        }

        request.setPartitionKeyInternal(partitionKeyInternal);
        request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY, Utils.escapeNonAscii(partitionKeyInternal.toJson()));
    }

    public static PartitionKeyInternal extractPartitionKeyValueFromDocument(
            InternalObjectNode document,
            PartitionKeyDefinition partitionKeyDefinition) {
        if (partitionKeyDefinition != null) {
            switch (partitionKeyDefinition.getKind()) {
                case HASH:
                    String path = partitionKeyDefinition.getPaths().iterator().next();
                    List<String> parts = PathParser.getPathParts(path);
                    if (parts.size() >= 1) {
                        Object value = ModelBridgeInternal.getObjectByPathFromJsonSerializable(document, parts);
                        if (value == null || value.getClass() == ObjectNode.class) {
                            value = ModelBridgeInternal.getNonePartitionKey(partitionKeyDefinition);
                        }

                        if (value instanceof PartitionKeyInternal) {
                            return (PartitionKeyInternal) value;
                        } else {
                            return PartitionKeyInternal.fromObjectArray(Collections.singletonList(value), false);
                        }
                    }
                    break;
                case MULTI_HASH:
                    Object[] partitionKeyValues = new Object[partitionKeyDefinition.getPaths().size()];
                    for(int pathIter = 0 ; pathIter < partitionKeyDefinition.getPaths().size(); pathIter++){
                        String partitionPath = partitionKeyDefinition.getPaths().get(pathIter);
                        List<String> partitionPathParts = PathParser.getPathParts(partitionPath);
                        partitionKeyValues[pathIter] = ModelBridgeInternal.getObjectByPathFromJsonSerializable(document, partitionPathParts);
                    }
                    return PartitionKeyInternal.fromObjectArray(partitionKeyValues, false);

                default:
                    throw new IllegalArgumentException("Unrecognized Partition kind: " + partitionKeyDefinition.getKind());
                }
        }

        return null;
    }

    private Mono<RxDocumentServiceRequest> getCreateDocumentRequest(DocumentClientRetryPolicy requestRetryPolicy,
                                                                    String documentCollectionLink,
                                                                    Object document,
                                                                    RequestOptions options,
                                                                    boolean disableAutomaticIdGeneration,
                                                                    OperationType operationType) {

        if (StringUtils.isEmpty(documentCollectionLink)) {
            throw new IllegalArgumentException("documentCollectionLink");
        }
        if (document == null) {
            throw new IllegalArgumentException("document");
        }

        Instant serializationStartTimeUTC = Instant.now();
        ByteBuffer content = BridgeInternal.serializeJsonToByteBuffer(document, mapper);
        Instant serializationEndTimeUTC = Instant.now();

        SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
            serializationStartTimeUTC,
            serializationEndTimeUTC,
            SerializationDiagnosticsContext.SerializationType.ITEM_SERIALIZATION);

        String path = Utils.joinPath(documentCollectionLink, Paths.DOCUMENTS_PATH_SEGMENT);
        Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Document, operationType);

        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            operationType, ResourceType.Document, path, requestHeaders, options, content);
        if (requestRetryPolicy != null) {
            requestRetryPolicy.onBeforeSendRequest(request);
        }

        SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
        if (serializationDiagnosticsContext != null) {
            serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
        }

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);
        return addPartitionKeyInformation(request, content, document, options, collectionObs);
    }

    private Mono<RxDocumentServiceRequest> getBatchDocumentRequest(DocumentClientRetryPolicy requestRetryPolicy,
                                                                   String documentCollectionLink,
                                                                   ServerBatchRequest serverBatchRequest,
                                                                   RequestOptions options,
                                                                   boolean disableAutomaticIdGeneration) {

        checkArgument(StringUtils.isNotEmpty(documentCollectionLink), "expected non empty documentCollectionLink");
        checkNotNull(serverBatchRequest, "expected non null serverBatchRequest");

        Instant serializationStartTimeUTC = Instant.now();
        ByteBuffer content = ByteBuffer.wrap(Utils.getUTF8Bytes(serverBatchRequest.getRequestBody()));
        Instant serializationEndTimeUTC = Instant.now();

        SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
            serializationStartTimeUTC,
            serializationEndTimeUTC,
            SerializationDiagnosticsContext.SerializationType.ITEM_SERIALIZATION);

        String path = Utils.joinPath(documentCollectionLink, Paths.DOCUMENTS_PATH_SEGMENT);
        Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Document, OperationType.Batch);

        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
            this,
            OperationType.Batch,
            ResourceType.Document,
            path,
            requestHeaders,
            options,
            content);

        if (requestRetryPolicy != null) {
            requestRetryPolicy.onBeforeSendRequest(request);
        }

        SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
        if (serializationDiagnosticsContext != null) {
            serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
        }

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs =
            this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

        return collectionObs.map((Utils.ValueHolder<DocumentCollection> collectionValueHolder) -> {
            addBatchHeaders(request, serverBatchRequest, collectionValueHolder.v);
            return request;
        });
    }

    private RxDocumentServiceRequest addBatchHeaders(RxDocumentServiceRequest request,
                                                     ServerBatchRequest serverBatchRequest,
                                                     DocumentCollection collection) {

        if(serverBatchRequest instanceof SinglePartitionKeyServerBatchRequest) {

            PartitionKey partitionKey = ((SinglePartitionKeyServerBatchRequest) serverBatchRequest).getPartitionKeyValue();
            PartitionKeyInternal partitionKeyInternal;

            if (partitionKey.equals(PartitionKey.NONE)) {
                PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
                partitionKeyInternal = ModelBridgeInternal.getNonePartitionKey(partitionKeyDefinition);
            } else {
                // Partition key is always non-null
                partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(partitionKey);
            }

            request.setPartitionKeyInternal(partitionKeyInternal);
            request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY, Utils.escapeNonAscii(partitionKeyInternal.toJson()));
        } else if(serverBatchRequest instanceof PartitionKeyRangeServerBatchRequest) {
            request.setPartitionKeyRangeIdentity(new PartitionKeyRangeIdentity(((PartitionKeyRangeServerBatchRequest) serverBatchRequest).getPartitionKeyRangeId()));
        } else {
            throw new UnsupportedOperationException("Unknown Server request.");
        }

        request.getHeaders().put(HttpConstants.HttpHeaders.IS_BATCH_REQUEST, Boolean.TRUE.toString());
        request.getHeaders().put(HttpConstants.HttpHeaders.IS_BATCH_ATOMIC, String.valueOf(serverBatchRequest.isAtomicBatch()));
        request.getHeaders().put(HttpConstants.HttpHeaders.SHOULD_BATCH_CONTINUE_ON_ERROR, String.valueOf(serverBatchRequest.isShouldContinueOnError()));

        request.setNumberOfItemsInBatchRequest(serverBatchRequest.getOperations().size());

        return request;
    }

    private Mono<RxDocumentServiceRequest> populateHeaders(RxDocumentServiceRequest request, RequestVerb httpMethod) {
        request.getHeaders().put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());
        if (this.masterKeyOrResourceToken != null || this.resourceTokensMap != null
            || this.cosmosAuthorizationTokenResolver != null || this.credential != null) {
            String resourceName = request.getResourceAddress();

            String authorization = this.getUserAuthorizationToken(
                    resourceName, request.getResourceType(), httpMethod, request.getHeaders(),
                    AuthorizationTokenType.PrimaryMasterKey, request.properties);
            try {
                authorization = URLEncoder.encode(authorization, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("Failed to encode authtoken.", e);
            }
            request.getHeaders().put(HttpConstants.HttpHeaders.AUTHORIZATION, authorization);
        }

        if (this.apiType != null)   {
            request.getHeaders().put(HttpConstants.HttpHeaders.API_TYPE, this.apiType.toString());
        }

        if ((RequestVerb.POST.equals(httpMethod) || RequestVerb.PUT.equals(httpMethod))
                && !request.getHeaders().containsKey(HttpConstants.HttpHeaders.CONTENT_TYPE)) {
            request.getHeaders().put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON);
        }

        if (RequestVerb.PATCH.equals(httpMethod) &&
            !request.getHeaders().containsKey(HttpConstants.HttpHeaders.CONTENT_TYPE)) {
            request.getHeaders().put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON_PATCH);
        }

        if (!request.getHeaders().containsKey(HttpConstants.HttpHeaders.ACCEPT)) {
            request.getHeaders().put(HttpConstants.HttpHeaders.ACCEPT, RuntimeConstants.MediaTypes.JSON);
        }

        MetadataDiagnosticsContext metadataDiagnosticsCtx =
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

        if (this.requiresFeedRangeFiltering(request)) {
            return request.getFeedRange()
                          .populateFeedRangeFilteringHeaders(
                              this.getPartitionKeyRangeCache(),
                              request,
                              this.collectionCache.resolveCollectionAsync(metadataDiagnosticsCtx, request))
                          .flatMap(this::populateAuthorizationHeader);
        }

        return this.populateAuthorizationHeader(request);
    }

    private boolean requiresFeedRangeFiltering(RxDocumentServiceRequest request) {
        if (request.getResourceType() != ResourceType.Document &&
                request.getResourceType() != ResourceType.Conflict) {
            return false;
        }

        switch (request.getOperationType()) {
            case ReadFeed:
            case Query:
            case SqlQuery:
                return request.getFeedRange() != null;
            default:
                return false;
        }
    }

    @Override
    public Mono<RxDocumentServiceRequest> populateAuthorizationHeader(RxDocumentServiceRequest request) {
        if (request == null) {
            throw new IllegalArgumentException("request");
        }

        if (this.authorizationTokenType == AuthorizationTokenType.AadToken) {
            return AadTokenAuthorizationHelper.getAuthorizationToken(this.tokenCredentialCache)
                .map(authorization -> {
                    request.getHeaders().put(HttpConstants.HttpHeaders.AUTHORIZATION, authorization);
                    return request;
                });
        } else {
            return Mono.just(request);
        }
    }

    @Override
    public Mono<HttpHeaders> populateAuthorizationHeader(HttpHeaders httpHeaders) {
        if (httpHeaders == null) {
            throw new IllegalArgumentException("httpHeaders");
        }

        if (this.authorizationTokenType == AuthorizationTokenType.AadToken) {
            return AadTokenAuthorizationHelper.getAuthorizationToken(this.tokenCredentialCache)
                .map(authorization -> {
                    httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorization);
                    return httpHeaders;
                });
        }

        return Mono.just(httpHeaders);
    }

    @Override
    public AuthorizationTokenType getAuthorizationTokenType() {
        return this.authorizationTokenType;
    }

    @Override
    public String getUserAuthorizationToken(String resourceName,
                                            ResourceType resourceType,
                                            RequestVerb requestVerb,
                                            Map<String, String> headers,
                                            AuthorizationTokenType tokenType,
                                            Map<String, Object> properties) {

        if (this.cosmosAuthorizationTokenResolver != null) {
            return this.cosmosAuthorizationTokenResolver.getAuthorizationToken(requestVerb.toUpperCase(), resourceName, this.resolveCosmosResourceType(resourceType).toString(),
                    properties != null ? Collections.unmodifiableMap(properties) : null);
        } else if (credential != null) {
            return this.authorizationTokenProvider.generateKeyAuthorizationSignature(requestVerb, resourceName,
                    resourceType, headers);
        } else if (masterKeyOrResourceToken != null && hasAuthKeyResourceToken && resourceTokensMap == null) {
            return masterKeyOrResourceToken;
        } else {
            assert resourceTokensMap != null;
            if(resourceType.equals(ResourceType.DatabaseAccount)) {
                return this.firstResourceTokenFromPermissionFeed;
            }

            return ResourceTokenAuthorizationHelper.getAuthorizationTokenUsingResourceTokens(resourceTokensMap, requestVerb, resourceName, headers);
        }
    }

    private CosmosResourceType resolveCosmosResourceType(ResourceType resourceType) {
        CosmosResourceType cosmosResourceType =
            ModelBridgeInternal.fromServiceSerializedFormat(resourceType.toString());
        if (cosmosResourceType == null) {
            return CosmosResourceType.SYSTEM;
        }
        return cosmosResourceType;
    }

    void captureSessionToken(RxDocumentServiceRequest request, RxDocumentServiceResponse response) {
        this.sessionContainer.setSessionToken(request, response.getResponseHeaders());
    }

    private Mono<RxDocumentServiceResponse> create(RxDocumentServiceRequest request,
                                                   DocumentClientRetryPolicy documentClientRetryPolicy,
                                                   OperationContextAndListenerTuple operationContextAndListenerTuple) {
        return populateHeaders(request, RequestVerb.POST)
            .flatMap(requestPopulated -> {
                RxStoreModel storeProxy = this.getStoreProxy(requestPopulated);
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return storeProxy.processMessage(requestPopulated, operationContextAndListenerTuple);
            });
    }

    private Mono<RxDocumentServiceResponse> upsert(RxDocumentServiceRequest request,
                                                   DocumentClientRetryPolicy documentClientRetryPolicy,
                                                   OperationContextAndListenerTuple operationContextAndListenerTuple) {

        return populateHeaders(request, RequestVerb.POST)
            .flatMap(requestPopulated -> {
                Map<String, String> headers = requestPopulated.getHeaders();
                // headers can never be null, since it will be initialized even when no
                // request options are specified,
                // hence using assertion here instead of exception, being in the private
                // method
                assert (headers != null);
                headers.put(HttpConstants.HttpHeaders.IS_UPSERT, "true");
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return getStoreProxy(requestPopulated).processMessage(requestPopulated, operationContextAndListenerTuple)
                    .map(response -> {
                            this.captureSessionToken(requestPopulated, response);
                            return response;
                        }
                    );
            });
    }

    private Mono<RxDocumentServiceResponse> replace(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy) {
        return populateHeaders(request, RequestVerb.PUT)
            .flatMap(requestPopulated -> {
                if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
                    documentClientRetryPolicy.getRetryContext().updateEndTime();
                }

                return getStoreProxy(requestPopulated).processMessage(requestPopulated);
            });
    }

    private Mono<RxDocumentServiceResponse> patch(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy) {
        populateHeaders(request, RequestVerb.PATCH);
        if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
            documentClientRetryPolicy.getRetryContext().updateEndTime();
        }

        return getStoreProxy(request).processMessage(request);
    }

    @Override
    public Mono<ResourceResponse<Document>> createDocument(String collectionLink, Object document,
                                                           RequestOptions options, boolean disableAutomaticIdGeneration) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        if (options == null || options.getPartitionKey() == null) {
            requestRetryPolicy = new PartitionKeyMismatchRetryPolicy(collectionCache, requestRetryPolicy, collectionLink, options);
        }

        DocumentClientRetryPolicy finalRetryPolicyInstance = requestRetryPolicy;
        return ObservableHelper.inlineIfPossibleAsObs(() -> createDocumentInternal(collectionLink, document, options, disableAutomaticIdGeneration, finalRetryPolicyInstance), requestRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> createDocumentInternal(String collectionLink, Object document,
                                                                    RequestOptions options, boolean disableAutomaticIdGeneration, DocumentClientRetryPolicy requestRetryPolicy) {
        try {
            logger.debug("Creating a Document. collectionLink: [{}]", collectionLink);

            Mono<RxDocumentServiceRequest> requestObs = getCreateDocumentRequest(requestRetryPolicy, collectionLink, document,
                options, disableAutomaticIdGeneration, OperationType.Create);

            Mono<RxDocumentServiceResponse> responseObservable =
                requestObs.flatMap(request -> create(request, requestRetryPolicy, getOperationContextAndListenerTuple(options)));

            return responseObservable
                    .map(serviceResponse -> toResourceResponse(serviceResponse, Document.class));

        } catch (Exception e) {
            logger.debug("Failure in creating a document due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object document,
                                                                 RequestOptions options, boolean disableAutomaticIdGeneration) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        if (options == null || options.getPartitionKey() == null) {
            requestRetryPolicy = new PartitionKeyMismatchRetryPolicy(collectionCache, requestRetryPolicy, collectionLink, options);
        }
        DocumentClientRetryPolicy finalRetryPolicyInstance = requestRetryPolicy;
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertDocumentInternal(collectionLink, document, options, disableAutomaticIdGeneration, finalRetryPolicyInstance), finalRetryPolicyInstance);
    }

    private Mono<ResourceResponse<Document>> upsertDocumentInternal(String collectionLink, Object document,
                                                                    RequestOptions options, boolean disableAutomaticIdGeneration, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            logger.debug("Upserting a Document. collectionLink: [{}]", collectionLink);

            Mono<RxDocumentServiceRequest> reqObs = getCreateDocumentRequest(retryPolicyInstance, collectionLink, document,
                options, disableAutomaticIdGeneration, OperationType.Upsert);

            Mono<RxDocumentServiceResponse> responseObservable = reqObs.flatMap(request -> upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)));

            return responseObservable
                    .map(serviceResponse -> toResourceResponse(serviceResponse, Document.class));
        } catch (Exception e) {
            logger.debug("Failure in upserting a document due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Document>> replaceDocument(String documentLink, Object document,
                                                            RequestOptions options) {

        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        if (options == null || options.getPartitionKey() == null) {
            String collectionLink = Utils.getCollectionName(documentLink);
            requestRetryPolicy = new PartitionKeyMismatchRetryPolicy(collectionCache, requestRetryPolicy, collectionLink, options);
        }
        DocumentClientRetryPolicy finalRequestRetryPolicy = requestRetryPolicy;
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceDocumentInternal(documentLink, document, options, finalRequestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> replaceDocumentInternal(String documentLink, Object document,
                                                                     RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(documentLink)) {
                throw new IllegalArgumentException("documentLink");
            }

            if (document == null) {
                throw new IllegalArgumentException("document");
            }

            Document typedDocument = documentFromObject(document, mapper);

            return this.replaceDocumentInternal(documentLink, typedDocument, options, retryPolicyInstance);

        } catch (Exception e) {
            logger.debug("Failure in replacing a document due to [{}]", e.getMessage());
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Document>> replaceDocument(Document document, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        if (options == null || options.getPartitionKey() == null) {
            String collectionLink = document.getSelfLink();
            requestRetryPolicy = new PartitionKeyMismatchRetryPolicy(collectionCache, requestRetryPolicy, collectionLink, options);
        }
        DocumentClientRetryPolicy finalRequestRetryPolicy = requestRetryPolicy;
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceDocumentInternal(document, options, finalRequestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> replaceDocumentInternal(Document document, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (document == null) {
                throw new IllegalArgumentException("document");
            }

            return this.replaceDocumentInternal(document.getSelfLink(), document, options, retryPolicyInstance);

        } catch (Exception e) {
            logger.debug("Failure in replacing a database due to [{}]", e.getMessage());
            return Mono.error(e);
        }
    }

    private Mono<ResourceResponse<Document>> replaceDocumentInternal(String documentLink,
                                                                     Document document,
                                                                     RequestOptions options,
                                                                     DocumentClientRetryPolicy retryPolicyInstance) {

        if (document == null) {
            throw new IllegalArgumentException("document");
        }

        logger.debug("Replacing a Document. documentLink: [{}]", documentLink);
        final String path = Utils.joinPath(documentLink, null);
        final Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Document, OperationType.Replace);
        Instant serializationStartTimeUTC = Instant.now();
        ByteBuffer content = serializeJsonToByteBuffer(document);
        Instant serializationEndTime = Instant.now();
        SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
            serializationStartTimeUTC,
            serializationEndTime,
            SerializationDiagnosticsContext.SerializationType.ITEM_SERIALIZATION);

        final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            OperationType.Replace, ResourceType.Document, path, requestHeaders, options, content);
        if (retryPolicyInstance != null) {
            retryPolicyInstance.onBeforeSendRequest(request);
        }

        SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
        if (serializationDiagnosticsContext != null) {
            serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
        }

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);
        Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, content, document, options, collectionObs);

        return requestObs.flatMap(req -> replace(request, retryPolicyInstance)
            .map(resp -> toResourceResponse(resp, Document.class)));
    }

    @Override
    public Mono<ResourceResponse<Document>> patchDocument(String documentLink,
                                                          CosmosPatchOperations cosmosPatchOperations,
                                                          RequestOptions options) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> patchDocumentInternal(documentLink, cosmosPatchOperations, options, documentClientRetryPolicy), documentClientRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> patchDocumentInternal(String documentLink,
                                                                   CosmosPatchOperations cosmosPatchOperations,
                                                                   RequestOptions options,
                                                                   DocumentClientRetryPolicy retryPolicyInstance) {

        checkArgument(StringUtils.isNotEmpty(documentLink), "expected non empty documentLink");
        checkNotNull(cosmosPatchOperations, "expected non null cosmosPatchOperations");

        logger.debug("Running patch operations on Document. documentLink: [{}]", documentLink);

        final String path = Utils.joinPath(documentLink, null);

        final Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Document, OperationType.Patch);
        Instant serializationStartTimeUTC = Instant.now();

        ByteBuffer content = ByteBuffer.wrap(PatchUtil.serializeCosmosPatchToByteArray(cosmosPatchOperations, options));

        Instant serializationEndTime = Instant.now();
        SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics(
            serializationStartTimeUTC,
            serializationEndTime,
            SerializationDiagnosticsContext.SerializationType.ITEM_SERIALIZATION);

        final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
            this,
            OperationType.Patch,
            ResourceType.Document,
            path,
            requestHeaders,
            options,
            content);

        if (retryPolicyInstance != null) {
            retryPolicyInstance.onBeforeSendRequest(request);
        }

        SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics);
        if (serializationDiagnosticsContext != null) {
            serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics);
        }

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

        // options will always have partition key info, so contentAsByteBuffer can be null and is not needed.
        Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(
            request,
            null,
            null,
            options,
            collectionObs);

        return requestObs.flatMap(req -> patch(request, retryPolicyInstance)
            .map(resp -> toResourceResponse(resp, Document.class)));
    }

    @Override
    public Mono<ResourceResponse<Document>> deleteDocument(String documentLink, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDocumentInternal(documentLink, null, options, requestRetryPolicy), requestRetryPolicy);
    }

    @Override
    public Mono<ResourceResponse<Document>> deleteDocument(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDocumentInternal(documentLink, internalObjectNode, options, requestRetryPolicy),
            requestRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options,
                                                                    DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(documentLink)) {
                throw new IllegalArgumentException("documentLink");
            }

            logger.debug("Deleting a Document. documentLink: [{}]", documentLink);
            String path = Utils.joinPath(documentLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Document, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.Document, path, requestHeaders, options);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

            Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, internalObjectNode, options, collectionObs);

            return requestObs.flatMap(req -> this
                .delete(req, retryPolicyInstance, getOperationContextAndListenerTuple(options))
                .map(serviceResponse -> toResourceResponse(serviceResponse, Document.class)));

        } catch (Exception e) {
            logger.debug("Failure in deleting a document due to [{}]", e.getMessage());
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Document>> deleteAllDocumentsByPartitionKey(String collectionLink, PartitionKey partitionKey, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteAllDocumentsByPartitionKeyInternal(collectionLink, options, requestRetryPolicy),
            requestRetryPolicy);
    }

    private Mono<ResourceResponse<Document>> deleteAllDocumentsByPartitionKeyInternal(String collectionLink, RequestOptions options,
                                                                                  DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(collectionLink)) {
                throw new IllegalArgumentException("collectionLink");
            }

            logger.debug("Deleting all items by Partition Key. collectionLink: [{}]", collectionLink);
            String path = Utils.joinPath(collectionLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.PartitionKey, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.PartitionKey, path, requestHeaders, options);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

            Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, null, options, collectionObs);

            return requestObs.flatMap(req -> this
                .deleteAllItemsByPartitionKey(req, retryPolicyInstance, getOperationContextAndListenerTuple(options))
                .map(serviceResponse -> toResourceResponse(serviceResponse, Document.class)));
        } catch (Exception e) {
            logger.debug("Failure in deleting documents due to [{}]", e.getMessage());
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Document>> readDocument(String documentLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readDocumentInternal(documentLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Document>> readDocumentInternal(String documentLink, RequestOptions options,
                                                                  DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(documentLink)) {
                throw new IllegalArgumentException("documentLink");
            }

            logger.debug("Reading a Document. documentLink: [{}]", documentLink);
            String path = Utils.joinPath(documentLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Document, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Document, path, requestHeaders, options);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

            Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, null, options, collectionObs);

            return requestObs.flatMap(req -> {
                return this.read(request, retryPolicyInstance).map(serviceResponse -> toResourceResponse(serviceResponse, Document.class));
            });

        } catch (Exception e) {
            logger.debug("Failure in reading a document due to [{}]", e.getMessage());
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Document>> readDocuments(String collectionLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return queryDocuments(collectionLink, "SELECT * FROM r", options);
    }

    @Override
    public <T> Mono<FeedResponse<T>> readMany(
        List<CosmosItemIdentity> itemIdentityList,
        String collectionLink,
        CosmosQueryRequestOptions options,
        Class<T> klass) {

        String resourceLink = parentResourceLinkToQueryLink(collectionLink, ResourceType.Document);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            OperationType.Query,
            ResourceType.Document,
            collectionLink, null
        );

        // This should not got to backend
        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs =
            collectionCache.resolveCollectionAsync(null, request);

        return collectionObs
            .flatMap(documentCollectionResourceResponse -> {
                    final DocumentCollection collection = documentCollectionResourceResponse.v;
                    if (collection == null) {
                        throw new IllegalStateException("Collection cannot be null");
                    }

                    final PartitionKeyDefinition pkDefinition = collection.getPartitionKey();

                    Mono<Utils.ValueHolder<CollectionRoutingMap>> valueHolderMono = partitionKeyRangeCache
                        .tryLookupAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                            collection.getResourceId(),
                            null,
                            null);
                    return valueHolderMono.flatMap(collectionRoutingMapValueHolder -> {
                        Map<PartitionKeyRange, List<CosmosItemIdentity>> partitionRangeItemKeyMap =
                            new HashMap<>();
                        CollectionRoutingMap routingMap = collectionRoutingMapValueHolder.v;
                        if (routingMap == null) {
                            throw new IllegalStateException("Failed to get routing map.");
                        }
                        itemIdentityList
                            .forEach(itemIdentity -> {

                                String effectivePartitionKeyString =  PartitionKeyInternalHelper
                                    .getEffectivePartitionKeyString(
                                        BridgeInternal.getPartitionKeyInternal(
                                            itemIdentity.getPartitionKey()),
                                        pkDefinition);

                                //use routing map to find the partitionKeyRangeId of each
                                // effectivePartitionKey
                                PartitionKeyRange range =
                                    routingMap.getRangeByEffectivePartitionKey(effectivePartitionKeyString);

                                //group the itemKeyList based on partitionKeyRangeId
                                if (partitionRangeItemKeyMap.get(range) == null) {
                                    List<CosmosItemIdentity> list = new ArrayList<>();
                                    list.add(itemIdentity);
                                    partitionRangeItemKeyMap.put(range, list);
                                } else {
                                    List<CosmosItemIdentity> pairs =
                                        partitionRangeItemKeyMap.get(range);
                                    pairs.add(itemIdentity);
                                    partitionRangeItemKeyMap.put(range, pairs);
                                }

                            });

                        //Create the range query map that contains the query to be run for that
                        // partitionkeyrange
                        Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap;
                        rangeQueryMap = getRangeQueryMap(partitionRangeItemKeyMap,
                            collection.getPartitionKey());

                        // create the executable query
                        return createReadManyQuery(
                            resourceLink,
                            new SqlQuerySpec(DUMMY_SQL_QUERY),
                            options,
                            Document.class,
                            ResourceType.Document,
                            collection,
                            Collections.unmodifiableMap(rangeQueryMap))
                            .collectList() // aggregating the result construct a FeedResponse and
                            // aggregate RUs.
                            .map(feedList -> {
                                List<T> finalList = new ArrayList<>();
                                HashMap<String, String> headers = new HashMap<>();
                                ConcurrentMap<String, QueryMetrics> aggregatedQueryMetrics = new ConcurrentHashMap<>();
                                double requestCharge = 0;
                                for (FeedResponse<Document> page : feedList) {
                                    ConcurrentMap<String, QueryMetrics> pageQueryMetrics =
                                        ModelBridgeInternal.queryMetrics(page);
                                    if (pageQueryMetrics != null) {
                                        pageQueryMetrics.forEach(
                                            aggregatedQueryMetrics::putIfAbsent);
                                    }

                                    requestCharge += page.getRequestCharge();
                                    // TODO: this does double serialization: FIXME
                                    finalList.addAll(page.getResults().stream().map(document ->
                                        ModelBridgeInternal.toObjectFromJsonSerializable(document, klass)).collect(Collectors.toList()));
                                }
                                headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double
                                    .toString(requestCharge));
                                FeedResponse<T> frp = BridgeInternal
                                    .createFeedResponse(finalList, headers);
                                return frp;
                            });
                    });
                }
            );

    }

    private Map<PartitionKeyRange, SqlQuerySpec> getRangeQueryMap(
        Map<PartitionKeyRange, List<CosmosItemIdentity>> partitionRangeItemKeyMap,
        PartitionKeyDefinition partitionKeyDefinition) {
        //TODO: Optimise this to include all types of partitionkeydefinitions. ex: c["prop1./ab"]["key1"]

        Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap = new HashMap<>();
        String partitionKeySelector = createPkSelector(partitionKeyDefinition);

        for(Map.Entry<PartitionKeyRange, List<CosmosItemIdentity>> entry: partitionRangeItemKeyMap.entrySet()) {

            SqlQuerySpec sqlQuerySpec;
            if (partitionKeySelector.equals("[\"id\"]")) {
                sqlQuerySpec = createReadManyQuerySpecPartitionKeyIdSame(entry.getValue(), partitionKeySelector);
            } else {
                sqlQuerySpec = createReadManyQuerySpec(entry.getValue(), partitionKeySelector);
            }
            // Add query for this partition to rangeQueryMap
            rangeQueryMap.put(entry.getKey(), sqlQuerySpec);

        }

        return rangeQueryMap;
    }

    private SqlQuerySpec createReadManyQuerySpecPartitionKeyIdSame(
        List<CosmosItemIdentity> idPartitionKeyPairList,
        String partitionKeySelector) {

        StringBuilder queryStringBuilder = new StringBuilder();
        List<SqlParameter> parameters = new ArrayList<>();

        queryStringBuilder.append("SELECT * FROM c WHERE c.id IN ( ");
        for (int i = 0; i < idPartitionKeyPairList.size(); i++) {
            CosmosItemIdentity itemIdentity = idPartitionKeyPairList.get(i);

            String idValue = itemIdentity.getId();
            String idParamName = "@param" + i;

            PartitionKey pkValueAsPartitionKey = itemIdentity.getPartitionKey();
            Object pkValue = ModelBridgeInternal.getPartitionKeyObject(pkValueAsPartitionKey);

            if (!Objects.equals(idValue, pkValue)) {
                // this is sanity check to ensure id and pk are the same
                continue;
            }

            parameters.add(new SqlParameter(idParamName, idValue));
            queryStringBuilder.append(idParamName);

            if (i < idPartitionKeyPairList.size() - 1) {
                queryStringBuilder.append(", ");
            }
        }
        queryStringBuilder.append(" )");

        return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
    }

    private SqlQuerySpec createReadManyQuerySpec(List<CosmosItemIdentity> itemIdentities, String partitionKeySelector) {
        StringBuilder queryStringBuilder = new StringBuilder();
        List<SqlParameter> parameters = new ArrayList<>();

        queryStringBuilder.append("SELECT * FROM c WHERE ( ");
        for (int i = 0; i < itemIdentities.size(); i++) {
            CosmosItemIdentity itemIdentity = itemIdentities.get(i);

            PartitionKey pkValueAsPartitionKey = itemIdentity.getPartitionKey();
            Object pkValue = ModelBridgeInternal.getPartitionKeyObject(pkValueAsPartitionKey);
            String pkParamName = "@param" + (2 * i);
            parameters.add(new SqlParameter(pkParamName, pkValue));

            String idValue = itemIdentity.getId();
            String idParamName = "@param" + (2 * i + 1);
            parameters.add(new SqlParameter(idParamName, idValue));

            queryStringBuilder.append("(");
            queryStringBuilder.append("c.id = ");
            queryStringBuilder.append(idParamName);
            queryStringBuilder.append(" AND ");
            queryStringBuilder.append(" c");
            // partition key def
            queryStringBuilder.append(partitionKeySelector);
            queryStringBuilder.append((" = "));
            queryStringBuilder.append(pkParamName);
            queryStringBuilder.append(" )");

            if (i < itemIdentities.size() - 1) {
                queryStringBuilder.append(" OR ");
            }
        }
        queryStringBuilder.append(" )");

        return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
    }

    private String createPkSelector(PartitionKeyDefinition partitionKeyDefinition) {
        return partitionKeyDefinition.getPaths()
            .stream()
            .map(pathPart -> StringUtils.substring(pathPart, 1)) // skip starting /
            .map(pathPart -> StringUtils.replace(pathPart, "\"", "\\")) // escape quote
            .map(part -> "[\"" + part + "\"]")
            .collect(Collectors.joining());
    }

    private <T extends Resource> Flux<FeedResponse<T>> createReadManyQuery(
        String parentResourceLink,
        SqlQuerySpec sqlQuery,
        CosmosQueryRequestOptions options,
        Class<T> klass,
        ResourceType resourceTypeEnum,
        DocumentCollection collection,
        Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap) {

        UUID activityId = Utils.randomUUID();
        IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this, getOperationContextAndListenerTuple(options));
        Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
            DocumentQueryExecutionContextFactory.createReadManyQueryAsync(this, queryClient, collection.getResourceId(),
                                                                          sqlQuery,
                                                                          rangeQueryMap,
                                                                          options,
                                                                          collection.getResourceId(),
                                                                          parentResourceLink,
                                                                          activityId,
                                                                          klass,
                                                                          resourceTypeEnum);
        return executionContext.flatMap(IDocumentQueryExecutionContext<T>::executeAsync);
    }

    @Override
    public Flux<FeedResponse<Document>> queryDocuments(String collectionLink, String query, CosmosQueryRequestOptions options) {
        return queryDocuments(collectionLink, new SqlQuerySpec(query), options);
    }

    private IDocumentQueryClient documentQueryClientImpl(RxDocumentClientImpl rxDocumentClientImpl, OperationContextAndListenerTuple operationContextAndListenerTuple) {

        return new IDocumentQueryClient () {

            @Override
            public RxCollectionCache getCollectionCache() {
                return RxDocumentClientImpl.this.collectionCache;
            }

            @Override
            public RxPartitionKeyRangeCache getPartitionKeyRangeCache() {
                return RxDocumentClientImpl.this.partitionKeyRangeCache;
            }

            @Override
            public IRetryPolicyFactory getResetSessionTokenRetryPolicy() {
                return RxDocumentClientImpl.this.resetSessionTokenRetryPolicy;
            }

            @Override
            public ConsistencyLevel getDefaultConsistencyLevelAsync() {
                return RxDocumentClientImpl.this.gatewayConfigurationReader.getDefaultConsistencyLevel();
            }

            @Override
            public ConsistencyLevel getDesiredConsistencyLevelAsync() {
                // TODO Auto-generated method stub
                return RxDocumentClientImpl.this.consistencyLevel;
            }

            @Override
            public Mono<RxDocumentServiceResponse> executeQueryAsync(RxDocumentServiceRequest request) {
                if (operationContextAndListenerTuple == null) {
                    return RxDocumentClientImpl.this.query(request).single();
                } else {
                    final OperationListener listener =
                        operationContextAndListenerTuple.getOperationListener();
                    final OperationContext operationContext = operationContextAndListenerTuple.getOperationContext();
                    request.getHeaders().put(HttpConstants.HttpHeaders.CORRELATED_ACTIVITY_ID, operationContext.getCorrelationActivityId());
                    listener.requestListener(operationContext, request);

                    return RxDocumentClientImpl.this.query(request).single().doOnNext(
                        response -> listener.responseListener(operationContext, response)
                    ).doOnError(
                        ex -> listener.exceptionListener(operationContext, ex)
                    );
                }
            }

            @Override
            public QueryCompatibilityMode getQueryCompatibilityMode() {
                // TODO Auto-generated method stub
                return QueryCompatibilityMode.Default;
            }

            @Override
            public Mono<RxDocumentServiceResponse> readFeedAsync(RxDocumentServiceRequest request) {
                // TODO Auto-generated method stub
                return null;
            }
        };
    }

    @Override
    public Flux<FeedResponse<Document>> queryDocuments(String collectionLink, SqlQuerySpec querySpec,
                                                             CosmosQueryRequestOptions options) {
        SqlQuerySpecLogger.getInstance().logQuery(querySpec);
        return createQuery(collectionLink, querySpec, options, Document.class, ResourceType.Document);
    }

    @Override
    public Flux<FeedResponse<Document>> queryDocumentChangeFeed(
        final DocumentCollection collection,
        final CosmosChangeFeedRequestOptions changeFeedOptions) {

        checkNotNull(collection, "Argument 'collection' must not be null.");

        ChangeFeedQueryImpl<Document> changeFeedQueryImpl = new ChangeFeedQueryImpl<>(
            this,
            ResourceType.Document,
            Document.class,
            collection.getAltLink(),
            collection.getResourceId(),
            changeFeedOptions);

        return changeFeedQueryImpl.executeAsync();
    }

    @Override
    public Flux<FeedResponse<Document>> readAllDocuments(
        String collectionLink,
        PartitionKey partitionKey,
        CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        if (partitionKey == null) {
            throw new IllegalArgumentException("partitionKey");
        }

        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            OperationType.Query,
            ResourceType.Document,
            collectionLink,
            null
        );

        // This should not got to backend
        Flux<Utils.ValueHolder<DocumentCollection>> collectionObs =
            collectionCache.resolveCollectionAsync(null, request).flux();

        return collectionObs.flatMap(documentCollectionResourceResponse -> {

            DocumentCollection collection = documentCollectionResourceResponse.v;
            if (collection == null) {
                throw new IllegalStateException("Collection cannot be null");
            }

            PartitionKeyDefinition pkDefinition = collection.getPartitionKey();
            String pkSelector = createPkSelector(pkDefinition);
            SqlQuerySpec querySpec = createLogicalPartitionScanQuerySpec(partitionKey, pkSelector);

            String resourceLink = parentResourceLinkToQueryLink(collectionLink, ResourceType.Document);
            UUID activityId = Utils.randomUUID();
            IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this, getOperationContextAndListenerTuple(options));

            final CosmosQueryRequestOptions effectiveOptions =
                ModelBridgeInternal.createQueryRequestOptions(options);

            // Trying to put this logic as low as the query pipeline
            // Since for parallelQuery, each partition will have its own request, so at this point, there will be no request associate with this retry policy.
            // For default document context, it already wired up InvalidPartitionExceptionRetry, but there is no harm to wire it again here
            InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
                this.collectionCache,
                null,
                resourceLink,
                ModelBridgeInternal.getPropertiesFromQueryRequestOptions(effectiveOptions));

            return ObservableHelper.fluxInlineIfPossibleAsObs(
                () -> {
                    Flux<Utils.ValueHolder<CollectionRoutingMap>> valueHolderMono = this.partitionKeyRangeCache
                        .tryLookupAsync(
                            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                            collection.getResourceId(),
                            null,
                            null).flux();
                    return valueHolderMono.flatMap(collectionRoutingMapValueHolder -> {

                        CollectionRoutingMap routingMap = collectionRoutingMapValueHolder.v;
                        if (routingMap == null) {
                            throw new IllegalStateException("Failed to get routing map.");
                        }

                        String effectivePartitionKeyString = PartitionKeyInternalHelper
                            .getEffectivePartitionKeyString(
                                BridgeInternal.getPartitionKeyInternal(partitionKey),
                                pkDefinition);

                        //use routing map to find the partitionKeyRangeId of each
                        // effectivePartitionKey
                        PartitionKeyRange range =
                            routingMap.getRangeByEffectivePartitionKey(effectivePartitionKeyString);

                        return createQueryInternal(
                            resourceLink,
                            querySpec,
                            ModelBridgeInternal.setPartitionKeyRangeIdInternal(effectiveOptions, range.getId()),
                            Document.class,
                            ResourceType.Document,
                            queryClient,
                            activityId);
                    });
                },
                invalidPartitionExceptionRetryPolicy);
        });
    }

    @Override
    public Map<String, PartitionedQueryExecutionInfo> getQueryPlanCache() {
        return queryPlanCache;
    }

    @Override
    public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRanges(final String collectionLink,
                                                                              CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return readFeed(options, ResourceType.PartitionKeyRange, PartitionKeyRange.class,
                Utils.joinPath(collectionLink, Paths.PARTITION_KEY_RANGES_PATH_SEGMENT));
    }

    private RxDocumentServiceRequest getStoredProcedureRequest(String collectionLink, StoredProcedure storedProcedure,
                                                               RequestOptions options, OperationType operationType) {
        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (storedProcedure == null) {
            throw new IllegalArgumentException("storedProcedure");
        }

        validateResource(storedProcedure);

        String path = Utils.joinPath(collectionLink, Paths.STORED_PROCEDURES_PATH_SEGMENT);
        Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.StoredProcedure, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this, operationType,
            ResourceType.StoredProcedure, path, storedProcedure, requestHeaders, options);

        return request;
    }

    private RxDocumentServiceRequest getUserDefinedFunctionRequest(String collectionLink, UserDefinedFunction udf,
                                                                   RequestOptions options, OperationType operationType) {
        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (udf == null) {
            throw new IllegalArgumentException("udf");
        }

        validateResource(udf);

        String path = Utils.joinPath(collectionLink, Paths.USER_DEFINED_FUNCTIONS_PATH_SEGMENT);
        Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.UserDefinedFunction, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            operationType, ResourceType.UserDefinedFunction, path, udf, requestHeaders, options);

        return request;
    }

    @Override
    public Mono<ResourceResponse<StoredProcedure>> createStoredProcedure(String collectionLink,
                                                                               StoredProcedure storedProcedure, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createStoredProcedureInternal(collectionLink, storedProcedure, options, requestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<StoredProcedure>> createStoredProcedureInternal(String collectionLink,
                                                                                        StoredProcedure storedProcedure, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {

            logger.debug("Creating a StoredProcedure. collectionLink: [{}], storedProcedure id [{}]",
                    collectionLink, storedProcedure.getId());
            RxDocumentServiceRequest request = getStoredProcedureRequest(collectionLink, storedProcedure, options,
                    OperationType.Create);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.create(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, StoredProcedure.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in creating a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<StoredProcedure>> upsertStoredProcedure(String collectionLink,
                                                                               StoredProcedure storedProcedure, RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertStoredProcedureInternal(collectionLink, storedProcedure, options, requestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<StoredProcedure>> upsertStoredProcedureInternal(String collectionLink,
                                                                                        StoredProcedure storedProcedure, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {

            logger.debug("Upserting a StoredProcedure. collectionLink: [{}], storedProcedure id [{}]",
                    collectionLink, storedProcedure.getId());
            RxDocumentServiceRequest request = getStoredProcedureRequest(collectionLink, storedProcedure, options,
                    OperationType.Upsert);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, StoredProcedure.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in upserting a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<StoredProcedure>> replaceStoredProcedure(StoredProcedure storedProcedure,
                                                                                RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceStoredProcedureInternal(storedProcedure, options, requestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<StoredProcedure>> replaceStoredProcedureInternal(StoredProcedure storedProcedure,
                                                                                         RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {

            if (storedProcedure == null) {
                throw new IllegalArgumentException("storedProcedure");
            }
            logger.debug("Replacing a StoredProcedure. storedProcedure id [{}]", storedProcedure.getId());

            RxDocumentClientImpl.validateResource(storedProcedure);

            String path = Utils.joinPath(storedProcedure.getSelfLink(), null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.StoredProcedure, OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.StoredProcedure, path, storedProcedure, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, StoredProcedure.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<StoredProcedure>> deleteStoredProcedure(String storedProcedureLink,
                                                                               RequestOptions options) {
        DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteStoredProcedureInternal(storedProcedureLink, options, requestRetryPolicy), requestRetryPolicy);
    }

    private Mono<ResourceResponse<StoredProcedure>> deleteStoredProcedureInternal(String storedProcedureLink,
                                                                                        RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {

            if (StringUtils.isEmpty(storedProcedureLink)) {
                throw new IllegalArgumentException("storedProcedureLink");
            }

            logger.debug("Deleting a StoredProcedure. storedProcedureLink [{}]", storedProcedureLink);
            String path = Utils.joinPath(storedProcedureLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.StoredProcedure, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.StoredProcedure, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, StoredProcedure.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in deleting a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<StoredProcedure>> readStoredProcedure(String storedProcedureLink,
                                                                             RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readStoredProcedureInternal(storedProcedureLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<StoredProcedure>> readStoredProcedureInternal(String storedProcedureLink,
                                                                                      RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {

        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {

            if (StringUtils.isEmpty(storedProcedureLink)) {
                throw new IllegalArgumentException("storedProcedureLink");
            }

            logger.debug("Reading a StoredProcedure. storedProcedureLink [{}]", storedProcedureLink);
            String path = Utils.joinPath(storedProcedureLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.StoredProcedure, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.StoredProcedure, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, StoredProcedure.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in reading a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<StoredProcedure>> readStoredProcedures(String collectionLink,
                                                                          CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return readFeed(options, ResourceType.StoredProcedure, StoredProcedure.class,
                Utils.joinPath(collectionLink, Paths.STORED_PROCEDURES_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<StoredProcedure>> queryStoredProcedures(String collectionLink, String query,
                                                                           CosmosQueryRequestOptions options) {
        return queryStoredProcedures(collectionLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<StoredProcedure>> queryStoredProcedures(String collectionLink,
                                                                           SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(collectionLink, querySpec, options, StoredProcedure.class, ResourceType.StoredProcedure);
    }

    @Override
    public Mono<StoredProcedureResponse> executeStoredProcedure(String storedProcedureLink,
                                                                      List<Object> procedureParams) {
        return this.executeStoredProcedure(storedProcedureLink, null, procedureParams);
    }

    @Override
    public Mono<StoredProcedureResponse> executeStoredProcedure(String storedProcedureLink,
                                                                      RequestOptions options, List<Object> procedureParams) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> executeStoredProcedureInternal(storedProcedureLink, options, procedureParams, documentClientRetryPolicy), documentClientRetryPolicy);
    }

    @Override
    public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink,
                                                         ServerBatchRequest serverBatchRequest,
                                                         RequestOptions options,
                                                         boolean disableAutomaticIdGeneration) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> executeBatchRequestInternal(collectionLink, serverBatchRequest, options, documentClientRetryPolicy, disableAutomaticIdGeneration), documentClientRetryPolicy);
    }

    private Mono<StoredProcedureResponse> executeStoredProcedureInternal(String storedProcedureLink,
                                                                               RequestOptions options, List<Object> procedureParams, DocumentClientRetryPolicy retryPolicy) {

        try {
            logger.debug("Executing a StoredProcedure. storedProcedureLink [{}]", storedProcedureLink);
            String path = Utils.joinPath(storedProcedureLink, null);

            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.StoredProcedure, OperationType.ExecuteJavaScript);
            requestHeaders.put(HttpConstants.HttpHeaders.ACCEPT, RuntimeConstants.MediaTypes.JSON);

            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                    OperationType.ExecuteJavaScript,
                    ResourceType.StoredProcedure, path,
                    procedureParams != null && !procedureParams.isEmpty() ? RxDocumentClientImpl.serializeProcedureParams(procedureParams) : "",
                    requestHeaders, options);

            if (retryPolicy != null) {
                retryPolicy.onBeforeSendRequest(request);
            }

            Mono<RxDocumentServiceRequest> reqObs = addPartitionKeyInformation(request, null, null, options);
            return reqObs.flatMap(req -> create(request, retryPolicy, getOperationContextAndListenerTuple(options))
                    .map(response -> {
                        this.captureSessionToken(request, response);
                        return toStoredProcedureResponse(response);
                    }));

        } catch (Exception e) {
            logger.debug("Failure in executing a StoredProcedure due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private Mono<CosmosBatchResponse> executeBatchRequestInternal(String collectionLink,
                                                                         ServerBatchRequest serverBatchRequest,
                                                                         RequestOptions options,
                                                                         DocumentClientRetryPolicy requestRetryPolicy,
                                                                         boolean disableAutomaticIdGeneration) {

        try {
            logger.debug("Executing a Batch request with number of operations {}", serverBatchRequest.getOperations().size());

            Mono<RxDocumentServiceRequest> requestObs = getBatchDocumentRequest(requestRetryPolicy, collectionLink, serverBatchRequest, options, disableAutomaticIdGeneration);
            Mono<RxDocumentServiceResponse> responseObservable =
                requestObs.flatMap(request -> create(request, requestRetryPolicy, getOperationContextAndListenerTuple(options)));

            return responseObservable
                .map(serviceResponse -> BatchResponseParser.fromDocumentServiceResponse(serviceResponse, serverBatchRequest, true));

        } catch (Exception ex) {
            logger.debug("Failure in executing a batch due to [{}]", ex.getMessage(), ex);
            return Mono.error(ex);
        }
    }

    @Override
    public Mono<ResourceResponse<Trigger>> createTrigger(String collectionLink, Trigger trigger,
                                                               RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createTriggerInternal(collectionLink, trigger, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Trigger>> createTriggerInternal(String collectionLink, Trigger trigger,
                                                                        RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {

            logger.debug("Creating a Trigger. collectionLink [{}], trigger id [{}]", collectionLink,
                    trigger.getId());
            RxDocumentServiceRequest request = getTriggerRequest(collectionLink, trigger, options,
                    OperationType.Create);
            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.create(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Trigger.class));

        } catch (Exception e) {
            logger.debug("Failure in creating a Trigger due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Trigger>> upsertTrigger(String collectionLink, Trigger trigger,
                                                               RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertTriggerInternal(collectionLink, trigger, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Trigger>> upsertTriggerInternal(String collectionLink, Trigger trigger,
                                                                        RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {

            logger.debug("Upserting a Trigger. collectionLink [{}], trigger id [{}]", collectionLink,
                    trigger.getId());
            RxDocumentServiceRequest request = getTriggerRequest(collectionLink, trigger, options,
                    OperationType.Upsert);
            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Trigger.class));

        } catch (Exception e) {
            logger.debug("Failure in upserting a Trigger due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private RxDocumentServiceRequest getTriggerRequest(String collectionLink, Trigger trigger, RequestOptions options,
                                                       OperationType operationType) {
        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (trigger == null) {
            throw new IllegalArgumentException("trigger");
        }

        RxDocumentClientImpl.validateResource(trigger);

        String path = Utils.joinPath(collectionLink, Paths.TRIGGERS_PATH_SEGMENT);
        Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Trigger, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this, operationType, ResourceType.Trigger, path,
                trigger, requestHeaders, options);

        return request;
    }

    @Override
    public Mono<ResourceResponse<Trigger>> replaceTrigger(Trigger trigger, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceTriggerInternal(trigger, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Trigger>> replaceTriggerInternal(Trigger trigger, RequestOptions options,
                                                                         DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (trigger == null) {
                throw new IllegalArgumentException("trigger");
            }

            logger.debug("Replacing a Trigger. trigger id [{}]", trigger.getId());
            RxDocumentClientImpl.validateResource(trigger);

            String path = Utils.joinPath(trigger.getSelfLink(), null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Trigger, OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.Trigger, path, trigger, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, Trigger.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing a Trigger due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Trigger>> deleteTrigger(String triggerLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteTriggerInternal(triggerLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Trigger>> deleteTriggerInternal(String triggerLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(triggerLink)) {
                throw new IllegalArgumentException("triggerLink");
            }

            logger.debug("Deleting a Trigger. triggerLink [{}]", triggerLink);
            String path = Utils.joinPath(triggerLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Trigger, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.Trigger, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Trigger.class));

        } catch (Exception e) {
            logger.debug("Failure in deleting a Trigger due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Trigger>> readTrigger(String triggerLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readTriggerInternal(triggerLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Trigger>> readTriggerInternal(String triggerLink, RequestOptions options,
                                                                      DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(triggerLink)) {
                throw new IllegalArgumentException("triggerLink");
            }

            logger.debug("Reading a Trigger. triggerLink [{}]", triggerLink);
            String path = Utils.joinPath(triggerLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Trigger, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Trigger, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, Trigger.class));

        } catch (Exception e) {
            logger.debug("Failure in reading a Trigger due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Trigger>> readTriggers(String collectionLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return readFeed(options, ResourceType.Trigger, Trigger.class,
                Utils.joinPath(collectionLink, Paths.TRIGGERS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<Trigger>> queryTriggers(String collectionLink, String query,
                                                           CosmosQueryRequestOptions options) {
        return queryTriggers(collectionLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<Trigger>> queryTriggers(String collectionLink, SqlQuerySpec querySpec,
                                                           CosmosQueryRequestOptions options) {
        return createQuery(collectionLink, querySpec, options, Trigger.class, ResourceType.Trigger);
    }

    @Override
    public Mono<ResourceResponse<UserDefinedFunction>> createUserDefinedFunction(String collectionLink,
                                                                                       UserDefinedFunction udf, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createUserDefinedFunctionInternal(collectionLink, udf, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<UserDefinedFunction>> createUserDefinedFunctionInternal(String collectionLink,
                                                                                                UserDefinedFunction udf, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            logger.debug("Creating a UserDefinedFunction. collectionLink [{}], udf id [{}]", collectionLink,
                    udf.getId());
            RxDocumentServiceRequest request = getUserDefinedFunctionRequest(collectionLink, udf, options,
                    OperationType.Create);
            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.create(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, UserDefinedFunction.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in creating a UserDefinedFunction due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<UserDefinedFunction>> upsertUserDefinedFunction(String collectionLink,
                                                                                       UserDefinedFunction udf, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertUserDefinedFunctionInternal(collectionLink, udf, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<UserDefinedFunction>> upsertUserDefinedFunctionInternal(String collectionLink,
                                                                                                UserDefinedFunction udf, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            logger.debug("Upserting a UserDefinedFunction. collectionLink [{}], udf id [{}]", collectionLink,
                    udf.getId());
            RxDocumentServiceRequest request = getUserDefinedFunctionRequest(collectionLink, udf, options,
                    OperationType.Upsert);
            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, UserDefinedFunction.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in upserting a UserDefinedFunction due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<UserDefinedFunction>> replaceUserDefinedFunction(UserDefinedFunction udf,
                                                                                        RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceUserDefinedFunctionInternal(udf, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<UserDefinedFunction>> replaceUserDefinedFunctionInternal(UserDefinedFunction udf,
                                                                                                 RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            if (udf == null) {
                throw new IllegalArgumentException("udf");
            }

            logger.debug("Replacing a UserDefinedFunction. udf id [{}]", udf.getId());
            validateResource(udf);

            String path = Utils.joinPath(udf.getSelfLink(), null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.UserDefinedFunction, OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.UserDefinedFunction, path, udf, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, UserDefinedFunction.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in replacing a UserDefinedFunction due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<UserDefinedFunction>> deleteUserDefinedFunction(String udfLink,
                                                                                       RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteUserDefinedFunctionInternal(udfLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<UserDefinedFunction>> deleteUserDefinedFunctionInternal(String udfLink,
                                                                                                RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            if (StringUtils.isEmpty(udfLink)) {
                throw new IllegalArgumentException("udfLink");
            }

            logger.debug("Deleting a UserDefinedFunction. udfLink [{}]", udfLink);
            String path = Utils.joinPath(udfLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.UserDefinedFunction, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.UserDefinedFunction, path, requestHeaders, options);

            if (retryPolicyInstance != null){
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, UserDefinedFunction.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in deleting a UserDefinedFunction due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<UserDefinedFunction>> readUserDefinedFunction(String udfLink,
                                                                                     RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readUserDefinedFunctionInternal(udfLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<UserDefinedFunction>> readUserDefinedFunctionInternal(String udfLink,
                                                                                              RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        // we are using an observable factory here
        // observable will be created fresh upon subscription
        // this is to ensure we capture most up to date information (e.g.,
        // session)
        try {
            if (StringUtils.isEmpty(udfLink)) {
                throw new IllegalArgumentException("udfLink");
            }

            logger.debug("Reading a UserDefinedFunction. udfLink [{}]", udfLink);
            String path = Utils.joinPath(udfLink, null);
            Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.UserDefinedFunction, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.UserDefinedFunction, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, UserDefinedFunction.class));

        } catch (Exception e) {
            // this is only in trace level to capture what's going on
            logger.debug("Failure in reading a UserDefinedFunction due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<UserDefinedFunction>> readUserDefinedFunctions(String collectionLink,
                                                                                  CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return readFeed(options, ResourceType.UserDefinedFunction, UserDefinedFunction.class,
                Utils.joinPath(collectionLink, Paths.USER_DEFINED_FUNCTIONS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<UserDefinedFunction>> queryUserDefinedFunctions(String collectionLink,
                                                                                   String query, CosmosQueryRequestOptions options) {
        return queryUserDefinedFunctions(collectionLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<UserDefinedFunction>> queryUserDefinedFunctions(String collectionLink,
                                                                                   SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(collectionLink, querySpec, options, UserDefinedFunction.class, ResourceType.UserDefinedFunction);
    }

    @Override
    public Mono<ResourceResponse<Conflict>> readConflict(String conflictLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readConflictInternal(conflictLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Conflict>> readConflictInternal(String conflictLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (StringUtils.isEmpty(conflictLink)) {
                throw new IllegalArgumentException("conflictLink");
            }

            logger.debug("Reading a Conflict. conflictLink [{}]", conflictLink);
            String path = Utils.joinPath(conflictLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Conflict, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Conflict, path, requestHeaders, options);

            Mono<RxDocumentServiceRequest> reqObs = addPartitionKeyInformation(request, null, null, options);

            return reqObs.flatMap(req -> {
                if (retryPolicyInstance != null) {
                    retryPolicyInstance.onBeforeSendRequest(request);
                }
                return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, Conflict.class));
            });

        } catch (Exception e) {
            logger.debug("Failure in reading a Conflict due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Conflict>> readConflicts(String collectionLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        return readFeed(options, ResourceType.Conflict, Conflict.class,
                Utils.joinPath(collectionLink, Paths.CONFLICTS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<Conflict>> queryConflicts(String collectionLink, String query,
                                                             CosmosQueryRequestOptions options) {
        return queryConflicts(collectionLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<Conflict>> queryConflicts(String collectionLink, SqlQuerySpec querySpec,
                                                             CosmosQueryRequestOptions options) {
        return createQuery(collectionLink, querySpec, options, Conflict.class, ResourceType.Conflict);
    }

    @Override
    public Mono<ResourceResponse<Conflict>> deleteConflict(String conflictLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteConflictInternal(conflictLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Conflict>> deleteConflictInternal(String conflictLink, RequestOptions options,
                                                                          DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (StringUtils.isEmpty(conflictLink)) {
                throw new IllegalArgumentException("conflictLink");
            }

            logger.debug("Deleting a Conflict. conflictLink [{}]", conflictLink);
            String path = Utils.joinPath(conflictLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Conflict, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.Conflict, path, requestHeaders, options);

            Mono<RxDocumentServiceRequest> reqObs = addPartitionKeyInformation(request, null, null, options);
            return reqObs.flatMap(req -> {
                if (retryPolicyInstance != null) {
                    retryPolicyInstance.onBeforeSendRequest(request);
                }

                return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Conflict.class));
            });

        } catch (Exception e) {
            logger.debug("Failure in deleting a Conflict due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<User>> createUser(String databaseLink, User user, RequestOptions options) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createUserInternal(databaseLink, user, options, documentClientRetryPolicy), documentClientRetryPolicy);
    }

    private Mono<ResourceResponse<User>> createUserInternal(String databaseLink, User user, RequestOptions options, DocumentClientRetryPolicy documentClientRetryPolicy) {
        try {
            logger.debug("Creating a User. databaseLink [{}], user id [{}]", databaseLink, user.getId());
            RxDocumentServiceRequest request = getUserRequest(databaseLink, user, options, OperationType.Create);
            return this.create(request, documentClientRetryPolicy, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, User.class));

        } catch (Exception e) {
            logger.debug("Failure in creating a User due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<User>> upsertUser(String databaseLink, User user, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertUserInternal(databaseLink, user, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<User>> upsertUserInternal(String databaseLink, User user, RequestOptions options,
                                                                  DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            logger.debug("Upserting a User. databaseLink [{}], user id [{}]", databaseLink, user.getId());
            RxDocumentServiceRequest request = getUserRequest(databaseLink, user, options, OperationType.Upsert);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, User.class));

        } catch (Exception e) {
            logger.debug("Failure in upserting a User due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private RxDocumentServiceRequest getUserRequest(String databaseLink, User user, RequestOptions options,
                                                    OperationType operationType) {
        if (StringUtils.isEmpty(databaseLink)) {
            throw new IllegalArgumentException("databaseLink");
        }
        if (user == null) {
            throw new IllegalArgumentException("user");
        }

        RxDocumentClientImpl.validateResource(user);

        String path = Utils.joinPath(databaseLink, Paths.USERS_PATH_SEGMENT);
        Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.User, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            operationType, ResourceType.User, path, user, requestHeaders, options);

        return request;
    }

    @Override
    public Mono<ResourceResponse<User>> replaceUser(User user, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceUserInternal(user, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<User>> replaceUserInternal(User user, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (user == null) {
                throw new IllegalArgumentException("user");
            }
            logger.debug("Replacing a User. user id [{}]", user.getId());
            RxDocumentClientImpl.validateResource(user);

            String path = Utils.joinPath(user.getSelfLink(), null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.User, OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.User, path, user, requestHeaders, options);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, User.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing a User due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }


    public Mono<ResourceResponse<User>> deleteUser(String userLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance =  this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deleteUserInternal(userLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<User>> deleteUserInternal(String userLink, RequestOptions options,
                                                                  DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (StringUtils.isEmpty(userLink)) {
                throw new IllegalArgumentException("userLink");
            }
            logger.debug("Deleting a User. userLink [{}]", userLink);
            String path = Utils.joinPath(userLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.User, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.User, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, User.class));

        } catch (Exception e) {
            logger.debug("Failure in deleting a User due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }
    @Override
    public Mono<ResourceResponse<User>> readUser(String userLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readUserInternal(userLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<User>> readUserInternal(String userLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(userLink)) {
                throw new IllegalArgumentException("userLink");
            }
            logger.debug("Reading a User. userLink [{}]", userLink);
            String path = Utils.joinPath(userLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.User, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.User, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }
            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, User.class));

        } catch (Exception e) {
            logger.debug("Failure in reading a User due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<User>> readUsers(String databaseLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(databaseLink)) {
            throw new IllegalArgumentException("databaseLink");
        }

        return readFeed(options, ResourceType.User, User.class,
                Utils.joinPath(databaseLink, Paths.USERS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<User>> queryUsers(String databaseLink, String query, CosmosQueryRequestOptions options) {
        return queryUsers(databaseLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<User>> queryUsers(String databaseLink, SqlQuerySpec querySpec,
                                                     CosmosQueryRequestOptions options) {
        return createQuery(databaseLink, querySpec, options, User.class, ResourceType.User);
    }

    @Override
    public Mono<ResourceResponse<ClientEncryptionKey>> readClientEncryptionKey(String clientEncryptionKeyLink,
                                                                RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readClientEncryptionKeyInternal(clientEncryptionKeyLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<ClientEncryptionKey>> readClientEncryptionKeyInternal(String clientEncryptionKeyLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(clientEncryptionKeyLink)) {
                throw new IllegalArgumentException("clientEncryptionKeyLink");
            }
            logger.debug("Reading a client encryption key. clientEncryptionKeyLink [{}]", clientEncryptionKeyLink);
            String path = Utils.joinPath(clientEncryptionKeyLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.ClientEncryptionKey, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.ClientEncryptionKey, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }
            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, ClientEncryptionKey.class));

        } catch (Exception e) {
            logger.debug("Failure in reading a client encryption key due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<ClientEncryptionKey>> createClientEncryptionKey(String databaseLink,
     ClientEncryptionKey clientEncryptionKey, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createClientEncryptionKeyInternal(databaseLink, clientEncryptionKey, options, retryPolicyInstance), retryPolicyInstance);

    }

    private Mono<ResourceResponse<ClientEncryptionKey>> createClientEncryptionKeyInternal(String databaseLink, ClientEncryptionKey clientEncryptionKey, RequestOptions options, DocumentClientRetryPolicy documentClientRetryPolicy) {
        try {
            logger.debug("Creating a client encryption key. databaseLink [{}], clientEncryptionKey id [{}]", databaseLink, clientEncryptionKey.getId());
            RxDocumentServiceRequest request = getClientEncryptionKeyRequest(databaseLink, clientEncryptionKey, options, OperationType.Create);
            return this.create(request, documentClientRetryPolicy, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, ClientEncryptionKey.class));

        } catch (Exception e) {
            logger.debug("Failure in creating a client encryption key due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private RxDocumentServiceRequest getClientEncryptionKeyRequest(String databaseLink, ClientEncryptionKey clientEncryptionKey, RequestOptions options,
                                                    OperationType operationType) {
        if (StringUtils.isEmpty(databaseLink)) {
            throw new IllegalArgumentException("databaseLink");
        }
        if (clientEncryptionKey == null) {
            throw new IllegalArgumentException("clientEncryptionKey");
        }

        RxDocumentClientImpl.validateResource(clientEncryptionKey);

        String path = Utils.joinPath(databaseLink, Paths.CLIENT_ENCRYPTION_KEY_PATH_SEGMENT);
        Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.ClientEncryptionKey, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            operationType, ResourceType.ClientEncryptionKey, path, clientEncryptionKey, requestHeaders, options);

        return request;
    }

    @Override
    public Mono<ResourceResponse<ClientEncryptionKey>> replaceClientEncryptionKey(ClientEncryptionKey clientEncryptionKey,
                                                                                  String nameBasedLink,
                                                                                  RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceClientEncryptionKeyInternal(clientEncryptionKey,
            nameBasedLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<ClientEncryptionKey>> replaceClientEncryptionKeyInternal(ClientEncryptionKey clientEncryptionKey, String nameBasedLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (clientEncryptionKey == null) {
                throw new IllegalArgumentException("clientEncryptionKey");
            }
            logger.debug("Replacing a clientEncryptionKey. clientEncryptionKey id [{}]", clientEncryptionKey.getId());
            RxDocumentClientImpl.validateResource(clientEncryptionKey);

            String path = Utils.joinPath(nameBasedLink, null);
            //String path = Utils.joinPath(clientEncryptionKey.getSelfLink(), null); TODO need to check with BE service
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.ClientEncryptionKey,
             OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.ClientEncryptionKey, path, clientEncryptionKey, requestHeaders,
                 options);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, ClientEncryptionKey.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing a clientEncryptionKey due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<ClientEncryptionKey>> readClientEncryptionKeys(String databaseLink, CosmosQueryRequestOptions options) {
        if (StringUtils.isEmpty(databaseLink)) {
            throw new IllegalArgumentException("databaseLink");
        }

        return readFeed(options, ResourceType.ClientEncryptionKey, ClientEncryptionKey.class,
            Utils.joinPath(databaseLink, Paths.CLIENT_ENCRYPTION_KEY_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<ClientEncryptionKey>> queryClientEncryptionKeys(String databaseLink, String query,
     CosmosQueryRequestOptions options) {
        return queryClientEncryptionKeys(databaseLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<ClientEncryptionKey>> queryClientEncryptionKeys(String databaseLink, SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(databaseLink, querySpec, options, ClientEncryptionKey.class, ResourceType.ClientEncryptionKey);
    }

    @Override
    public Mono<ResourceResponse<Permission>> createPermission(String userLink, Permission permission,
                                                                     RequestOptions options) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> createPermissionInternal(userLink, permission, options, documentClientRetryPolicy), this.resetSessionTokenRetryPolicy.getRequestPolicy());
    }

    private Mono<ResourceResponse<Permission>> createPermissionInternal(String userLink, Permission permission,
                                                                              RequestOptions options, DocumentClientRetryPolicy documentClientRetryPolicy) {

        try {
            logger.debug("Creating a Permission. userLink [{}], permission id [{}]", userLink, permission.getId());
            RxDocumentServiceRequest request = getPermissionRequest(userLink, permission, options,
                    OperationType.Create);
            return this.create(request, documentClientRetryPolicy, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Permission.class));

        } catch (Exception e) {
            logger.debug("Failure in creating a Permission due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Permission>> upsertPermission(String userLink, Permission permission,
                                                                     RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> upsertPermissionInternal(userLink, permission, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Permission>> upsertPermissionInternal(String userLink, Permission permission,
                                                                              RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            logger.debug("Upserting a Permission. userLink [{}], permission id [{}]", userLink, permission.getId());
            RxDocumentServiceRequest request = getPermissionRequest(userLink, permission, options,
                    OperationType.Upsert);
            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.upsert(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Permission.class));

        } catch (Exception e) {
            logger.debug("Failure in upserting a Permission due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    private RxDocumentServiceRequest getPermissionRequest(String userLink, Permission permission,
                                                          RequestOptions options, OperationType operationType) {
        if (StringUtils.isEmpty(userLink)) {
            throw new IllegalArgumentException("userLink");
        }
        if (permission == null) {
            throw new IllegalArgumentException("permission");
        }

        RxDocumentClientImpl.validateResource(permission);

        String path = Utils.joinPath(userLink, Paths.PERMISSIONS_PATH_SEGMENT);
        Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Permission, operationType);
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
            operationType, ResourceType.Permission, path, permission, requestHeaders, options);

        return request;
    }

    @Override
    public Mono<ResourceResponse<Permission>> replacePermission(Permission permission, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replacePermissionInternal(permission, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Permission>> replacePermissionInternal(Permission permission, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (permission == null) {
                throw new IllegalArgumentException("permission");
            }
            logger.debug("Replacing a Permission. permission id [{}]", permission.getId());
            RxDocumentClientImpl.validateResource(permission);

            String path = Utils.joinPath(permission.getSelfLink(), null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Permission, OperationType.Replace);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Replace, ResourceType.Permission, path, permission, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.replace(request, retryPolicyInstance).map(response -> toResourceResponse(response, Permission.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing a Permission due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Permission>> deletePermission(String permissionLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> deletePermissionInternal(permissionLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Permission>> deletePermissionInternal(String permissionLink, RequestOptions options,
                                                                              DocumentClientRetryPolicy retryPolicyInstance) {

        try {
            if (StringUtils.isEmpty(permissionLink)) {
                throw new IllegalArgumentException("permissionLink");
            }
            logger.debug("Deleting a Permission. permissionLink [{}]", permissionLink);
            String path = Utils.joinPath(permissionLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Permission, OperationType.Delete);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Delete, ResourceType.Permission, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.delete(request, retryPolicyInstance, getOperationContextAndListenerTuple(options)).map(response -> toResourceResponse(response, Permission.class));

        } catch (Exception e) {
            logger.debug("Failure in deleting a Permission due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Permission>> readPermission(String permissionLink, RequestOptions options) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readPermissionInternal(permissionLink, options, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Permission>> readPermissionInternal(String permissionLink, RequestOptions options, DocumentClientRetryPolicy retryPolicyInstance ) {
        try {
            if (StringUtils.isEmpty(permissionLink)) {
                throw new IllegalArgumentException("permissionLink");
            }
            logger.debug("Reading a Permission. permissionLink [{}]", permissionLink);
            String path = Utils.joinPath(permissionLink, null);
            Map<String, String> requestHeaders = getRequestHeaders(options, ResourceType.Permission, OperationType.Read);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Permission, path, requestHeaders, options);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }
            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, Permission.class));

        } catch (Exception e) {
            logger.debug("Failure in reading a Permission due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Permission>> readPermissions(String userLink, CosmosQueryRequestOptions options) {

        if (StringUtils.isEmpty(userLink)) {
            throw new IllegalArgumentException("userLink");
        }

        return readFeed(options, ResourceType.Permission, Permission.class,
                Utils.joinPath(userLink, Paths.PERMISSIONS_PATH_SEGMENT));
    }

    @Override
    public Flux<FeedResponse<Permission>> queryPermissions(String userLink, String query,
                                                                 CosmosQueryRequestOptions options) {
        return queryPermissions(userLink, new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<Permission>> queryPermissions(String userLink, SqlQuerySpec querySpec,
                                                                 CosmosQueryRequestOptions options) {
        return createQuery(userLink, querySpec, options, Permission.class, ResourceType.Permission);
    }

    @Override
    public Mono<ResourceResponse<Offer>> replaceOffer(Offer offer) {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> replaceOfferInternal(offer, documentClientRetryPolicy), documentClientRetryPolicy);
    }

    private Mono<ResourceResponse<Offer>> replaceOfferInternal(Offer offer, DocumentClientRetryPolicy documentClientRetryPolicy) {
        try {
            if (offer == null) {
                throw new IllegalArgumentException("offer");
            }
            logger.debug("Replacing an Offer. offer id [{}]", offer.getId());
            RxDocumentClientImpl.validateResource(offer);

            String path = Utils.joinPath(offer.getSelfLink(), null);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this, OperationType.Replace,
                    ResourceType.Offer, path, offer, null, null);
            return this.replace(request, documentClientRetryPolicy).map(response -> toResourceResponse(response, Offer.class));

        } catch (Exception e) {
            logger.debug("Failure in replacing an Offer due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Mono<ResourceResponse<Offer>> readOffer(String offerLink) {
        DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> readOfferInternal(offerLink, retryPolicyInstance), retryPolicyInstance);
    }

    private Mono<ResourceResponse<Offer>> readOfferInternal(String offerLink, DocumentClientRetryPolicy retryPolicyInstance) {
        try {
            if (StringUtils.isEmpty(offerLink)) {
                throw new IllegalArgumentException("offerLink");
            }
            logger.debug("Reading an Offer. offerLink [{}]", offerLink);
            String path = Utils.joinPath(offerLink, null);
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.Offer, path, (HashMap<String, String>)null, null);

            if (retryPolicyInstance != null) {
                retryPolicyInstance.onBeforeSendRequest(request);
            }

            return this.read(request, retryPolicyInstance).map(response -> toResourceResponse(response, Offer.class));

        } catch (Exception e) {
            logger.debug("Failure in reading an Offer due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    @Override
    public Flux<FeedResponse<Offer>> readOffers(CosmosQueryRequestOptions options) {
        return readFeed(options, ResourceType.Offer, Offer.class,
                Utils.joinPath(Paths.OFFERS_PATH_SEGMENT, null));
    }

//    private <T extends Resource> Flux<FeedResponse<T>> readFeedCollectionChild(FeedOptions options, ResourceType resourceType,
//                                                                               Class<T> klass, String resourceLink) {
//        if (options == null) {
//            options = new FeedOptions();
//        }
//
//        int maxPageSize = options.getMaxItemCount() != null ? options.getMaxItemCount() : -1;
//
//        final FeedOptions finalFeedOptions = options;
//        RequestOptions requestOptions = new RequestOptions();
//        requestOptions.setPartitionKey(options.getPartitionKey());
//        BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc = (continuationToken, pageSize) -> {
//            Map<String, String> requestHeaders = new HashMap<>();
//            if (continuationToken != null) {
//                requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken);
//            }
//            requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize));
//            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.ReadFeed,
//                resourceType, resourceLink, requestHeaders, finalFeedOptions);
//            return request;
//        };
//
//        Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc = request -> {
//            return ObservableHelper.inlineIfPossibleAsObs(() -> {
//                Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
//                Mono<RxDocumentServiceRequest> requestObs = this.addPartitionKeyInformation(request, null, null, requestOptions, collectionObs);
//
//                return requestObs.flatMap(req -> this.readFeed(req)
//                    .map(response -> toFeedResponsePage(response, klass)));
//            }, this.resetSessionTokenRetryPolicy.getRequestPolicy());
//        };
//
//        return Paginator.getPaginatedQueryResultAsObservable(options, createRequestFunc, executeFunc, klass, maxPageSize);
//    }

    private <T extends Resource> Flux<FeedResponse<T>> readFeed(CosmosQueryRequestOptions options, ResourceType resourceType, Class<T> klass, String resourceLink) {
        if (options == null) {
            options = new CosmosQueryRequestOptions();
        }

        Integer maxItemCount = ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(options);
        int maxPageSize = maxItemCount != null ? maxItemCount : -1;
        final CosmosQueryRequestOptions finalCosmosQueryRequestOptions = options;
        DocumentClientRetryPolicy retryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc = (continuationToken, pageSize) -> {
            Map<String, String> requestHeaders = new HashMap<>();
            if (continuationToken != null) {
                requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken);
            }
            requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize));
            RxDocumentServiceRequest request =  RxDocumentServiceRequest.create(this,
                OperationType.ReadFeed, resourceType, resourceLink, requestHeaders, finalCosmosQueryRequestOptions);
            retryPolicy.onBeforeSendRequest(request);
            return request;
        };

        Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc = request -> ObservableHelper
            .inlineIfPossibleAsObs(() -> readFeed(request).map(response -> toFeedResponsePage(response, klass)),
                retryPolicy);

        return Paginator.getPaginatedQueryResultAsObservable(options, createRequestFunc, executeFunc, klass, maxPageSize);
    }

    @Override
    public Flux<FeedResponse<Offer>> queryOffers(String query, CosmosQueryRequestOptions options) {
        return queryOffers(new SqlQuerySpec(query), options);
    }

    @Override
    public Flux<FeedResponse<Offer>> queryOffers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        return createQuery(null, querySpec, options, Offer.class, ResourceType.Offer);
    }

    @Override
    public Mono<DatabaseAccount> getDatabaseAccount() {
        DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
        return ObservableHelper.inlineIfPossibleAsObs(() -> getDatabaseAccountInternal(documentClientRetryPolicy),
         documentClientRetryPolicy);
    }

    @Override
    public DatabaseAccount getLatestDatabaseAccount() {
        return this.globalEndpointManager.getLatestDatabaseAccount();
    }

    private Mono<DatabaseAccount> getDatabaseAccountInternal(DocumentClientRetryPolicy documentClientRetryPolicy) {
        try {
            logger.debug("Getting Database Account");
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                    OperationType.Read,
                    ResourceType.DatabaseAccount, "", // path
                    (HashMap<String, String>) null,
                    null);
            return this.read(request, documentClientRetryPolicy).map(ModelBridgeInternal::toDatabaseAccount);

        } catch (Exception e) {
            logger.debug("Failure in getting Database Account due to [{}]", e.getMessage(), e);
            return Mono.error(e);
        }
    }

    public Object getSession() {
        return this.sessionContainer;
    }

    public void setSession(Object sessionContainer) {
        this.sessionContainer = (SessionContainer) sessionContainer;
    }

    @Override
    public RxClientCollectionCache getCollectionCache() {
        return this.collectionCache;
    }

    @Override
    public RxPartitionKeyRangeCache getPartitionKeyRangeCache() {
        return partitionKeyRangeCache;
    }

    public Flux<DatabaseAccount> getDatabaseAccountFromEndpoint(URI endpoint) {
        return Flux.defer(() -> {
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
                OperationType.Read, ResourceType.DatabaseAccount, "", null, (Object) null);
            return this.populateHeaders(request, RequestVerb.GET)
                .flatMap(requestPopulated -> {

                    requestPopulated.setEndpointOverride(endpoint);
                    return this.gatewayProxy.processMessage(requestPopulated).doOnError(e -> {
                        String message = String.format("Failed to retrieve database account information. %s",
                            e.getCause() != null
                                ? e.getCause().toString()
                                : e.toString());
                        logger.warn(message);
                    }).map(rsp -> rsp.getResource(DatabaseAccount.class))
                        .doOnNext(databaseAccount ->
                            this.useMultipleWriteLocations = this.connectionPolicy.isMultipleWriteRegionsEnabled()
                            && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount));
                });
        });
    }

    /**
     * Certain requests must be routed through gateway even when the client connectivity mode is direct.
     *
     * @param request
     * @return RxStoreModel
     */
    private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) {
        // If a request is configured to always use GATEWAY mode(in some cases when targeting .NET Core)
        // we return the GATEWAY store model
        if (request.UseGatewayMode) {
            return this.gatewayProxy;
        }

        ResourceType resourceType = request.getResourceType();
        OperationType operationType = request.getOperationType();

        if (resourceType == ResourceType.Offer ||
            resourceType == ResourceType.ClientEncryptionKey ||
            resourceType.isScript() && operationType != OperationType.ExecuteJavaScript ||
            resourceType == ResourceType.PartitionKeyRange ||
            resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete) {
            return this.gatewayProxy;
        }

        if (operationType == OperationType.Create
                || operationType == OperationType.Upsert) {
            if (resourceType == ResourceType.Database ||
                    resourceType == ResourceType.User ||
                    resourceType == ResourceType.DocumentCollection ||
                    resourceType == ResourceType.Permission) {
                return this.gatewayProxy;
            } else {
                return this.storeModel;
            }
        } else if (operationType == OperationType.Delete) {
            if (resourceType == ResourceType.Database ||
                    resourceType == ResourceType.User ||
                    resourceType == ResourceType.DocumentCollection) {
                return this.gatewayProxy;
            } else {
                return this.storeModel;
            }
        } else if (operationType == OperationType.Replace) {
            if (resourceType == ResourceType.DocumentCollection) {
                return this.gatewayProxy;
            } else {
                return this.storeModel;
            }
        } else if (operationType == OperationType.Read) {
            if (resourceType == ResourceType.DocumentCollection) {
                return this.gatewayProxy;
            } else {
                return this.storeModel;
            }
        } else {
            if ((operationType == OperationType.Query ||
                operationType == OperationType.SqlQuery ||
                operationType == OperationType.ReadFeed) &&
                    Utils.isCollectionChild(request.getResourceType())) {
                // Go to gateway only when partition key range and partition key are not set. This should be very rare
                if (request.getPartitionKeyRangeIdentity() == null &&
                        request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) == null) {
                    return this.gatewayProxy;
                }
            }

            return this.storeModel;
        }
    }

    @Override
    public void close() {
        logger.info("Attempting to close client {}", this.clientId);
        if (!closed.getAndSet(true)) {
            logger.info("Shutting down ...");
            logger.info("Closing Global Endpoint Manager ...");
            LifeCycleUtils.closeQuietly(this.globalEndpointManager);
            logger.info("Closing StoreClientFactory ...");
            LifeCycleUtils.closeQuietly(this.storeClientFactory);
            logger.info("Shutting down reactorHttpClient ...");
            LifeCycleUtils.closeQuietly(this.reactorHttpClient);
            logger.info("Shutting down CpuMonitor ...");
            CpuMemoryMonitor.unregister(this);

            if (this.throughputControlEnabled.get()) {
                logger.info("Closing ThroughputControlStore ...");
                this.throughputControlStore.close();
            }

            logger.info("Shutting down completed.");
        } else {
            logger.warn("Already shutdown!");
        }
    }

    @Override
    public ItemDeserializer getItemDeserializer() {
        return this.itemDeserializer;
    }

    @Override
    public synchronized void enableThroughputControlGroup(ThroughputControlGroupInternal group) {
        checkNotNull(group, "Throughput control group can not be null");

        if (this.throughputControlEnabled.compareAndSet(false, true)) {
            this.throughputControlStore =
                new ThroughputControlStore(
                    this.collectionCache,
                    this.connectionPolicy.getConnectionMode(),
                    this.partitionKeyRangeCache);

            this.storeModel.enableThroughputControl(throughputControlStore);
        }

        this.throughputControlStore.enableThroughputControlGroup(group);
    }

    private static SqlQuerySpec createLogicalPartitionScanQuerySpec(
        PartitionKey partitionKey,
        String partitionKeySelector) {

        StringBuilder queryStringBuilder = new StringBuilder();
        List<SqlParameter> parameters = new ArrayList<>();

        queryStringBuilder.append("SELECT * FROM c WHERE");
        Object pkValue = ModelBridgeInternal.getPartitionKeyObject(partitionKey);
        String pkParamName = "@pkValue";
        parameters.add(new SqlParameter(pkParamName, pkValue));

        queryStringBuilder.append(" c");
        // partition key def
        queryStringBuilder.append(partitionKeySelector);
        queryStringBuilder.append((" = "));
        queryStringBuilder.append(pkParamName);

        return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
    }

    @Override
    public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
        InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
            this.collectionCache,
            null,
            collectionLink,
            new HashMap<>());

        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
            this,
            OperationType.Query,
            ResourceType.Document,
            collectionLink,
            null);

        invalidPartitionExceptionRetryPolicy.onBeforeSendRequest(request);

        return ObservableHelper.inlineIfPossibleAsObs(
            () -> getFeedRangesInternal(request, collectionLink),
            invalidPartitionExceptionRetryPolicy);
    }

    private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest request, String collectionLink) {
        logger.debug("getFeedRange collectionLink=[{}]", collectionLink);

        if (StringUtils.isEmpty(collectionLink)) {
            throw new IllegalArgumentException("collectionLink");
        }

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(null,
            request);

        return collectionObs.flatMap(documentCollectionResourceResponse -> {
            final DocumentCollection collection = documentCollectionResourceResponse.v;
            if (collection == null) {
                throw new IllegalStateException("Collection cannot be null");
            }

            Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
                .tryGetOverlappingRangesAsync(
                    BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                    collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);

            return valueHolderMono.map(partitionKeyRangeList -> toFeedRanges(partitionKeyRangeList, request));
        });
    }

    private static List<FeedRange> toFeedRanges(
        Utils.ValueHolder<List<PartitionKeyRange>> partitionKeyRangeListValueHolder, RxDocumentServiceRequest request) {
        final List<PartitionKeyRange> partitionKeyRangeList = partitionKeyRangeListValueHolder.v;
        if (partitionKeyRangeList == null) {
            request.forceNameCacheRefresh = true;
            throw new InvalidPartitionException();
        }

        List<FeedRange> feedRanges = new ArrayList<>();
        partitionKeyRangeList.forEach(pkRange -> feedRanges.add(toFeedRange(pkRange)));

        return feedRanges;
    }

    private static FeedRange toFeedRange(PartitionKeyRange pkRange) {
        return new FeedRangeEpkImpl(pkRange.toRange());
    }
}