ChangeFeedQueryImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
class ChangeFeedQueryImpl<T extends Resource> {
private static final int INITIAL_TOP_VALUE = -1;
private final RxDocumentClientImpl client;
private final DiagnosticsClientContext clientContext;
private final Supplier<RxDocumentServiceRequest> createRequestFunc;
private final String documentsLink;
private final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc;
private final Class<T> klass;
private final CosmosChangeFeedRequestOptions options;
private final ResourceType resourceType;
private final ChangeFeedState changeFeedState;
public ChangeFeedQueryImpl(
RxDocumentClientImpl client,
ResourceType resourceType,
Class<T> klass,
String collectionLink,
String collectionRid,
CosmosChangeFeedRequestOptions requestOptions) {
checkNotNull(client, "Argument 'client' must not be null.");
checkNotNull(resourceType, "Argument 'resourceType' must not be null.");
checkNotNull(klass, "Argument 'klass' must not be null.");
checkNotNull(requestOptions, "Argument 'requestOptions' must not be null.");
checkNotNull(collectionLink, "Argument 'collectionLink' must not be null.");
checkNotNull(collectionRid, "Argument 'collectionRid' must not be null.");
if (Strings.isNullOrWhiteSpace(collectionLink)) {
throw new IllegalArgumentException("Argument 'collectionLink' must not be empty");
}
if (Strings.isNullOrWhiteSpace(collectionRid)) {
throw new IllegalArgumentException("Argument 'collectionRid' must not be empty");
}
this.createRequestFunc = this::createDocumentServiceRequest;
this.executeFunc = this::executeRequestAsync;
this.clientContext = client;
this.client = client;
this.resourceType = resourceType;
this.klass = klass;
this.documentsLink = Utils.joinPath(collectionLink, Paths.DOCUMENTS_PATH_SEGMENT);
this.options = requestOptions;
FeedRangeInternal feedRange = (FeedRangeInternal)this.options.getFeedRange();
ChangeFeedState state;
if ((state = ModelBridgeInternal.getChangeFeedContinuationState(requestOptions)) == null)
{
state = new ChangeFeedStateV1(
collectionRid,
feedRange,
ModelBridgeInternal.getChangeFeedMode(requestOptions),
ModelBridgeInternal.getChangeFeedStartFromSettings(requestOptions),
null);
}
this.changeFeedState = state;
}
public Flux<FeedResponse<T>> executeAsync() {
return Paginator.getChangeFeedQueryResultAsObservable(
this.client,
this.changeFeedState,
ModelBridgeInternal.getPropertiesFromChangeFeedRequestOptions(this.options),
this.createRequestFunc,
this.executeFunc,
this.klass,
INITIAL_TOP_VALUE,
this.options.getMaxItemCount(),
this.options.getMaxPrefetchPageCount(),
ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options));
}
private RxDocumentServiceRequest createDocumentServiceRequest() {
Map<String, String> headers = new HashMap<>();
Map<String, String> customOptions =
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor().getHeader(this.options);
if (customOptions != null) {
headers.putAll(customOptions);
}
if (options.isQuotaInfoEnabled()) {
headers.put(HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, String.valueOf(true));
}
return RxDocumentServiceRequest.create(clientContext,
OperationType.ReadFeed,
resourceType,
documentsLink,
headers,
options);
}
private Mono<FeedResponse<T>> executeRequestAsync(RxDocumentServiceRequest request) {
return client.readFeed(request)
.map(rsp -> BridgeInternal.toChangeFeedResponsePage(rsp, klass));
}
}