FeedRangeEpkImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.feedranges;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IRoutingMapProvider;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.ReadFeedKeyType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.azure.cosmos.BridgeInternal.setProperty;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public final class FeedRangeEpkImpl extends FeedRangeInternal {
    private static final FeedRangeEpkImpl fullRangeEPK =
        new FeedRangeEpkImpl(PartitionKeyInternalHelper.FullRange);

    private final Range<String> range;

    public FeedRangeEpkImpl(final Range<String> range) {
        checkNotNull(range, "Argument 'range' must not be null");
        if (range.getMin().compareTo(range.getMax()) > 0) {
            throw new IllegalArgumentException("The provided range is incorrect min is larger than max");
        }
        this.range = range;
    }

    public Range<String> getRange() {
        return this.range;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FeedRangeEpkImpl that = (FeedRangeEpkImpl)o;
        return Objects.equals(this.range, that.range);
    }

    @Override
    public int hashCode() {
        return Objects.hash(range);
    }

    public static FeedRangeEpkImpl forFullRange() {
        return fullRangeEPK;
    }

    @Override
    public Mono<Range<String>> getEffectiveRange(
        IRoutingMapProvider routingMapProvider,
        MetadataDiagnosticsContext metadataDiagnosticsCtx,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        return Mono.just(this.range);
    }

    @Override
    public Mono<List<String>> getPartitionKeyRanges(
        IRoutingMapProvider routingMapProvider,
        RxDocumentServiceRequest request,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        checkNotNull(
            routingMapProvider,
            "Argument 'routingMapProvider' must not be null");
        checkNotNull(
            request,
            "Argument 'request' must not be null");
        checkNotNull(
            collectionResolutionMono,
            "Argument 'collectionResolutionMono' must not be null");

        MetadataDiagnosticsContext metadataDiagnosticsCtx =
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

        return collectionResolutionMono
            .flatMap(documentCollectionResourceResponse -> {

                final DocumentCollection collection = documentCollectionResourceResponse.v;
                if (collection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }

                final String containerRid = collection.getResourceId();

                return routingMapProvider
                    .tryGetOverlappingRangesAsync(
                        metadataDiagnosticsCtx,
                        containerRid,
                        this.range,
                        false,
                        null)
                    .flatMap(pkRangeHolder -> {
                        final ArrayList<String> rangeList = new ArrayList<>();

                        if (pkRangeHolder.v != null) {
                            final List<PartitionKeyRange> pkRanges = pkRangeHolder.v;
                            for (final PartitionKeyRange pkRange : pkRanges) {
                                rangeList.add(pkRange.getId());
                            }
                        }

                        return Mono.just(UnmodifiableList.unmodifiableList(rangeList));
                    });
            });
    }

    @Override
    public Mono<RxDocumentServiceRequest> populateFeedRangeFilteringHeaders(
        IRoutingMapProvider routingMapProvider,
        RxDocumentServiceRequest request,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        checkNotNull(
            routingMapProvider,
            "Argument 'routingMapProvider' must not be null");
        checkNotNull(
            request,
            "Argument 'request' must not be null");
        checkNotNull(
            collectionResolutionMono,
            "Argument 'collectionResolutionMono' must not be null");

        MetadataDiagnosticsContext metadataDiagnosticsCtx =
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

        return collectionResolutionMono
            .flatMap(documentCollectionResourceResponse -> {

                final DocumentCollection collection = documentCollectionResourceResponse.v;
                if (collection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }

                final String containerRid = collection.getResourceId();
                request.setEffectiveRange(this.range);

                return routingMapProvider
                    .tryGetOverlappingRangesAsync(
                        metadataDiagnosticsCtx,
                        containerRid,
                        this.range,
                        false,
                        null)
                    .flatMap(pkRangeHolder -> {
                        if (pkRangeHolder == null) {
                            return Mono.error(new NotFoundException(
                                String.format("Stale cache for collection rid '%s'.",
                                    containerRid)));
                        }

                        final List<PartitionKeyRange> pkRanges = pkRangeHolder.v;
                        if (pkRanges == null || pkRanges.size() == 0) {
                            return Mono.error(new NotFoundException(
                                String.format("Stale cache for collection rid '%s'.",
                                    containerRid)));
                        }

                        // For epk range filtering we can end up in one of 3 cases:
                        if (pkRanges.size() > 1) {

                            // 1) The EpkRange spans more than one physical partition
                            // In this case it means we have encountered a split and
                            // we need to bubble that up to the higher layers to update their
                            // datastructures
                            GoneException goneException = new GoneException(
                                String.format("Epk Range '%s' is gone.", this.range));
                            BridgeInternal.setSubStatusCode(
                                goneException,
                                HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE);

                            return Mono.error(goneException);
                        }

                        final Range<String> singleRange = pkRanges.get(0).toRange();
                        if (singleRange.getMin().equals(this.range.getMin()) &&
                            singleRange.getMax().equals(this.range.getMax())) {

                            // 2) The EpkRange spans exactly one physical partition
                            // In this case we can route to the physical pkrange id
                            request.routeTo(new PartitionKeyRangeIdentity(pkRanges.get(0).getId()));
                        } else {
                            // 3) The EpkRange spans less than single physical partition
                            // In this case we route to the physical partition and
                            // pass the epk range headers to filter within partition
                            request.routeTo(new PartitionKeyRangeIdentity(pkRanges.get(0).getId()));

                            final Map<String, String> headers = request.getHeaders();
                            headers.put(
                                HttpConstants.HttpHeaders.READ_FEED_KEY_TYPE,
                                ReadFeedKeyType.EffectivePartitionKeyRange.name());
                            headers.put(
                                HttpConstants.HttpHeaders.START_EPK,
                                this.range.getMin());
                            headers.put(
                                HttpConstants.HttpHeaders.END_EPK,
                                this.range.getMax());

                        }

                        return Mono.just(request);
                    });
            });
    }

    @Override
    public void populatePropertyBag() {
        super.populatePropertyBag();
        setProperties(this, false);
    }

    @Override
    public void setProperties(JsonSerializable serializable, boolean populateProperties) {
        checkNotNull(serializable, "Argument 'serializable' must not be null.");
        if (populateProperties) {
            super.populatePropertyBag();
        }

        if (this.range != null) {
            ModelBridgeInternal.populatePropertyBag(this.range);
            setProperty(serializable, Constants.Properties.RANGE, this.range);
        }
    }

    @Override
    public void removeProperties(JsonSerializable serializable) {
        checkNotNull(serializable, "Argument 'serializable' must not be null.");
        serializable.remove(Constants.Properties.RANGE);
    }
}