// ChangeFeedQueryImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
 * Builds and executes paged change feed read requests for the documents of one collection.
 *
 * <p>The options handed to the constructor are validated and copied; the copy carries the initial
 * continuation value and is the options object given to the paginator by {@link #executeAsync()}.
 *
 * @param <T> resource class passed to {@code BridgeInternal.toChangeFeedResponsePage} when a
 *            service response is converted into a {@link FeedResponse}
 */
class ChangeFeedQueryImpl<T extends Resource> {

    /** Initial If-None-Match value used when no continuation, start time or start-from-beginning is requested. */
    private static final String IF_NONE_MATCH_ALL_HEADER_VALUE = "*";

    private final RxDocumentClientImpl client;
    private final DiagnosticsClientContext clientContext;
    private final ResourceType resourceType;
    private final Class<T> klass;
    private final String documentsLink;
    private final ChangeFeedOptions options;

    /**
     * Creates a change feed query over the documents path of {@code collectionLink}.
     *
     * @param client            client used both to execute requests and as the diagnostics context
     * @param resourceType      resource type placed on every request
     * @param klass             class used when converting responses into pages
     * @param collectionLink    link of the collection; the documents path segment is appended to it
     * @param changeFeedOptions caller options; {@code null} is replaced by a default-constructed instance
     * @throws IllegalArgumentException if the resource type is partitioned and neither a partition key
     *                                  range id nor a partition key is set, or if a partition key and a
     *                                  non-empty partition key range id are both set
     */
    public ChangeFeedQueryImpl(RxDocumentClientImpl client,
                               ResourceType resourceType,
                               Class<T> klass,
                               String collectionLink,
                               ChangeFeedOptions changeFeedOptions) {
        this.clientContext = client;
        this.client = client;
        this.resourceType = resourceType;
        this.klass = klass;
        this.documentsLink = Utils.joinPath(collectionLink, Paths.DOCUMENTS_PATH_SEGMENT);

        final ChangeFeedOptions effectiveOptions =
            changeFeedOptions == null ? new ChangeFeedOptions() : changeFeedOptions;

        validatePartitionScope(resourceType, effectiveOptions);

        this.options = getChangeFeedOptions(effectiveOptions, resolveInitialIfNoneMatch(effectiveOptions));
    }

    /**
     * Rejects option combinations that name no partition scope for a partitioned resource type,
     * or that name both a partition key and a non-empty partition key range id.
     */
    private static void validatePartitionScope(ResourceType resourceType, ChangeFeedOptions feedOptions) {
        if (resourceType.isPartitioned()
            && feedOptions.getPartitionKeyRangeId() == null
            && feedOptions.getPartitionKey() == null) {
            throw new IllegalArgumentException(RMResources.PartitionKeyRangeIdOrPartitionKeyMustBeSpecified);
        }

        if (feedOptions.getPartitionKey() != null
            && !Strings.isNullOrEmpty(feedOptions.getPartitionKeyRangeId())) {
            throw new IllegalArgumentException(
                String.format(RMResources.PartitionKeyAndParitionKeyRangeIdBothSpecified, "feedOptions"));
        }
    }

    /**
     * Chooses the continuation value stored on the copied options.
     *
     * @return the caller's request continuation when present; otherwise {@code null} when a start
     *         date-time is set or start-from-beginning is requested; otherwise {@code "*"}
     */
    private static String resolveInitialIfNoneMatch(ChangeFeedOptions feedOptions) {
        final String requestedContinuation = feedOptions.getRequestContinuation();
        if (requestedContinuation != null) {
            // An explicit continuation always wins over the other start settings.
            return requestedContinuation;
        }
        if (feedOptions.getStartDateTime() != null || feedOptions.isStartFromBeginning()) {
            return null;
        }
        return IF_NONE_MATCH_ALL_HEADER_VALUE;
    }

    /**
     * Creates the read-feed request for one page.
     *
     * @param continuationToken value for the If-None-Match header; no header is set when {@code null}
     * @param pageSize          not read by this method; the page-size header is taken from the
     *                          options' max item count instead
     * @return the request, with change feed headers and partition targeting applied
     */
    private RxDocumentServiceRequest createDocumentServiceRequest(String continuationToken, int pageSize) {
        final Map<String, String> requestHeaders = new HashMap<>();
        final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
            clientContext,
            OperationType.ReadFeed,
            resourceType,
            documentsLink,
            requestHeaders,
            options);

        // The map is filled in after it was handed to the request; presumably the request keeps a
        // reference to this same map -- confirm against RxDocumentServiceRequest.create.
        requestHeaders.put(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.INCREMENTAL_FEED);

        final Integer maxItemCount = options.getMaxItemCount();
        if (maxItemCount != null) {
            requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, String.valueOf(maxItemCount));
        }

        // On REST level, change feed is using IF_NONE_MATCH/ETag instead of continuation.
        if (continuationToken != null) {
            requestHeaders.put(HttpConstants.HttpHeaders.IF_NONE_MATCH, continuationToken);
        }

        if (options.getPartitionKey() != null) {
            final PartitionKeyInternal internalKey =
                BridgeInternal.getPartitionKeyInternal(options.getPartitionKey());
            requestHeaders.put(HttpConstants.HttpHeaders.PARTITION_KEY, internalKey.toJson());
            request.setPartitionKeyInternal(internalKey);
        }

        if (options.getStartDateTime() != null) {
            // The start time is sent as an RFC 1123 date computed at the UTC offset.
            requestHeaders.put(
                HttpConstants.HttpHeaders.IF_MODIFIED_SINCE,
                Utils.zonedDateTimeAsUTCRFC1123(options.getStartDateTime().atOffset(ZoneOffset.UTC)));
        }

        if (options.getPartitionKeyRangeId() != null) {
            request.routeTo(new PartitionKeyRangeIdentity(this.options.getPartitionKeyRangeId()));
        }

        return request;
    }

    /**
     * Copies {@code options} and sets {@code continuationToken} as the copy's request continuation.
     */
    private ChangeFeedOptions getChangeFeedOptions(ChangeFeedOptions options, String continuationToken) {
        final ChangeFeedOptions copiedOptions = new ChangeFeedOptions(options);
        copiedOptions.setRequestContinuation(continuationToken);
        return copiedOptions;
    }

    /**
     * Hands the request factory and request executor of this query to the paginator.
     *
     * @return the flux produced by {@code Paginator.getPaginatedChangeFeedQueryResultAsObservable};
     *         the page size argument is the options' max item count, or {@code -1} when it is unset
     */
    public Flux<FeedResponse<T>> executeAsync() {
        final BiFunction<String, Integer, RxDocumentServiceRequest> requestFactory =
            (continuation, size) -> createDocumentServiceRequest(continuation, size);
        final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> requestExecutor =
            request -> executeRequestAsync(request);

        final Integer maxItemCount = options.getMaxItemCount();
        final int maxPageSize = maxItemCount != null ? maxItemCount : -1;

        return Paginator.getPaginatedChangeFeedQueryResultAsObservable(
            options, requestFactory, requestExecutor, klass, maxPageSize);
    }

    /**
     * Reads one feed page through the client and converts the response into a change feed page.
     */
    private Mono<FeedResponse<T>> executeRequestAsync(RxDocumentServiceRequest request) {
        return client
            .readFeed(request)
            .map(serviceResponse -> BridgeInternal.toChangeFeedResponsePage(serviceResponse, klass));
    }
}