AddressResolver.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ICollectionRoutingMapCache;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceId;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.concurrent.Callable;
import java.util.function.Function;

/**
 * Abstracts out the logic to resolve physical replica addresses for the given {@link RxDocumentServiceRequest}
 * <p>
 * AddressCache internally maintains CollectionCache, CollectionRoutingMapCache and BackendAddressCache.
 * Logic in this class mainly joins these 3 caches and deals with potential staleness of the caches.
 */
public class AddressResolver implements IAddressResolver {
    private static Logger logger = LoggerFactory.getLogger(AddressResolver.class);

    private final static PartitionKeyRangeIdentity masterPartitionKeyRangeIdentity =
        new PartitionKeyRangeIdentity(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID);

    private RxCollectionCache collectionCache;
    private ICollectionRoutingMapCache collectionRoutingMapCache;
    private IAddressCache addressCache;

    public AddressResolver() {
    }

    public void initializeCaches(
        RxCollectionCache collectionCache,
        ICollectionRoutingMapCache collectionRoutingMapCache,
        IAddressCache addressCache) {
        this.collectionCache = collectionCache;
        this.addressCache = addressCache;
        this.collectionRoutingMapCache = collectionRoutingMapCache;
    }

    public Mono<AddressInformation[]> resolveAsync(
        RxDocumentServiceRequest request,
        boolean forceRefreshPartitionAddresses) {

        Mono<ResolutionResult> resultObs = this.resolveAddressesAndIdentityAsync(request, forceRefreshPartitionAddresses);

        return resultObs.flatMap(result -> {

            try {
                this.throwIfTargetChanged(request, result.TargetPartitionKeyRange);
            } catch (Exception e) {
                return Mono.error(e);
            }

            request.requestContext.resolvedPartitionKeyRange = result.TargetPartitionKeyRange;

            return Mono.just(result.Addresses);
        });
    }

    @Override
    public void updateAddresses(RxDocumentServiceRequest request, URI serverKey) {
        throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
    }

    private static boolean isSameCollection(PartitionKeyRange initiallyResolved, PartitionKeyRange newlyResolved) {
        if (initiallyResolved == null) {
            throw new IllegalArgumentException("parent");
        }

        if (newlyResolved == null) {
            return false;
        }

        if (Strings.areEqual(initiallyResolved.getId(), PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID) &&
            Strings.areEqual(newlyResolved.getId(), PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
            return true;
        }

        if (Strings.areEqual(initiallyResolved.getId(), PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)
            || Strings.areEqual(newlyResolved.getId(), PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
            String message =
                "Request was resolved to master partition and then to server partition.";
            assert false : message;
            logger.warn(message);
            return false;
        }

        if (ResourceId.parse(initiallyResolved.getResourceId()).getDocumentCollection()
            != ResourceId.parse(newlyResolved.getResourceId()).getDocumentCollection()) {
            return false;
        }

        if (!Strings.areEqual(initiallyResolved.getId(), newlyResolved.getId()) &&
            !(newlyResolved.getParents() != null && newlyResolved.getParents().contains(initiallyResolved.getId()))) {
            // the above getCondition should be always false in current codebase.
            // We don't need to refresh any caches if we resolved to a range which is child of previously resolved range.
            // Quorum reads should be handled transparently as child partitions share LSNs with parent partitions which are gone.
            String message =
                "Request is targeted at a partition key range which is not child of previously targeted range.";
            assert false : message;
            logger.warn(message);

            return false;
        }

        return true;
    }

    /**
     * Validates if the target partition to which the request is being sent has changed during retry.
     * <p>
     * If that happens, the request is no more valid and need to be retried.
     *
     * @param request     Request in progress
     * @param targetRange Target partition key range determined by address resolver
     * */
    private void throwIfTargetChanged(RxDocumentServiceRequest request, PartitionKeyRange targetRange) {
        // If new range is child of previous range, we don't need to throw any exceptions
        // as LSNs are continued on child ranges.
        if (request.requestContext.resolvedPartitionKeyRange != null &&
            !isSameCollection(request.requestContext.resolvedPartitionKeyRange, targetRange)) {
            if (!request.getIsNameBased()) {
                String message = String.format(
                    "Target should not change for non name based requests. Previous target %s, Current %s",
                    request.requestContext.resolvedPartitionKeyRange, targetRange);
                assert false : message;
                logger.warn(message);
            }

            request.requestContext.resolvedPartitionKeyRange = null;
            throw new InvalidPartitionException(RMResources.InvalidTarget, request.requestContext.resourcePhysicalAddress);
        }
    }

    private static void ensureRoutingMapPresent(
        RxDocumentServiceRequest request,
        CollectionRoutingMap routingMap,
        DocumentCollection collection) {
        if (routingMap == null && request.getIsNameBased() && request.getPartitionKeyRangeIdentity() != null
            && request.getPartitionKeyRangeIdentity().getCollectionRid() != null) {
            // By design, if partitionkeyrangeid header is present and it contains collectionrid for collection
            // which doesn't exist, we return InvalidPartitionException. Backend does the same.
            // Caller (client SDK or whoever attached the header) supposedly has outdated collection cache and will refresh it.
            // We cannot retry here, as the logic for retry in this case is use-case specific.

            if (logger.isDebugEnabled()) {
                logger.debug(
                    "Routing map for request with partitionkeyrageid {} was not found",
                    request.getPartitionKeyRangeIdentity().toHeader());
            }
            InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
            BridgeInternal.setResourceAddress(invalidPartitionException, request.requestContext.resourcePhysicalAddress);
            throw invalidPartitionException;
        }

        if (routingMap == null) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                    "Routing map was not found although collection cache is upto date for collection {}",
                    collection.getResourceId());
            }
            // Routing map not found although collection was resolved correctly.
            NotFoundException e = new NotFoundException();
            BridgeInternal.setResourceAddress(e, request.requestContext.resourcePhysicalAddress);
            throw e;
        }
    }

    private Mono<Utils.ValueHolder<ResolutionResult>> tryResolveServerPartitionAsync(
        RxDocumentServiceRequest request,
        DocumentCollection collection,
        CollectionRoutingMap routingMap,
        boolean collectionCacheIsUptodate,
        boolean collectionRoutingMapCacheIsUptodate,
        boolean forceRefreshPartitionAddresses) {

        try {
            // Check if this request partitionkeyrange-aware routing logic. We cannot retry here in this case
            // and need to bubble up errors.
            if (request.getPartitionKeyRangeIdentity() != null) {
                return this.tryResolveServerPartitionByPartitionKeyRangeIdAsync(
                    request,
                    collection,
                    routingMap,
                    collectionCacheIsUptodate,
                    collectionRoutingMapCacheIsUptodate,
                    forceRefreshPartitionAddresses);
            }

            if (!request.getResourceType().isPartitioned() &&
                !(request.getResourceType() == ResourceType.StoredProcedure && request.getOperationType() == OperationType.ExecuteJavaScript) &&
                // Collection head is sent internally for strong consistency given routing hints from original requst, which is for partitioned resource.
                !(request.getResourceType() == ResourceType.DocumentCollection && request.getOperationType() == OperationType.Head)) {
                logger.error(
                    "Shouldn't come here for non partitioned resources. resourceType : {}, operationtype:{}, resourceaddress:{}",
                    request.getResourceType(),
                    request.getOperationType(),
                    request.getResourceAddress());
                return Mono.error(BridgeInternal.setResourceAddress(new InternalServerErrorException(RMResources.InternalServerError), request.requestContext.resourcePhysicalAddress));
            }

            PartitionKeyRange range;
            PartitionKeyInternal partitionKeyInternal = request.getPartitionKeyInternal();

            if (partitionKeyInternal != null || request.getHeaders().containsKey(HttpConstants.HttpHeaders.PARTITION_KEY)) {
                range = this.tryResolveServerPartitionByPartitionKey(
                    request,
                    partitionKeyInternal,
                    collectionCacheIsUptodate,
                    collection,
                    routingMap);
            } else {
                range = this.tryResolveSinglePartitionCollection(request, routingMap, collectionCacheIsUptodate);
            }

            if (range == null) {
                // Collection cache or routing map cache is potentially outdated. Return empty -
                // upper logic will refresh cache and retry.
                logger.debug("Collection cache or routing map cache is potentially outdated." +
                                 " Returning null. Upper logic will refresh cache and retry.");
                return Mono.just(new Utils.ValueHolder<>(null));
            }

            Mono<Utils.ValueHolder<AddressInformation[]>> addressesObs = this.addressCache.tryGetAddresses(
                request,
                new PartitionKeyRangeIdentity(collection.getResourceId(), range.getId()),
                forceRefreshPartitionAddresses);

            return addressesObs.flatMap(addressesValueHolder -> {

                if (addressesValueHolder.v == null) {
                    logger.info(
                        "Could not resolve addresses for identity {}/{}. Potentially collection cache or routing map cache is outdated. Return empty - upper logic will refresh and retry. ",
                        new PartitionKeyRangeIdentity(collection.getResourceId(), range.getId()));
                    return Mono.just(new Utils.ValueHolder<>(null));
                }

                return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(range, addressesValueHolder.v)));
            });

        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    private PartitionKeyRange tryResolveSinglePartitionCollection(
        RxDocumentServiceRequest request,
        CollectionRoutingMap routingMap,
        boolean collectionCacheIsUptoDate) {
        // Neither partitionkey nor partitionkeyrangeid is specified.
        // Three options here:
        //    * This is non-partitioned collection and old client SDK which doesn't send partition key. In
        //      this case there's single entry in routing map. But can be multiple entries if before that
        //      existed partitioned collection with same name.
        //    * This is partitioned collection and old client SDK which doesn't send partition key.
        //      In this case there can be multiple ranges in routing map.
        //    * This is partitioned collection and this is custom written REST sdk, which has a bug and doesn't send
        //      partition key.
        // We cannot know for sure whether this is partitioned collection or not, because
        // partition key definition cache can be outdated.
        // So we route request to the first partition. If this is non-partitioned collection - request will succeed.
        // If it is partitioned collection - backend will return bad request as partition key header is required in this case.
        if (routingMap.getOrderedPartitionKeyRanges().size() == 1) {
            return routingMap.getOrderedPartitionKeyRanges().get(0);
        }

        logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate);
        if (collectionCacheIsUptoDate) {
            throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.requestContext.resourcePhysicalAddress);
        } else {
            return null;
        }
    }

    private Mono<ResolutionResult> resolveMasterResourceAddress(RxDocumentServiceRequest request,
                                                                  boolean forceRefreshPartitionAddresses) {
        assert ReplicatedResourceClient.isReadingFromMaster(request.getResourceType(), request.getOperationType())
            && request.getPartitionKeyRangeIdentity() == null;

        //  ServiceIdentity serviceIdentity = this.masterServiceIdentity;
        Mono<Utils.ValueHolder<AddressInformation[]>> addressesObs = this.addressCache.tryGetAddresses(request,
                masterPartitionKeyRangeIdentity,forceRefreshPartitionAddresses);

        return addressesObs.flatMap(addressesValueHolder -> {
            if (addressesValueHolder.v == null) {
                logger.warn("Could not get addresses for master partition");

                // return Observable.getError()
                NotFoundException e = new NotFoundException();
                BridgeInternal.setResourceAddress(e, request.requestContext.resourcePhysicalAddress);
                return Mono.error(e);
            }

            PartitionKeyRange partitionKeyRange = new PartitionKeyRange();
            partitionKeyRange.setId(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID);
            return Mono.just(new ResolutionResult(partitionKeyRange, addressesValueHolder.v));
        });
    }

    private static class RefreshState {

        volatile boolean collectionCacheIsUptoDate;
        volatile boolean collectionRoutingMapCacheIsUptoDate;
        volatile DocumentCollection collection;
        volatile CollectionRoutingMap routingMap;
        volatile ResolutionResult resolutionResult;
    }

    private Mono<RefreshState> getOrRefreshRoutingMap(RxDocumentServiceRequest request, boolean forceRefreshPartitionAddresses) {

        RefreshState state = new RefreshState();

        state.collectionCacheIsUptoDate = !request.getIsNameBased() ||
            (request.getPartitionKeyRangeIdentity() != null && request.getPartitionKeyRangeIdentity().getCollectionRid() != null);
        state.collectionRoutingMapCacheIsUptoDate = false;

        Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

        Mono<RefreshState> stateObs = collectionObs.flatMap(collectionValueHolder -> {
            state.collection = collectionValueHolder.v;
            Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapObs =
                this.collectionRoutingMapCache.tryLookupAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), collectionValueHolder.v.getResourceId(), null, request.forceCollectionRoutingMapRefresh, request.properties);
            final Utils.ValueHolder<DocumentCollection> underlyingCollection = collectionValueHolder;
            return routingMapObs.flatMap(routingMapValueHolder -> {
                state.routingMap = routingMapValueHolder.v;

                if (request.forcePartitionKeyRangeRefresh) {
                    state.collectionRoutingMapCacheIsUptoDate = true;
                    request.forcePartitionKeyRangeRefresh = false;
                    if (routingMapValueHolder.v != null) {
                        return this.collectionRoutingMapCache.tryLookupAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), underlyingCollection.v.getResourceId(), routingMapValueHolder.v, request.properties)
                            .map(newRoutingMapValueHolder -> {
                                state.routingMap = newRoutingMapValueHolder.v;
                                return state;
                            });
                    }
                }

                return Mono.just(state);
            });
        });

        return stateObs.flatMap(newState -> {

            if (newState.routingMap == null && !newState.collectionCacheIsUptoDate) {
                // Routing map was not found by resolved collection rid. Maybe collection rid is outdated.
                // Refresh collection cache and reresolve routing map.
                request.forceNameCacheRefresh = true;
                newState.collectionCacheIsUptoDate = true;
                newState.collectionRoutingMapCacheIsUptoDate = false;

                Mono<Utils.ValueHolder<DocumentCollection>> newCollectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

                return newCollectionObs.flatMap(collectionValueHolder -> {
                        newState.collection = collectionValueHolder.v;
                    Mono<Utils.ValueHolder<CollectionRoutingMap>> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync(
                        BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                            collectionValueHolder.v.getResourceId(),
                            null,
                            request.properties);

                        return newRoutingMapObs.map(routingMapValueHolder -> {
                            newState.routingMap = routingMapValueHolder.v;
                            return newState;
                        });
                    }
                );

            }

            return Mono.just(newState);
        });
    }

    private Mono<RefreshState> getStateWithNewRoutingMap(RefreshState state, Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapSingle) {
        return routingMapSingle.map(routingMapValueHolder -> {
            state.routingMap = routingMapValueHolder.v;
            return state;
        });
    }

    /**
     * Resolves the endpoint of the partition for the given request
     *
     * @param request                        Request for which the partition endpoint resolution is to be performed
     * @param forceRefreshPartitionAddresses Force refresh the partition's endpoint
     * @return ResolutionResult
     */
    private Mono<ResolutionResult> resolveAddressesAndIdentityAsync(
        RxDocumentServiceRequest request,
        boolean forceRefreshPartitionAddresses) {

        if (ReplicatedResourceClient.isReadingFromMaster(request.getResourceType(), request.getOperationType())
            && request.getPartitionKeyRangeIdentity() == null) {
            return resolveMasterResourceAddress(request, forceRefreshPartitionAddresses);
        }

        Mono<RefreshState> refreshStateObs = this.getOrRefreshRoutingMap(request, forceRefreshPartitionAddresses);

        return refreshStateObs.flatMap(
            state -> {
                try {
                    AddressResolver.ensureRoutingMapPresent(request, state.routingMap, state.collection);

                } catch (Exception e) {
                    return Mono.error(e);
                }

                // At this point we have both collection and routingMap.
                Mono<Utils.ValueHolder<ResolutionResult>> resultObs = this.tryResolveServerPartitionAsync(
                    request,
                    state.collection,
                    state.routingMap,
                    state.collectionCacheIsUptoDate,
                    state.collectionRoutingMapCacheIsUptoDate,
                    forceRefreshPartitionAddresses);


                Function<ResolutionResult, Mono<ResolutionResult>> addCollectionRidIfNameBased = funcResolutionResult -> {
                    assert funcResolutionResult != null;
                    if (request.getIsNameBased()) {
                        // Append collection rid.
                        // If we resolved collection rid incorrectly because of outdated cache, this can lead
                        // to incorrect routing decisions. But backend will validate collection rid and throw
                        // InvalidPartitionException if we reach wrong collection.
                        // Also this header will be used by backend to inject collection rid into metrics for
                        // throttled requests.
                        request.getHeaders().put(WFConstants.BackendHeaders.COLLECTION_RID, state.collection.getResourceId());
                    }

                    return Mono.just(funcResolutionResult);
                };

                return resultObs.flatMap(resolutionResultValueHolder -> {
                    if (resolutionResultValueHolder.v != null) {
                        return addCollectionRidIfNameBased.apply(resolutionResultValueHolder.v);
                    }
                    // result is empty

                    // result is null:
                    assert resolutionResultValueHolder.v == null;

                    Function<RefreshState, Mono<RefreshState>> ensureCollectionRoutingMapCacheIsUptoDateFunc = funcState -> {
                        if (!funcState.collectionRoutingMapCacheIsUptoDate) {
                            funcState.collectionRoutingMapCacheIsUptoDate = true;
                            Mono<Utils.ValueHolder<CollectionRoutingMap>> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync(
                                BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                                funcState.collection.getResourceId(),
                                funcState.routingMap,
                                request.properties);

                            return getStateWithNewRoutingMap(funcState, newRoutingMapObs);
                        } else {
                            return Mono.just(state);
                        }
                    };

                    Function<RefreshState, Mono<Utils.ValueHolder<ResolutionResult>>> resolveServerPartition = funcState -> {

                        try {
                            AddressResolver.ensureRoutingMapPresent(request, funcState.routingMap, funcState.collection);
                        } catch (Exception e) {
                            return Mono.error(e);
                        }

                        return this.tryResolveServerPartitionAsync(
                            request,
                            funcState.collection,
                            funcState.routingMap,
                            true,
                            true,
                            forceRefreshPartitionAddresses);
                    };

                    Function<Utils.ValueHolder<ResolutionResult>, Mono<ResolutionResult>> onNullThrowNotFound = funcResolutionResult -> {
                        if (funcResolutionResult.v == null) {
                            logger.debug("Couldn't route partitionkeyrange-oblivious request after retry/cache refresh. Collection doesn't exist.");

                            // At this point collection cache and routing map caches are refreshed.
                            // The only reason we will get here is if collection doesn't exist.
                            // Case when partition-key-range doesn't exist is handled in the corresponding method.

                            return Mono.error(BridgeInternal.setResourceAddress(new NotFoundException(), request.requestContext.resourcePhysicalAddress));
                        }

                        return Mono.just(funcResolutionResult.v);
                    };

                    // Couldn't resolve server partition or its addresses.
                    // Either collection cache is outdated or routing map cache is outdated.
                    if (!state.collectionCacheIsUptoDate) {
                        request.forceNameCacheRefresh = true;
                        state.collectionCacheIsUptoDate = true;

                        Mono<Utils.ValueHolder<DocumentCollection>> newCollectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);
                        Mono<RefreshState> newRefreshStateObs = newCollectionObs.flatMap(collectionValueHolder -> {
                            state.collection = collectionValueHolder.v;

                            if (!StringUtils.equals(collectionValueHolder.v.getResourceId(), state.routingMap.getCollectionUniqueId())) {
                                // Collection cache was stale. We resolved to new Rid. routing map cache is potentially stale
                                // for this new collection rid. Mark it as such.
                                state.collectionRoutingMapCacheIsUptoDate = false;
                                Mono<Utils.ValueHolder<CollectionRoutingMap>> newRoutingMap = this.collectionRoutingMapCache.tryLookupAsync(
                                    BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
                                    collectionValueHolder.v.getResourceId(),
                                    null,
                                    request.properties);

                                return getStateWithNewRoutingMap(state, newRoutingMap);
                            }

                            return Mono.just(state);
                        });

                        Mono<Utils.ValueHolder<ResolutionResult>> newResultObs = newRefreshStateObs.flatMap(ensureCollectionRoutingMapCacheIsUptoDateFunc)
                                                                                                  .flatMap(resolveServerPartition);

                        return newResultObs.flatMap(onNullThrowNotFound).flatMap(addCollectionRidIfNameBased);

                    } else {
                        return ensureCollectionRoutingMapCacheIsUptoDateFunc.apply(state)
                                                                            .flatMap(resolveServerPartition)
                                                                            .flatMap(onNullThrowNotFound)
                                                                            .flatMap(addCollectionRidIfNameBased);
                    }
                });
            }
        );
    }

    private ResolutionResult handleRangeAddressResolutionFailure(
        RxDocumentServiceRequest request,
        boolean collectionCacheIsUpToDate,
        boolean routingMapCacheIsUpToDate,
        CollectionRoutingMap routingMap) {
        // Optimization to not refresh routing map unnecessary. As we keep track of parent child relationships,
        // we can determine that a range is gone just by looking up in the routing map.
        if (collectionCacheIsUpToDate && routingMapCacheIsUpToDate ||
            collectionCacheIsUpToDate && routingMap.isGone(request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId())) {
            String errorMessage = String.format(
                RMResources.PartitionKeyRangeNotFound,
                request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId(),
                request.getPartitionKeyRangeIdentity().getCollectionRid());
            throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.requestContext.resourcePhysicalAddress);
        }
        logger.debug("handleRangeAddressResolutionFailure returns null");
        return null;
    }

    private <T> Mono<T> returnOrError(Callable<T> function) {
        try {
            return Mono.just(function.call());
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    private Mono<Utils.ValueHolder<ResolutionResult>> tryResolveServerPartitionByPartitionKeyRangeIdAsync(
        RxDocumentServiceRequest request,
        DocumentCollection collection,
        CollectionRoutingMap routingMap,
        boolean collectionCacheIsUpToDate,
        boolean routingMapCacheIsUpToDate,
        boolean forceRefreshPartitionAddresses) {

        PartitionKeyRange partitionKeyRange = routingMap.getRangeByPartitionKeyRangeId(request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId());
        if (partitionKeyRange == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Cannot resolve range '{}'", request.getPartitionKeyRangeIdentity().toHeader());
            }
            return returnOrError(() -> new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)));
        }

        Mono<Utils.ValueHolder<AddressInformation[]>> addressesObs = this.addressCache.tryGetAddresses(
            request,
            new PartitionKeyRangeIdentity(collection.getResourceId(), request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId()),
            forceRefreshPartitionAddresses);

        return addressesObs.flatMap(addressesValueHolder -> {
            if (addressesValueHolder.v == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Cannot resolve addresses for range '{}'", request.getPartitionKeyRangeIdentity().toHeader());
                }
                try {
                    return Mono.just(new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)));
                } catch (CosmosException e) {
                    return Mono.error(e);
                }
            }
            return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(partitionKeyRange, addressesValueHolder.v)));
        });
    }

    private PartitionKeyRange tryResolveServerPartitionByPartitionKey(
        RxDocumentServiceRequest request,
        PartitionKeyInternal partitionKey,
        boolean collectionCacheUptoDate,
        DocumentCollection collection,
        CollectionRoutingMap routingMap) {
        if (request == null) {
            throw new NullPointerException("request");
        }

        if (collection == null) {
            throw new NullPointerException("collection");
        }

        if (routingMap == null) {
            throw new NullPointerException("routingMap");
        }

        if (partitionKey == null) {
            // this is just a safe guard to ensure if partitionKeyInternal is not set in DSR
            // but its encoded value is set in headers, we try deserializing partitionKeyInternal from header
            String partitionKeyString = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY);

            if (partitionKeyString != null) {
                try {
                    logger.warn("PartitionKeyInternal is not set in DocumentServiceRequest, attempting to deserialize header {}." +
                        " Note, any code setting PARTITION_KEY header value must also set PartitionKeyInternal to avoid deserialization cost.", partitionKeyString);
                    partitionKey = PartitionKeyInternal.fromJsonString(partitionKeyString);
                } catch (Exception ex) {
                    throw BridgeInternal.setResourceAddress(new BadRequestException(
                        String.format(RMResources.InvalidPartitionKey, partitionKeyString),
                        ex), request.requestContext.resourcePhysicalAddress);
                }
            }
        }

        if (partitionKey == null) {
            throw new InternalServerErrorException(String.format("partition key is null"));
        }

        if (partitionKey.equals(PartitionKeyInternal.Empty) || Utils.getCollectionSize(partitionKey.getComponents()) == collection.getPartitionKey().getPaths().size()) {
            // Although we can compute effective partition getKey here, in general case this GATEWAY can have outdated
            // partition getKey definition cached - like if collection with same getName but with RANGE partitioning is created.
            // In this case server will not pass x-ms-documentdb-collection-rid check and will return back InvalidPartitionException.
            // GATEWAY will refresh its cache and retry.
            String effectivePartitionKey = PartitionKeyInternalHelper.getEffectivePartitionKeyString(partitionKey, collection.getPartitionKey());

            // There should be exactly one range which contains a partition key. Always.
            return routingMap.getRangeByEffectivePartitionKey(effectivePartitionKey);
        }

        if (collectionCacheUptoDate) {
            BadRequestException badRequestException = BridgeInternal.setResourceAddress(new BadRequestException(RMResources.PartitionKeyMismatch),
                request.requestContext.resourcePhysicalAddress);
            badRequestException.getResponseHeaders().put(WFConstants.BackendHeaders.SUB_STATUS, Integer.toString(HttpConstants.SubStatusCodes.PARTITION_KEY_MISMATCH));

            throw badRequestException;
        }

        // Partition key supplied has different number paths than locally cached partition key definition.
        // Three things can happen:
        //    1. User supplied wrong partition key.
        //    2. Client SDK has outdated partition key definition cache and extracted wrong value from the document.
        //    3. GATEWAY's cache is outdated.
        //
        // What we will do is append x-ms-documentdb-collection-rid header and forward it to random collection partition.
        // * If collection rid matches, server will send back 400.1001, because it also will not be able to compute
        // effective partition key. GATEWAY will forward this status code to client - client will handle it.
        // * If collection rid doesn't match, server will send back InvalidPartiitonException and GATEWAY will
        //   refresh name routing cache - this will refresh partition key definition as well, and retry.

        if (logger.isDebugEnabled()) {
            logger.debug(
                "Cannot compute effective partition getKey. Definition has '{}' getPaths, values supplied has '{}' getPaths. Will refresh cache and retry.",
                collection.getPartitionKey().getPaths().size(),
                Utils.getCollectionSize(partitionKey.getComponents()));
        }

        return null;
    }

    private static class ResolutionResult {
        final PartitionKeyRange TargetPartitionKeyRange;
        final AddressInformation[] Addresses;

        ResolutionResult(
                PartitionKeyRange targetPartitionKeyRange,
                AddressInformation[] addresses) {
            if (targetPartitionKeyRange == null) {
                throw new NullPointerException("targetPartitionKeyRange");
            }

            if (addresses == null) {
                throw new NullPointerException("addresses");
            }

            this.TargetPartitionKeyRange = targetPartitionKeyRange;
            this.Addresses = addresses;
        }
    }
}