Paginator.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
*/
public class Paginator {
private final static Logger logger = LoggerFactory.getLogger(Paginator.class);
public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
CosmosQueryRequestOptions cosmosQueryRequestOptions,
BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Class<T> resourceType,
int maxPageSize) {
return getPaginatedQueryResultAsObservable(
ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions),
createRequestFunc,
executeFunc,
resourceType,
-1, maxPageSize);
}
public static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
String continuationToken,
BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Class<T> resourceType,
int top,
int maxPageSize) {
return getPaginatedQueryResultAsObservable(
continuationToken,
createRequestFunc,
executeFunc,
resourceType,
top,
maxPageSize,
false);
}
public static <T extends Resource> Flux<FeedResponse<T>> getChangeFeedQueryResultAsObservable(
RxDocumentClientImpl client,
ChangeFeedState changeFeedState,
Map<String, Object> requestOptionProperties,
Supplier<RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Class<T> resourceType,
int top,
int maxPageSize,
int preFetchCount,
boolean isSplitHandlingDisabled) {
return getPaginatedQueryResultAsObservable(
() -> new ChangeFeedFetcher<>(
client,
createRequestFunc,
executeFunc,
changeFeedState,
requestOptionProperties,
top,
maxPageSize,
isSplitHandlingDisabled),
preFetchCount);
}
private static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
Supplier<Fetcher<T>> fetcherFactory,
int preFetchCount) {
return Flux.defer(() -> {
Flux<Flux<FeedResponse<T>>> generate = Flux.generate(
fetcherFactory::get,
(tFetcher, sink) -> {
if (tFetcher.shouldFetchMore()) {
Mono<FeedResponse<T>> nextPage = tFetcher.nextPage();
sink.next(nextPage.flux());
} else {
logger.debug("No more results");
sink.complete();
}
return tFetcher;
});
return generate.flatMapSequential(
feedResponseFlux -> feedResponseFlux,
1,
preFetchCount);
});
}
private static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
String continuationToken,
BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Class<T> resourceType,
int top,
int maxPageSize,
boolean isChangeFeed) {
return getPaginatedQueryResultAsObservable(
() -> new ServerSideOnlyContinuationFetcherImpl<>(
createRequestFunc,
executeFunc,
continuationToken,
isChangeFeed,
top,
maxPageSize),
Queues.XS_BUFFER_SIZE);
}
}