ServerSideOnlyContinuationFetcherImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.models.FeedResponse;
import reactor.core.publisher.Mono;

import java.util.function.BiFunction;
import java.util.function.Function;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

class ServerSideOnlyContinuationFetcherImpl<T extends Resource> extends Fetcher<T> {
    private final BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc;
    private volatile String continuationToken;

    public ServerSideOnlyContinuationFetcherImpl(BiFunction<String, Integer,
        RxDocumentServiceRequest> createRequestFunc,
                                                 Function<RxDocumentServiceRequest,
                                                     Mono<FeedResponse<T>>> executeFunc,
                                                 String continuationToken,
                                                 boolean isChangeFeed,
                                                 int top,
                                                 int maxItemCount) {

        super(executeFunc, isChangeFeed, top, maxItemCount);

        checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null.");
        this.createRequestFunc = createRequestFunc;

        this.continuationToken = continuationToken;
    }

    @Override
    protected String applyServerResponseContinuation(
        String serverContinuationToken,
        RxDocumentServiceRequest request) {

        return this.continuationToken = serverContinuationToken;
    }

    @Override
    protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response) {
        // if token is null or if change feed query and no changes then done
        return StringUtils.isEmpty(continuationToken) ||
            (isChangeFeed && BridgeInternal.noChanges(response));
    }

    @Override
    protected String getContinuationForLogging() {
        return this.continuationToken;
    }

    @Override
    protected RxDocumentServiceRequest createRequest(int maxItemCount) {
        return this.createRequestFunc.apply(this.continuationToken, maxItemCount);
    }
}