FeedRangePartitionKeyRangeExtractorImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.feedranges;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.ResourceResponse;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.routing.Range;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

final class FeedRangePartitionKeyRangeExtractorImpl extends FeedRangeAsyncVisitor<List<Range<String>>> {

    private final RxDocumentClientImpl client;
    private final String collectionLink;

    public FeedRangePartitionKeyRangeExtractorImpl(
        RxDocumentClientImpl client,
        String collectionLink) {

        checkNotNull(client, "'client' must not be null");
        checkNotNull(collectionLink, "'collectionLink' must not be null");

        this.client = client;
        this.collectionLink = collectionLink;
    }

    @Override
    public Mono<List<Range<String>>> visit(FeedRangePartitionKeyImpl feedRange) {
        final RxPartitionKeyRangeCache partitionKeyRangeCache =
            this.client.getPartitionKeyRangeCache();
        final Mono<ResourceResponse<DocumentCollection>> collectionResponseObservable = this.client
            .readCollection(this.collectionLink, null);

        return collectionResponseObservable.flatMap(collectionResponse -> {
            final DocumentCollection collection = collectionResponse.getResource();
            return feedRange.getEffectiveRanges(partitionKeyRangeCache,
                collection.getResourceId(),
                collection.getPartitionKey());
        });
    }

    @Override
    public Mono<List<Range<String>>> visit(FeedRangePartitionKeyRangeImpl feedRange) {
        final RxPartitionKeyRangeCache partitionKeyRangeCache =
            this.client.getPartitionKeyRangeCache();
        final Mono<ResourceResponse<DocumentCollection>> collectionResponseObservable = this.client
            .readCollection(this.collectionLink, null);

        return collectionResponseObservable.flatMap(collectionResponse -> {
            final DocumentCollection collection = collectionResponse.getResource();
            return feedRange.getEffectiveRanges(partitionKeyRangeCache,
                collection.getResourceId(), null);
        });
    }

    @Override
    public Mono<List<Range<String>>> visit(FeedRangeEpkImpl feedRange) {
        final RxPartitionKeyRangeCache partitionKeyRangeCache =
            this.client.getPartitionKeyRangeCache();
        final Mono<ResourceResponse<DocumentCollection>> collectionResponseObservable = this.client
            .readCollection(this.collectionLink, null);

        final Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono =
            collectionResponseObservable
                .flatMap(collectionResponse -> {
                    final DocumentCollection collection = collectionResponse.getResource();
                    return partitionKeyRangeCache.tryGetOverlappingRangesAsync(
                        BridgeInternal.getMetaDataDiagnosticContext(null),
                        collection.getResourceId(),
                        feedRange.getRange(), false, null);
                });

        return valueHolderMono.map(FeedRangePartitionKeyRangeExtractorImpl::toFeedRanges);
    }

    private static UnmodifiableList<Range<String>> toFeedRanges(
        final Utils.ValueHolder<List<PartitionKeyRange>> partitionKeyRangeListValueHolder) {
        final List<PartitionKeyRange> partitionKeyRangeList = partitionKeyRangeListValueHolder.v;
        if (partitionKeyRangeList == null) {
            throw new IllegalStateException("PartitionKeyRange list cannot be null");
        }

        final List<Range<String>> feedRanges = new ArrayList<>();
        partitionKeyRangeList.forEach(pkRange -> feedRanges.add(pkRange.toRange()));

        return (UnmodifiableList<Range<String>>)UnmodifiableList.unmodifiableList(feedRanges);
    }
}