FeedRangePartitionKeyImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.feedranges;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IRoutingMapProvider;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.PartitionKeyDefinition;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.azure.cosmos.BridgeInternal.setProperty;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public final class FeedRangePartitionKeyImpl extends FeedRangeInternal {
    private final PartitionKeyInternal partitionKey;

    public FeedRangePartitionKeyImpl(PartitionKeyInternal partitionKey) {
        checkNotNull(partitionKey, "Argument 'partitionKey' must not be null");
        this.partitionKey = partitionKey;
    }

    public PartitionKeyInternal getPartitionKeyInternal() {
        return this.partitionKey;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FeedRangePartitionKeyImpl that = (FeedRangePartitionKeyImpl)o;
        return Objects.equals(this.partitionKey, that.partitionKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(partitionKey);
    }

    @Override
    public Mono<Range<String>> getEffectiveRange(
        IRoutingMapProvider routingMapProvider,
        MetadataDiagnosticsContext metadataDiagnosticsCtx,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        checkNotNull(
            collectionResolutionMono,
            "Argument 'collectionResolutionMono' must not be null");

        return collectionResolutionMono
            .flatMap(documentCollectionResourceResponse -> {

                final DocumentCollection collection = documentCollectionResourceResponse.v;
                if (collection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }

                Range<String> range = getEffectiveRange(collection.getPartitionKey());
                return Mono.just(range);
            });
    }

    public Range<String> getEffectiveRange(
        PartitionKeyDefinition pkDefinition) {

        checkNotNull(
            pkDefinition,
            "Argument 'pkDefinition' must not be null");

        final String effectivePartitionKey =
            this.partitionKey.getEffectivePartitionKeyString(
                this.partitionKey,
                pkDefinition);

        return Range.getPointRange(effectivePartitionKey);
    }

    @Override
    public Mono<List<String>> getPartitionKeyRanges(
        IRoutingMapProvider routingMapProvider,
        RxDocumentServiceRequest request,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        checkNotNull(
            routingMapProvider,
            "Argument 'routingMapProvider' must not be null");
        checkNotNull(
            request,
            "Argument 'request' must not be null");
        checkNotNull(
            collectionResolutionMono,
            "Argument 'collectionResolutionMono' must not be null");

        MetadataDiagnosticsContext metadataDiagnosticsCtx =
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

        return collectionResolutionMono
            .flatMap(documentCollectionResourceResponse -> {

                final DocumentCollection collection = documentCollectionResourceResponse.v;
                if (collection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }

                final String containerRid = collection.getResourceId();
                final String effectivePartitionKey =
                    this.partitionKey.getEffectivePartitionKeyString(
                    this.partitionKey,
                    collection.getPartitionKey());

                return routingMapProvider
                    .tryGetOverlappingRangesAsync(
                        metadataDiagnosticsCtx,
                        containerRid,
                        Range.getPointRange(effectivePartitionKey),
                        false,
                        null)
                    .flatMap(pkRangeHolder -> {
                        ArrayList<String> rangeList = new ArrayList<>(1);

                        if (pkRangeHolder.v != null) {
                            String rangeId = pkRangeHolder.v.get(0).getId();
                            rangeList.add(rangeId);
                        }

                        return Mono.just((UnmodifiableList<String>)UnmodifiableList.unmodifiableList(rangeList));
                    });
            });
    }

    @Override
    public Mono<RxDocumentServiceRequest> populateFeedRangeFilteringHeaders(
        IRoutingMapProvider routingMapProvider,
        RxDocumentServiceRequest request,
        Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

        checkNotNull(
            request,
            "Argument 'request' must not be null");

        request.getHeaders().put(
            HttpConstants.HttpHeaders.PARTITION_KEY,
            this.partitionKey.toJson());
        request.setPartitionKeyInternal(this.partitionKey);

        MetadataDiagnosticsContext metadataDiagnosticsCtx =
            BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

        return this
            .getNormalizedEffectiveRange(routingMapProvider, metadataDiagnosticsCtx, collectionResolutionMono)
            .map(effectiveRange -> {
                request.setEffectiveRange(effectiveRange);

                return request;
            });
    }

    @Override
    public void populatePropertyBag() {
        super.populatePropertyBag();
        setProperties(this, false);
    }

    @Override
    public void removeProperties(JsonSerializable serializable) {
        checkNotNull(serializable, "Argument 'serializable' must not be null.");
        serializable.remove(Constants.Properties.FEED_RANGE_PARTITION_KEY);
    }

    @Override
    public void setProperties(JsonSerializable serializable, boolean populateProperties) {
        checkNotNull(serializable, "Argument 'serializable' must not be null.");
        if (populateProperties) {
            super.populatePropertyBag();
        }

        if (this.partitionKey != null) {
            setProperty(serializable, Constants.Properties.FEED_RANGE_PARTITION_KEY,
                this.partitionKey);
        }
    }
}