Fetcher.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Pulls a feed one page at a time, carrying the paging state between requests.
 *
 * <p>Each call to {@link #nextPage()} builds a request from the current continuation token and
 * page size, executes it, and then folds the response back into the paging state (continuation
 * token, remaining "top" budget, page size and the "should fetch more" flag).
 *
 * <p>A {@code top} value of {@code -1} disables the item budget: the page size is then used as
 * given and no per-response budget accounting is done.
 *
 * <p>NOTE(review): the mutable state is held in {@code volatile} fields, but the updates in
 * {@code updateState} are read-modify-write sequences; this presumably relies on pages being
 * fetched one after another - confirm against callers.
 *
 * @param <T> the resource type carried by the feed responses
 */
class Fetcher<T extends Resource> {
    private static final Logger logger = LoggerFactory.getLogger(Fetcher.class);

    // Collaborators supplied by the caller: one builds a request, the other executes it.
    private final BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc;
    private final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc;
    private final boolean isChangeFeed;

    // Paging state, refreshed after every response.
    private volatile boolean shouldFetchMore;
    private volatile int maxItemCount;
    private volatile int top;
    private volatile String continuationToken;

    /**
     * Creates a fetcher positioned at the given continuation token.
     *
     * @param createRequestFunc builds a request from (continuation token, page size)
     * @param executeFunc       executes a request and yields the resulting page
     * @param continuationToken token to send with the first request
     * @param isChangeFeed      whether the feed is a change feed; if so, a response reporting
     *                          no changes ends the fetching
     * @param top               total item budget, or {@code -1} for no budget
     * @param maxItemCount      requested page size; capped to {@code top} when a budget is set
     */
    public Fetcher(BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc,
                   Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
                   String continuationToken,
                   boolean isChangeFeed,
                   int top,
                   int maxItemCount) {

        this.createRequestFunc = createRequestFunc;
        this.executeFunc = executeFunc;
        this.isChangeFeed = isChangeFeed;

        this.continuationToken = continuationToken;
        this.top = top;
        // With an item budget in place, never ask for a page larger than the budget.
        this.maxItemCount = (top == -1) ? maxItemCount : Math.min(maxItemCount, top);
        this.shouldFetchMore = true;
    }

    /**
     * @return {@code true} while another page may still be requested
     */
    public boolean shouldFetchMore() {
        return shouldFetchMore;
    }

    /**
     * Requests the next page and updates the paging state once the page is emitted.
     *
     * @return a {@link Mono} emitting the next page
     * @throws IllegalStateException if called after fetching has completed
     */
    public Mono<FeedResponse<T>> nextPage() {
        if (!shouldFetchMore) {
            // Callers are expected to consult shouldFetchMore() first, so this is a misuse.
            logger.error("invalid state, trying to fetch more after completion");
            throw new IllegalStateException("INVALID state, trying to fetch more after completion");
        }

        RxDocumentServiceRequest request = createRequestFunc.apply(continuationToken, maxItemCount);
        Mono<FeedResponse<T>> page = executeFunc.apply(request);
        return page.map(response -> {
            updateState(response);
            return response;
        });
    }

    /**
     * Folds a received page into the paging state.
     */
    private void updateState(FeedResponse<T> response) {
        continuationToken = response.getContinuationToken();

        if (top != -1) {
            consumeBudget(response);
        }

        shouldFetchMore = canContinueAfter(response);

        logger.debug("Fetcher state updated: " +
                        "isChangeFeed = {}, continuation token = {}, max item count = {}, should fetch more = {}",
                        isChangeFeed, continuationToken, maxItemCount, shouldFetchMore);
    }

    /**
     * Deducts the items of the given page from the remaining budget and shrinks the page size
     * so that the next request cannot ask for more than what is left.
     */
    private void consumeBudget(FeedResponse<T> response) {
        int remaining = top - response.getResults().size();
        if (remaining < 0) {
            // Not expected: the service handed back more items than were asked for.
            logger.warn("Azure Cosmos DB BackEnd Service returned more than requested {} items", maxItemCount);
            remaining = 0;
        }
        top = remaining;
        maxItemCount = Math.min(maxItemCount, remaining);
    }

    /**
     * Decides whether another page may be requested after the given response.
     */
    private boolean canContinueAfter(FeedResponse<T> response) {
        if (!shouldFetchMore) {
            return false;
        }
        // No continuation token, or an exhausted budget, means the feed is done.
        if (StringUtils.isEmpty(continuationToken) || top == 0) {
            return false;
        }
        // A change feed stops as soon as a response reports no changes.
        return !isChangeFeed || !BridgeInternal.noChanges(response);
    }
}