ChangeFeedFetcher.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InvalidPartitionExceptionRetryPolicy;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneRetryPolicy;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
class ChangeFeedFetcher<T extends Resource> extends Fetcher<T> {
private final ChangeFeedState changeFeedState;
private final Supplier<RxDocumentServiceRequest> createRequestFunc;
private final DocumentClientRetryPolicy feedRangeContinuationSplitRetryPolicy;
public ChangeFeedFetcher(
RxDocumentClientImpl client,
Supplier<RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
ChangeFeedState changeFeedState,
Map<String, Object> requestOptionProperties,
int top,
int maxItemCount,
boolean isSplitHandlingDisabled) {
super(executeFunc, true, top, maxItemCount);
checkNotNull(client, "Argument 'client' must not be null.");
checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null.");
checkNotNull(changeFeedState, "Argument 'changeFeedState' must not be null.");
this.changeFeedState = changeFeedState;
if (isSplitHandlingDisabled) {
// True for ChangeFeedProcessor - where all retry-logic is handled
this.feedRangeContinuationSplitRetryPolicy = null;
this.createRequestFunc = createRequestFunc;
} else {
DocumentClientRetryPolicy retryPolicyInstance = client.getResetSessionTokenRetryPolicy().getRequestPolicy();
String collectionLink = PathsHelper.generatePath(
ResourceType.DocumentCollection, changeFeedState.getContainerRid(), false);
retryPolicyInstance = new InvalidPartitionExceptionRetryPolicy(
client.getCollectionCache(),
retryPolicyInstance,
collectionLink,
requestOptionProperties);
retryPolicyInstance = new PartitionKeyRangeGoneRetryPolicy(client,
client.getCollectionCache(),
client.getPartitionKeyRangeCache(),
collectionLink,
retryPolicyInstance,
requestOptionProperties);
this.feedRangeContinuationSplitRetryPolicy = new FeedRangeContinuationSplitRetryPolicy(
client,
this.changeFeedState,
retryPolicyInstance,
requestOptionProperties,
retryPolicyInstance.getRetryContext());
this.createRequestFunc = () -> {
RxDocumentServiceRequest request = createRequestFunc.get();
this.feedRangeContinuationSplitRetryPolicy.onBeforeSendRequest(request);
return request;
};
}
}
@Override
public Mono<FeedResponse<T>> nextPage() {
if (this.feedRangeContinuationSplitRetryPolicy == null) {
return this.nextPageInternal();
}
// There are two conditions that require retries
// in the change feed pipeline
// 1) On 410 the FeedRangeContinuation needs to evaluate
// whether continuations need to be split (in the case that any continuation
// exists that would span more than one physical partition now
// 2) On 304 a retry is needed if at least one continuation has not been drained yet.
// This prevents returning a 304 before we received a 304 for all continuations
//
// 410 handling: this is triggered by an exception - using an
// IRetryPolicy (FeedRangeContinuationSplitRetryPolicy)
// 304 handling: this is not triggered by an exception (304 doesn't result in throwing)
// so using Reactor's built-in option of repeating the chain on an empty result
// so nextPageInternal below has the logic to return empty result
// if not all continuations have been drained yet.
return ObservableHelper.inlineIfPossible(
this::nextPageInternal,
this.feedRangeContinuationSplitRetryPolicy);
}
private Mono<FeedResponse<T>> nextPageInternal() {
return Mono.fromSupplier(this::nextPageCore)
.flatMap(Function.identity())
.flatMap((r) -> {
FeedRangeContinuation continuationSnapshot =
this.changeFeedState.getContinuation();
if (continuationSnapshot != null &&
continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) {
// not all continuations have been drained yet
// repeat with the next continuation
this.reenableShouldFetchMoreForRetry();
return Mono.empty();
}
return Mono.just(r);
})
.repeatWhenEmpty(o -> o);
}
@Override
protected String applyServerResponseContinuation(
String serverContinuationToken,
RxDocumentServiceRequest request) {
return this.changeFeedState.applyServerResponseContinuation(
serverContinuationToken, request);
}
@Override
protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response) {
if (ModelBridgeInternal.noChanges(response)) {
return true;
}
FeedRangeContinuation continuation = this.changeFeedState.getContinuation();
return continuation != null && continuation.isDone();
}
@Override
protected String getContinuationForLogging() {
return this.changeFeedState.toJson();
}
@Override
protected RxDocumentServiceRequest createRequest(int maxItemCount) {
RxDocumentServiceRequest request = this.createRequestFunc.get();
this.changeFeedState.populateRequest(request, maxItemCount);
return request;
}
private static final class FeedRangeContinuationSplitRetryPolicy extends DocumentClientRetryPolicy {
private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeContinuationSplitRetryPolicy.class);
private final ChangeFeedState state;
private final RxDocumentClientImpl client;
private final DocumentClientRetryPolicy nextRetryPolicy;
private final Map<String, Object> requestOptionProperties;
private MetadataDiagnosticsContext diagnosticsContext;
private final RetryContext retryContext;
public FeedRangeContinuationSplitRetryPolicy(
RxDocumentClientImpl client,
ChangeFeedState state,
DocumentClientRetryPolicy nextRetryPolicy,
Map<String, Object> requestOptionProperties,
RetryContext retryContext) {
this.client = client;
this.state = state;
this.nextRetryPolicy = nextRetryPolicy;
this.requestOptionProperties = requestOptionProperties;
this.diagnosticsContext = null;
this.retryContext = retryContext;
}
@Override
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
this.diagnosticsContext =
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);
this.nextRetryPolicy.onBeforeSendRequest(request);
}
@Override
public Mono<ShouldRetryResult> shouldRetry(Exception e) {
return this.nextRetryPolicy.shouldRetry(e).flatMap(shouldRetryResult -> {
if (!shouldRetryResult.shouldRetry) {
if (!(e instanceof GoneException)) {
LOGGER.warn("Exception not applicable - will fail the request.", e);
return Mono.just(ShouldRetryResult.noRetry());
}
if (this.state.getContinuation() == null) {
final FeedRangeInternal feedRange = this.state.getFeedRange();
final Mono<Range<String>> effectiveRangeMono = feedRange.getNormalizedEffectiveRange(
this.client.getPartitionKeyRangeCache(),
this.diagnosticsContext,
this.client.getCollectionCache().resolveByRidAsync(
this.diagnosticsContext,
this.state.getContainerRid(),
this.requestOptionProperties)
);
return effectiveRangeMono
.map(effectiveRange -> {
return this.state.setContinuation(
FeedRangeContinuation.create(
this.state.getContainerRid(),
this.state.getFeedRange(),
effectiveRange));
})
.flatMap(state -> state.getContinuation().handleSplit(client, (GoneException)e));
}
return this
.state
.getContinuation()
.handleSplit(client, (GoneException)e)
.flatMap(splitShouldRetryResult -> {
if (!splitShouldRetryResult.shouldRetry) {
LOGGER.warn("No partition split error - will fail the request.", e);
} else {
LOGGER.debug("HandleSplit will retry.", e);
}
return Mono.just(shouldRetryResult);
});
}
LOGGER.trace("Retrying due to inner retry policy");
return Mono.just(shouldRetryResult);
});
}
@Override
public RetryContext getRetryContext() {
return this.retryContext;
}
}
}