Fetcher.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.function.Function;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

abstract class Fetcher<T extends Resource> {
    private final static Logger logger = LoggerFactory.getLogger(Fetcher.class);

    private final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc;
    private final boolean isChangeFeed;

    private volatile boolean shouldFetchMore;
    private volatile int maxItemCount;
    private volatile int top;

    public Fetcher(
        Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
        boolean isChangeFeed,
        int top,
        int maxItemCount) {

        checkNotNull(executeFunc, "Argument 'executeFunc' must not be null.");

        this.executeFunc = executeFunc;
        this.isChangeFeed = isChangeFeed;

        this.top = top;
        if (top == -1) {
            this.maxItemCount = maxItemCount;
        } else {
            // it is a top query, we should not retrieve more than requested top.
            this.maxItemCount = Math.min(maxItemCount, top);
        }
        this.shouldFetchMore = true;
    }

    public final boolean shouldFetchMore() {
        return shouldFetchMore;
    }

    public Mono<FeedResponse<T>> nextPage() {
        return this.nextPageCore();
    }

    protected final Mono<FeedResponse<T>> nextPageCore() {
        RxDocumentServiceRequest request = createRequest();
        return nextPage(request);
    }

    protected abstract String applyServerResponseContinuation(
        String serverContinuationToken,
        RxDocumentServiceRequest request);

    protected abstract boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response);

    protected abstract String getContinuationForLogging();

    private void updateState(FeedResponse<T> response, RxDocumentServiceRequest request) {
        String transformedContinuation =
            this.applyServerResponseContinuation(response.getContinuationToken(), request);

        ModelBridgeInternal.setFeedResponseContinuationToken(transformedContinuation, response);
        if (top != -1) {
            top -= response.getResults().size();
            if (top < 0) {
                // this shouldn't happen
                // this means backend retrieved more items than requested
                logger.warn("Azure Cosmos DB BackEnd Service returned more than requested {} items", maxItemCount);
                top = 0;
            }
            maxItemCount = Math.min(maxItemCount, top);
        }

        shouldFetchMore = shouldFetchMore &&
            // if top == 0 then done
            (top != 0) &&
            // if fullyDrained then done
            !this.isFullyDrained(this.isChangeFeed, response);

        if (logger.isDebugEnabled()) {
            logger.debug("Fetcher state updated: " +
                    "isChangeFeed = {}, continuation token = {}, max item count = {}, should fetch more = {}",
                isChangeFeed, this.getContinuationForLogging(), maxItemCount, shouldFetchMore);
        }
    }

    protected void reenableShouldFetchMoreForRetry() {
        this.shouldFetchMore = true;
    }

    private RxDocumentServiceRequest createRequest() {
        if (!shouldFetchMore) {
            // this should never happen
            logger.error("invalid state, trying to fetch more after completion");
            throw new IllegalStateException("INVALID state, trying to fetch more after completion");
        }

        return this.createRequest(maxItemCount);
    }

    protected abstract RxDocumentServiceRequest createRequest(int maxItemCount);

    private Mono<FeedResponse<T>> nextPage(RxDocumentServiceRequest request) {
        return executeFunc.apply(request).map(rsp -> {
            updateState(rsp, request);
            return rsp;
        });
    }
}