HttpTransportClient.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ConflictException;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.ForbiddenException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.LockedException;
import com.azure.cosmos.implementation.MethodNotAllowedException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.PreconditionFailedException;
import com.azure.cosmos.implementation.RequestEntityTooLargeException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RetryWithException;
import com.azure.cosmos.implementation.ServiceUnavailableException;
import com.azure.cosmos.implementation.UnauthorizedException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Integers;
import com.azure.cosmos.implementation.Lists;
import com.azure.cosmos.implementation.Longs;
import com.azure.cosmos.implementation.MutableVolatile;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RuntimeConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.azure.cosmos.implementation.Utils.trimBeginningAndEndingSlashes;
/*
 * The following code only support Document Write without any error handling support.
 */
public class HttpTransportClient extends TransportClient {
    private final Logger logger = LoggerFactory.getLogger(HttpTransportClient.class);
    private final HttpClient httpClient;
    private final Map<String, String> defaultHeaders;
    private final Configs configs;

    HttpClient createHttpClient(ConnectionPolicy connectionPolicy) {
        // TODO: use one instance of SSL context everywhere
        HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs);
        httpClientConfig.withNetworkRequestTimeout(connectionPolicy.getHttpNetworkRequestTimeout());
        httpClientConfig.withPoolSize(configs.getDirectHttpsMaxConnectionLimit());

        return HttpClient.createFixed(httpClientConfig);
    }

    public HttpTransportClient(Configs configs, ConnectionPolicy connectionPolicy, UserAgentContainer userAgent) {
        this.configs = configs;
        this.httpClient = createHttpClient(connectionPolicy);

        this.defaultHeaders = new HashMap<>();

        // Set requested API version header for version enforcement.
        this.defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);
        this.defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, HttpConstants.HeaderValues.NO_CACHE);

        if (userAgent == null) {
            userAgent = new UserAgentContainer();
        }

        this.defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgent.getUserAgent());
        this.defaultHeaders.put(HttpConstants.HttpHeaders.ACCEPT, RuntimeConstants.MediaTypes.JSON);
    }

    @Override
    public void close() {
        httpClient.shutdown();
    }

    public Mono<StoreResponse> invokeStoreAsync(
        Uri physicalAddressUri,
        RxDocumentServiceRequest request) {

        try {
            URI physicalAddress = physicalAddressUri.getURI();

            ResourceOperation resourceOperation = new ResourceOperation(request.getOperationType(), request.getResourceType());
            // uuid correlation manager
            String activityId = request.getActivityId().toString();

            if (resourceOperation.operationType == OperationType.Recreate) {
                Map<String, String> errorResponseHeaders = new HashMap<>();
                errorResponseHeaders.put(HttpConstants.HttpHeaders.REQUEST_VALIDATION_FAILURE, "1");

                logger.error("Received Recreate request on Http client");
                throw new InternalServerErrorException(RMResources.InternalServerError, null, errorResponseHeaders, null);
            }

            HttpRequest httpRequest = prepareHttpMessage(activityId, physicalAddressUri, resourceOperation, request);

            MutableVolatile<Instant> sendTimeUtc = new MutableVolatile<>();

            Duration responseTimeout = Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds());
            if (OperationType.QueryPlan.equals(request.getOperationType())) {
                responseTimeout = Duration.ofSeconds(Configs.getQueryPlanResponseTimeoutInSeconds());
            } else if (request.isAddressRefresh()) {
                responseTimeout = Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds());
            }

            Mono<HttpResponse> httpResponseMono = this.httpClient
                    .send(httpRequest, responseTimeout)
                    .doOnSubscribe(subscription -> {
                        sendTimeUtc.v = Instant.now();
                        this.beforeRequest(
                                activityId,
                                httpRequest.uri(),
                                request.getResourceType(),
                                httpRequest.headers());
                    })
                    .onErrorResume(t -> {
                        Exception exception = Utils.as(t, Exception.class);
                        if (exception == null) {
                            logger.error("critical failure", t);
                            t.printStackTrace();
                            assert false : "critical failure";
                            return Mono.error(t);
                        }

                        //Trace.CorrelationManager.ActivityId = activityId;
                        if (WebExceptionUtility.isWebExceptionRetriable(exception)) {
                            logger.debug("Received retriable exception {} " +
                                            "sending the request to {}, will re-resolve the address " +
                                            "send time UTC: {}",
                                    exception,
                                    physicalAddress,
                                    sendTimeUtc);

                            GoneException goneException = new GoneException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            RMResources.Gone),
                                    exception,
                                    null,
                                    physicalAddress);

                            return Mono.error(goneException);
                        } else if (request.isReadOnlyRequest()) {
                            logger.trace("Received exception {} on readonly request" +
                                            "sending the request to {}, will reresolve the address " +
                                            "send time UTC: {}",
                                    exception,
                                    physicalAddress,
                                    sendTimeUtc);

                            GoneException goneException = new GoneException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            RMResources.Gone),
                                    exception,
                                    null,
                                    physicalAddress);

                            return Mono.error(goneException);
                        } else {
                            // We can't throw a GoneException here because it will cause retry and we don't
                            // know if the request failed before or after the message got sent to the server.
                            // So in order to avoid duplicating the request we will not retry.
                            // TODO: a possible solution for this is to add the ability to send a request to the server
                            // to check if the previous request was received or not and act accordingly.
                            ServiceUnavailableException serviceUnavailableException = new ServiceUnavailableException(
                                exception.getMessage(),
                                exception,
                                null,
                                physicalAddress.toString());
                            serviceUnavailableException.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_VALIDATION_FAILURE, "1");
                            serviceUnavailableException.getResponseHeaders().put(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH, "1");
                            return Mono.error(serviceUnavailableException);
                        }})
                    .doOnSuccess(httpClientResponse -> {
                        Instant receivedTimeUtc = Instant.now();
                        double durationInMilliSeconds = (receivedTimeUtc.toEpochMilli() - sendTimeUtc.v.toEpochMilli());
                        this.afterRequest(
                                activityId,
                                httpClientResponse.statusCode(),
                                durationInMilliSeconds,
                                httpClientResponse.headers());
                    })
                    .doOnError(e -> {
                        Instant receivedTimeUtc = Instant.now();
                        double durationInMilliSeconds = (receivedTimeUtc.toEpochMilli() - sendTimeUtc.v.toEpochMilli());
                        this.afterRequest(
                                activityId,
                                0,
                                durationInMilliSeconds,
                                null);
                    });

            return httpResponseMono.flatMap(rsp -> processHttpResponse(request.requestContext.resourcePhysicalAddress,
                    httpRequest, activityId, rsp, physicalAddress));

        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    private void beforeRequest(String activityId, URI uri, ResourceType resourceType, HttpHeaders requestHeaders) {
        // TODO: perf counters
        // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/258624
    }

    private void afterRequest(String activityId,
                              int statusCode,
                              double durationInMilliSeconds,
                              HttpHeaders responseHeaders) {
        // TODO: perf counters
        // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/258624
    }

    private static void addHeader(HttpHeaders requestHeaders, String headerName, RxDocumentServiceRequest request) {
        String headerValue = request.getHeaders().get(headerName);
        if (!Strings.isNullOrEmpty(headerValue)) {
            requestHeaders.set(headerName, headerValue);
        }
    }

    private static void addHeader(HttpHeaders requestHeaders, String headerName, String headerValue) {
        if (!Strings.isNullOrEmpty(headerValue)) {
            requestHeaders.set(headerName, headerValue);
        }
    }

    private String getMatch(RxDocumentServiceRequest request, ResourceOperation resourceOperation) {
        switch (resourceOperation.operationType) {
            case Delete:
            case ExecuteJavaScript:
            case Replace:
            case Patch:
            case Upsert:
                return request.getHeaders().get(HttpConstants.HttpHeaders.IF_MATCH);

            case Read:
            case ReadFeed:
                return request.getHeaders().get(HttpConstants.HttpHeaders.IF_NONE_MATCH);

            default:
                return null;
        }
    }

    private HttpRequest prepareHttpMessage(
        String activityId,
        Uri physicalAddress,
        ResourceOperation resourceOperation,
        RxDocumentServiceRequest request) throws Exception {

        HttpRequest httpRequestMessage;
        String requestUri;
        HttpMethod method;

        // The StreamContent created below will own and dispose its underlying stream, but we may need to reuse the stream on the
        // RxDocumentServiceRequest for future requests. Hence we need to clone without incurring copy cost, so that when
        // HttpRequestMessage -> StreamContent -> MemoryStream all get disposed, the original stream will be left open.
        switch (resourceOperation.operationType) {
            case Create:
            case Batch:
                requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.POST;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                break;

            case ExecuteJavaScript:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.POST;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                break;

            case Delete:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.DELETE;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                break;

            case Read:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.GET;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                break;

            case ReadFeed:
                requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.GET;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                break;

            case Replace:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.PUT;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                break;

            case Patch:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.PATCH;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                break;

            case Query:
            case SqlQuery:
                requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.POST;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                HttpTransportClient.addHeader(httpRequestMessage.headers(), HttpConstants.HttpHeaders.CONTENT_TYPE, request);
                break;

            case Upsert:
                requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.POST;
                assert request.getContentAsByteArrayFlux() != null;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                httpRequestMessage.withBody(request.getContentAsByteArrayFlux());
                break;

            case Head:
                requestUri = getResourceEntryUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.HEAD;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                break;

            case HeadFeed:
                requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request);
                method = HttpMethod.HEAD;
                httpRequestMessage = new HttpRequest(method, requestUri, physicalAddress.getURI().getPort());
                break;

            default:
                assert false : "Unsupported operation type";
                throw new IllegalStateException();
        }

        Map<String, String> documentServiceRequestHeaders = request.getHeaders();
        HttpHeaders httpRequestHeaders = httpRequestMessage.headers();

        // add default headers
        for(Map.Entry<String, String> entry: defaultHeaders.entrySet()) {
            HttpTransportClient.addHeader(httpRequestHeaders, entry.getKey(), entry.getValue());
        }

        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.VERSION, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.USER_AGENT, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PAGE_SIZE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PRE_TRIGGER_INCLUDE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PRE_TRIGGER_EXCLUDE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POST_TRIGGER_INCLUDE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POST_TRIGGER_EXCLUDE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.AUTHORIZATION, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.INDEXING_DIRECTIVE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.MIGRATE_COLLECTION_DIRECTIVE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.SESSION_TOKEN, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PREFER, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.RESOURCE_TOKEN_EXPIRY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.ENABLE_SCAN_IN_QUERY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.EMIT_VERBOSE_TRACES_IN_QUERY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CAN_CHARGE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CAN_THROTTLE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.ENABLE_LOW_PRECISION_ORDER_BY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.ENABLE_LOGGING, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_READ_ONLY_SCRIPT, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CONTENT_SERIALIZATION_FORMAT, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CONTINUATION, request.getContinuation());
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.ACTIVITY_ID, activityId);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PARTITION_KEY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.READ_FEED_KEY_TYPE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.START_EPK, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.END_EPK, request);

        String dateHeader = HttpUtils.getDateHeader(documentServiceRequestHeaders);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.X_DATE, dateHeader);
        HttpTransportClient.addHeader(httpRequestHeaders, "Match", this.getMatch(request, resourceOperation));
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IF_MODIFIED_SINCE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.A_IM, request);
        if (!request.getIsNameBased()) {
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.RESOURCE_ID, request.getResourceId());
        }

        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.ENTITY_ID, request.entityId);

        String fanoutRequestHeader = request.getHeaders().get(WFConstants.BackendHeaders.IS_FANOUT_REQUEST);
        HttpTransportClient.addHeader(httpRequestMessage.headers(), WFConstants.BackendHeaders.IS_FANOUT_REQUEST, fanoutRequestHeader);

        if (request.getResourceType() == ResourceType.DocumentCollection) {
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.COLLECTION_PARTITION_INDEX, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.COLLECTION_PARTITION_INDEX));
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.COLLECTION_SERVICE_INDEX, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.COLLECTION_SERVICE_INDEX));
        }

        if (documentServiceRequestHeaders.get(WFConstants.BackendHeaders.BIND_REPLICA_DIRECTIVE) != null) {
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.BIND_REPLICA_DIRECTIVE, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.BIND_REPLICA_DIRECTIVE));
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.PRIMARY_MASTER_KEY, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.PRIMARY_MASTER_KEY));
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.SECONDARY_MASTER_KEY, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.SECONDARY_MASTER_KEY));
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.PRIMARY_READONLY_KEY, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.PRIMARY_READONLY_KEY));
            HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.SECONDARY_READONLY_KEY, documentServiceRequestHeaders.get(WFConstants.BackendHeaders.SECONDARY_READONLY_KEY));
        }

        if (documentServiceRequestHeaders.get(HttpConstants.HttpHeaders.CAN_OFFER_REPLACE_COMPLETE) != null) {
            HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CAN_OFFER_REPLACE_COMPLETE, documentServiceRequestHeaders.get(HttpConstants.HttpHeaders.CAN_OFFER_REPLACE_COMPLETE));
        }

        //Query
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_QUERY, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.QUERY, request);

        // Upsert
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_UPSERT, request);

        // SupportSpatialLegacyCoordinates
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.SUPPORT_SPATIAL_LEGACY_COORDINATES, request);

        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.PARTITION_COUNT, request);

        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.COLLECTION_RID, request);

        // Filter by schema
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.FILTER_BY_SCHEMA_RESOURCE_ID, request);

        // UsePolygonsSmallerThanAHemisphere
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.USE_POLYGONS_SMALLER_THAN_AHEMISPHERE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.GATEWAY_SIGNATURE, request);

        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POPULATE_QUERY_METRICS, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.FORCE_QUERY_SCAN, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB, request);
        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.REMOTE_STORAGE_TYPE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.SHARE_THROUGHPUT, request);

        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POPULATE_PARTITION_STATISTICS, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.POPULATE_COLLECTION_THROUGHPUT_INFO, request);

        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.REMAINING_TIME_IN_MS_ON_CLIENT_REQUEST, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.CLIENT_RETRY_ATTEMPT_COUNT, request);

        // target lsn for head requests.
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.TARGET_LSN, request);
        HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.TARGET_GLOBAL_COMMITTED_LSN, request);

        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.FEDERATION_ID_FOR_AUTH, request);

        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.FANOUT_OPERATION_STATE, request);
        HttpTransportClient.addHeader(httpRequestHeaders, WFConstants.BackendHeaders.ALLOW_TENTATIVE_WRITES, request);

        HttpTransportClient.addHeader(httpRequestHeaders, CustomHeaders.HttpHeaders.EXCLUDE_SYSTEM_PROPERTIES, request);

        if (resourceOperation.operationType == OperationType.Batch) {
            HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_REQUEST, request);
            HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.SHOULD_BATCH_CONTINUE_ON_ERROR, request);
            HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_ORDERED, request);
            HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_ATOMIC, request);
        }

        return httpRequestMessage;
    }

    static String getResourceFeedUri(ResourceType resourceType, String physicalAddress, RxDocumentServiceRequest request) throws Exception {
        switch (resourceType) {
            case Attachment:
                return getAttachmentFeedUri(physicalAddress, request);
            case DocumentCollection:
                return getCollectionFeedUri(physicalAddress, request);
            case Conflict:
                return getConflictFeedUri(physicalAddress, request);
            case Database:
                return getDatabaseFeedUri(physicalAddress);
            case Document:
                return getDocumentFeedUri(physicalAddress, request);
            case Permission:
                return getPermissionFeedUri(physicalAddress, request);
            case StoredProcedure:
                return getStoredProcedureFeedUri(physicalAddress, request);
            case Trigger:
                return getTriggerFeedUri(physicalAddress, request);
            case User:
                return getUserFeedUri(physicalAddress, request);

            case UserDefinedFunction:
                return getUserDefinedFunctionFeedUri(physicalAddress, request);
            case Schema:
                return getSchemaFeedUri(physicalAddress, request);
            case Offer:
                return getOfferFeedUri(physicalAddress, request);

//          Other types: Replica, Module, ModuleCommand, Record, UserDefinedType not applicable to SDK.

            default:
                assert false : "Unexpected resource type: " + resourceType;
                throw new NotFoundException();
        }
    }

    private static String getResourceEntryUri(ResourceType resourceType, String physicalAddress, RxDocumentServiceRequest request) throws Exception {
        switch (resourceType) {
            case Attachment:
                return getAttachmentEntryUri(physicalAddress, request);
            case DocumentCollection:
                return getCollectionEntryUri(physicalAddress, request);
            case Conflict:
                return getConflictEntryUri(physicalAddress, request);
            case Database:
                return getDatabaseEntryUri(physicalAddress, request);
            case Document:
                return getDocumentEntryUri(physicalAddress, request);
            case Permission:
                return getPermissionEntryUri(physicalAddress, request);
            case StoredProcedure:
                return getStoredProcedureEntryUri(physicalAddress, request);
            case Trigger:
                return getTriggerEntryUri(physicalAddress, request);
            case User:
                return getUserEntryUri(physicalAddress, request);
            case UserDefinedFunction:
                return getUserDefinedFunctionEntryUri(physicalAddress, request);
            case Schema:
                return getSchemaEntryUri(physicalAddress, request);
            case Offer:
                return getOfferEntryUri(physicalAddress, request);

//          Other types: Replica, Module, ModuleCommand, Record, UserDefinedType not applicable to SDK.

            default:
                assert false: "Unexpected resource type: " + resourceType;
                throw new IllegalStateException();
        }
    }

    static String createURI(String baseAddress, String resourcePath) {
        if (baseAddress.charAt(baseAddress.length() - 1) == '/') {
            return baseAddress + HttpUtils.urlEncode(trimBeginningAndEndingSlashes(resourcePath));
        } else {
            return baseAddress + '/' + HttpUtils.urlEncode(trimBeginningAndEndingSlashes(resourcePath));
        }
    }

    static String getRootFeedUri(String baseAddress) {
        return baseAddress;
    }

    private static String getDatabaseFeedUri(String baseAddress) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Database, StringUtils.EMPTY, true));
    }

    private static String getDatabaseEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Database, request, false));
    }

    private static String getCollectionFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.DocumentCollection, request, true));
    }

    private static String getStoredProcedureFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.StoredProcedure, request, true));
    }

    private static String getTriggerFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Trigger, request, true));
    }

    private static String getUserDefinedFunctionFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.UserDefinedFunction, request, true));
    }

    private static String getCollectionEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.DocumentCollection, request, false));
    }

    private static String getStoredProcedureEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.StoredProcedure, request, false));
    }

    private static String getTriggerEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Trigger, request, false));
    }

    private static String getUserDefinedFunctionEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.UserDefinedFunction, request, false));
    }

    private static String getDocumentFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Document, request, true));
    }

    private static String getDocumentEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Document, request, false));
    }

    private static String getConflictFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Conflict, request, true));
    }

    private static String getConflictEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Conflict, request, false));
    }

    private static String getAttachmentFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Attachment, request, true));
    }

    private static String getAttachmentEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Attachment, request, false));
    }

    private static String getUserFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.User, request, true));
    }

    private static String getUserEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.User, request, false));
    }

    private static String getPermissionFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Permission, request, true));
    }

    private static String getPermissionEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Permission, request, false));
    }

    private static String getOfferFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Offer, request, true));
    }

    private static String getSchemaFeedUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Schema, request, true));
    }

    private static String getSchemaEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Schema, request, false));
    }

    private static String getOfferEntryUri(String baseAddress, RxDocumentServiceRequest request) {
        return createURI(baseAddress, PathsHelper.generatePath(ResourceType.Offer, request, false));
    }

    static String getHeader(String[] names, String[] values, String name) {
        for (int idx = 0; idx < names.length; idx++) {
            if (Strings.areEqual(names[idx], name)) {
                return values[idx];
            }
        }

        return null;
    }

    private Mono<StoreResponse> processHttpResponse(String resourceAddress, HttpRequest httpRequest, String activityId, HttpResponse response, URI physicalAddress) {
        if (response == null) {
            InternalServerErrorException exception =
                    new InternalServerErrorException(
                            String.format(
                                    RMResources.ExceptionMessage,
                                    RMResources.InvalidBackendResponse),
                            null,
                            physicalAddress);
            exception.getResponseHeaders().put(HttpConstants.HttpHeaders.ACTIVITY_ID,
                    activityId);
            exception.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_VALIDATION_FAILURE, "1");

            return Mono.error(exception);
        }

        // If the status code is < 300 or 304 NotModified (we treat not modified as success) then it means that it's a success code and shouldn't throw.
        if (response.statusCode() < HttpConstants.StatusCodes.MINIMUM_STATUSCODE_AS_ERROR_GATEWAY ||
                response.statusCode() == HttpConstants.StatusCodes.NOT_MODIFIED) {
            return ResponseUtils.toStoreResponse(response, httpRequest);
        }
        else {
            return this.createErrorResponseFromHttpResponse(resourceAddress, activityId, httpRequest, response);
        }
    }

    private Mono<StoreResponse> createErrorResponseFromHttpResponse(String resourceAddress, String activityId,
                                                                      HttpRequest request,
                                                                      HttpResponse response) {
        int statusCode = response.statusCode();
        Mono<String> errorMessageObs = ErrorUtils.getErrorResponseAsync(response, request);

        return errorMessageObs.flatMap(
                errorMessage -> {
                    long responseLSN = -1;

                    List<String> lsnValues = null;
                    String[] headerValues = response.headers().values(WFConstants.BackendHeaders.LSN);
                    if (headerValues != null) {
                        lsnValues = com.azure.cosmos.implementation.guava25.collect.Lists.newArrayList(headerValues);
                    }

                    if (lsnValues != null) {
                        String temp = lsnValues.isEmpty() ? null : lsnValues.get(0);
                        responseLSN = Longs.tryParse(temp, responseLSN);
                    }

                    String responsePartitionKeyRangeId = null;
                    List<String> partitionKeyRangeIdValues = null;
                    headerValues = response.headers().values(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID);
                    if (headerValues != null) {
                        partitionKeyRangeIdValues
                            = com.azure.cosmos.implementation.guava25.collect.Lists.newArrayList(headerValues);
                    }
                    if (partitionKeyRangeIdValues != null) {
                        responsePartitionKeyRangeId = Lists.firstOrDefault(partitionKeyRangeIdValues, null);
                    }

                    CosmosException exception;

                    switch (statusCode) {
                        case HttpConstants.StatusCodes.UNAUTHORIZED:
                            exception = new UnauthorizedException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.Unauthorized : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        case HttpConstants.StatusCodes.FORBIDDEN:
                            exception = new ForbiddenException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.Forbidden : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        case HttpConstants.StatusCodes.NOTFOUND:
                            // HTTP.SYS returns NotFound (404) if the URI
                            // is not registered. This is really an indication that
                            // the replica which registered the URI is not
                            // available at the server. We detect this case by
                            // the presence of Content-Type header in the response
                            // and map it to HTTP Gone (410), which is the more
                            // appropriate response for this case.
                            if (response.body() != null && response.headers() != null && response.headers().value(HttpConstants.HttpHeaders.CONTENT_TYPE) != null &&
                                    !Strings.isNullOrEmpty(response.headers().value(HttpConstants.HttpHeaders.CONTENT_TYPE)) &&
                                    Strings.containsIgnoreCase(response.headers().value(HttpConstants.HttpHeaders.CONTENT_TYPE), RuntimeConstants.MediaTypes.TEXT_HTML)) {
                                // Have the request URL in the exception message for debugging purposes.
                                exception = new GoneException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                RMResources.Gone),
                                        request.uri().toString());
                                exception.getResponseHeaders().put(HttpConstants.HttpHeaders.ACTIVITY_ID,
                                        activityId);

                                break;
                            } else {
                                exception = new NotFoundException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                Strings.isNullOrEmpty(errorMessage) ? RMResources.NotFound : errorMessage),
                                        response.headers(),
                                        request.uri());
                                break;
                            }

                        case HttpConstants.StatusCodes.BADREQUEST:
                            exception = new BadRequestException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.BadRequest : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        case HttpConstants.StatusCodes.METHOD_NOT_ALLOWED:
                            exception = new MethodNotAllowedException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.MethodNotAllowed : errorMessage),
                                    null,
                                    response.headers(),
                                    request.uri().toString());
                            break;

                        case HttpConstants.StatusCodes.GONE: {

                            // TODO: update perf counter
                            // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/258624
                            ErrorUtils.logGoneException(request.uri(), activityId);

                            Integer nSubStatus = 0;
                            String valueSubStatus = response.headers().value(WFConstants.BackendHeaders.SUB_STATUS);
                            if (!Strings.isNullOrEmpty(valueSubStatus)) {
                                if ((nSubStatus = Integers.tryParse(valueSubStatus)) == null) {
                                    exception = new InternalServerErrorException(
                                            String.format(
                                                    RMResources.ExceptionMessage,
                                                    RMResources.InvalidBackendResponse),
                                            response.headers(),
                                            request.uri());
                                    break;
                                }
                            }

                            if (nSubStatus == HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE) {
                                exception = new InvalidPartitionException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                Strings.isNullOrEmpty(errorMessage) ? RMResources.Gone : errorMessage),
                                        response.headers(),
                                        request.uri().toString());
                                break;
                            } else if (nSubStatus == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE) {
                                exception = new PartitionKeyRangeGoneException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                Strings.isNullOrEmpty(errorMessage) ? RMResources.Gone : errorMessage),
                                        response.headers(),
                                        request.uri().toString());
                                break;
                            } else if (nSubStatus == HttpConstants.SubStatusCodes.COMPLETING_SPLIT) {
                                exception = new PartitionKeyRangeIsSplittingException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                Strings.isNullOrEmpty(errorMessage) ? RMResources.Gone : errorMessage),
                                        response.headers(),
                                        request.uri().toString());
                                break;
                            } else if (nSubStatus == HttpConstants.SubStatusCodes.COMPLETING_PARTITION_MIGRATION) {
                                exception = new PartitionIsMigratingException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                Strings.isNullOrEmpty(errorMessage) ? RMResources.Gone : errorMessage),
                                        response.headers(),
                                        request.uri().toString());
                                break;
                            } else {
                                // Have the request URL in the exception message for debugging purposes.
                                GoneException goneExceptionFromService = new GoneException(
                                        String.format(
                                                RMResources.ExceptionMessage,
                                                RMResources.Gone),
                                        response.headers(),
                                        request.uri());
                                goneExceptionFromService.setIsBasedOn410ResponseFromService();

                                goneExceptionFromService.getResponseHeaders().put(
                                    HttpConstants.HttpHeaders.ACTIVITY_ID,
                                    activityId);

                                exception = goneExceptionFromService;
                                break;
                            }
                        }

                        case HttpConstants.StatusCodes.CONFLICT:
                            exception = new ConflictException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.EntityAlreadyExists : errorMessage),
                                    response.headers(),
                                    request.uri().toString());
                            break;

                        case HttpConstants.StatusCodes.PRECONDITION_FAILED:
                            exception = new PreconditionFailedException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.PreconditionFailed : errorMessage),
                                    response.headers(),
                                    request.uri().toString());
                            break;

                        case HttpConstants.StatusCodes.REQUEST_ENTITY_TOO_LARGE:
                            exception = new RequestEntityTooLargeException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            String.format(
                                                    RMResources.RequestEntityTooLarge,
                                                    HttpConstants.HttpHeaders.PAGE_SIZE)),
                                    response.headers(),
                                    request.uri().toString());
                            break;

                        case HttpConstants.StatusCodes.LOCKED:
                            exception = new LockedException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.Locked : errorMessage),
                                    response.headers(),
                                    request.uri().toString());
                            break;

                        case HttpConstants.StatusCodes.SERVICE_UNAVAILABLE:
                            exception = new ServiceUnavailableException(errorMessage, response.headers(), request.uri());
                            break;

                        case HttpConstants.StatusCodes.REQUEST_TIMEOUT:
                            exception = new RequestTimeoutException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.RequestTimeout : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        case HttpConstants.StatusCodes.RETRY_WITH:
                            exception = new RetryWithException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.RetryWith : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        case HttpConstants.StatusCodes.TOO_MANY_REQUESTS:
                            exception =
                                    new RequestRateTooLargeException(
                                            String.format(
                                                    RMResources.ExceptionMessage,
                                                    Strings.isNullOrEmpty(errorMessage) ? RMResources.TooManyRequests : errorMessage),
                                            response.headers(),
                                            request.uri());

                            List<String> values = null;
                            headerValues = response.headers().values(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS);
                            if (headerValues != null) {
                                values
                                    = com.azure.cosmos.implementation.guava25.collect.Lists.newArrayList(headerValues);
                            }
                            if (values == null || values.isEmpty()) {
                                logger.warn("RequestRateTooLargeException being thrown without RetryAfter.");
                            } else {
                                exception.getResponseHeaders().put(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS, values.get(0));
                            }

                            break;

                        case HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR:
                            exception = new InternalServerErrorException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            Strings.isNullOrEmpty(errorMessage) ? RMResources.InternalServerError : errorMessage),
                                    response.headers(),
                                    request.uri());
                            break;

                        default:
                            logger.error("Unrecognized status code {} returned by backend. ActivityId {}", statusCode, activityId);
                            ErrorUtils.logException(request.uri(), activityId);
                            exception = new InternalServerErrorException(
                                    String.format(
                                            RMResources.ExceptionMessage,
                                            RMResources.InvalidBackendResponse),
                                    response.headers(),
                                    request.uri());
                            break;
                    }

                    BridgeInternal.setLSN(exception, responseLSN);
                    BridgeInternal.setPartitionKeyRangeId(exception, responsePartitionKeyRangeId);
                    BridgeInternal.setResourceAddress(exception, resourceAddress);
                    BridgeInternal.setRequestHeaders(exception, HttpUtils.asMap(request.headers()));

                    return Mono.error(exception);
                }
        );
    }
}