ChangeFeedState.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedRange;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@JsonDeserialize(using = ChangeFeedStateDeserializer.class)
public abstract class ChangeFeedState extends JsonSerializable {
private static final Comparator<Range<String>> MIN_RANGE_COMPARATOR = new Range.MinComparator<>();
private static final Comparator<Range<String>> MAX_RANGE_COMPARATOR = new Range.MaxComparator<>();
ChangeFeedState() {
}
public abstract FeedRangeContinuation getContinuation();
public abstract ChangeFeedState setContinuation(FeedRangeContinuation continuation);
public abstract FeedRangeInternal getFeedRange();
public abstract ChangeFeedMode getMode();
public abstract ChangeFeedStartFromInternal getStartFromSettings();
public abstract String applyServerResponseContinuation(
String serverContinuationToken,
RxDocumentServiceRequest request);
public abstract String getContainerRid();
public static ChangeFeedState fromString(String base64EncodedJson) {
checkNotNull(base64EncodedJson, "Argument 'base64EncodedJson' must not be null");
String json = new String(
Base64.getUrlDecoder().decode(base64EncodedJson),
StandardCharsets.UTF_8);
final ObjectMapper mapper = Utils.getSimpleObjectMapper();
try {
return mapper.readValue(json, ChangeFeedState.class);
} catch (IOException ioException) {
throw new IllegalArgumentException(
String.format("The change feed state continuation contains invalid or unsupported" +
" json: %s", json),
ioException);
}
}
@Override
public void populatePropertyBag() {
super.populatePropertyBag();
}
@Override
public String toString() {
String json = this.toJson();
if (json == null) {
return "";
}
return Base64.getUrlEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
}
public abstract void populateRequest(RxDocumentServiceRequest request, int maxItemCount);
public List<CompositeContinuationToken> extractContinuationTokens() {
return extractContinuationTokens(PartitionKeyInternalHelper.FullRange).getLeft();
}
private Pair<List<CompositeContinuationToken>, Range<String>> extractContinuationTokens(
Range<String> effectiveRange) {
checkNotNull(effectiveRange);
List<CompositeContinuationToken> extractedContinuationTokens = new ArrayList<>();
FeedRangeContinuation continuation = this.getContinuation();
String min = null;
String max = null;
if (continuation != null) {
List<CompositeContinuationToken> continuationTokensSnapshot = new ArrayList<>();
Collections.addAll(continuationTokensSnapshot, continuation.getCurrentContinuationTokens());
continuationTokensSnapshot.sort(ContinuationTokenRangeComparator.SINGLETON_INSTANCE);
for (CompositeContinuationToken compositeContinuationToken : continuationTokensSnapshot) {
if (Range.checkOverlapping(effectiveRange, compositeContinuationToken.getRange())) {
Range<String> overlappingRange =
getOverlappingRange(effectiveRange, compositeContinuationToken.getRange());
extractedContinuationTokens.add(
new CompositeContinuationToken(compositeContinuationToken.getToken(),
overlappingRange));
if (min == null) {
min = overlappingRange.getMin();
}
max = overlappingRange.getMax();
} else {
if (extractedContinuationTokens.size() > 0) {
break;
}
}
}
}
Range<String> totalRange = new Range<>(
min != null ? min : PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey,
max != null ? max : PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey,
true,
false);
return Pair.of(extractedContinuationTokens, totalRange);
}
public ChangeFeedState extractForEffectiveRange(Range<String> effectiveRange) {
checkNotNull(effectiveRange);
Pair<List<CompositeContinuationToken>, Range<String>> effectiveTokensAndMinMax =
this.extractContinuationTokens(effectiveRange);
List<CompositeContinuationToken> extractedContinuationTokens = effectiveTokensAndMinMax.getLeft();
Range<String> totalRange = effectiveTokensAndMinMax.getRight();
FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(totalRange);
return new ChangeFeedStateV1(
this.getContainerRid(),
feedRange,
this.getMode(),
this.getStartFromSettings(),
FeedRangeContinuation.create(
this.getContainerRid(),
feedRange,
extractedContinuationTokens
)
);
}
private Range<String> getOverlappingRange(Range<String> left, Range<String> right) {
checkNotNull(left, "Argument 'left' must not be null");
checkNotNull(right, "Argument 'right' must not be null");
checkArgument(
left.isMinInclusive() && !left.isMaxInclusive(),
"Argument 'left' is using exclusive Min or inclusive Max.");
checkArgument(
right.isMinInclusive() && !right.isMaxInclusive(),
"Argument 'right' is using exclusive Min or inclusive Max.");
String min;
String max;
if (MIN_RANGE_COMPARATOR.compare(left, right) > 0) {
min = left.getMin();
} else {
min = right.getMin();
}
if (MAX_RANGE_COMPARATOR.compare(left, right) < 0) {
max = left.getMax();
} else {
max = right.getMax();
}
return new Range<>(min, max, true, false);
}
public static ChangeFeedState merge(ChangeFeedState[] states) {
checkNotNull(states, "Argument 'states' must not be null.");
checkArgument(states.length > 0, "Argument 'states' must not be empty.");
ChangeFeedState firstState = states[0];
if (states.length == 1) {
return firstState;
}
List<CompositeContinuationToken> continuationTokens = new ArrayList<>(states.length);
for (ChangeFeedState state : states) {
validateConsistency(state, firstState);
FeedRangeContinuation continuation = state.getContinuation();
Collections.addAll(continuationTokens, continuation.getCurrentContinuationTokens());
}
continuationTokens.sort(ContinuationTokenRangeComparator.SINGLETON_INSTANCE);
for (int i = 1; i < continuationTokens.size(); i++) {
CompositeContinuationToken previous = continuationTokens.get(i - 1);
CompositeContinuationToken current = continuationTokens.get(i);
if (Range.checkOverlapping(previous.getRange(), current.getRange()))
{
throw new IllegalStateException(
String.format(
"Argument 'states' is invalid - it contains overlapping continuations '%s' and '%s'.",
previous.toJson(),
current.toJson()));
}
}
return new ChangeFeedStateV1(
firstState.getContainerRid(),
(FeedRangeEpkImpl)FeedRange.forFullRange(),
firstState.getMode(),
firstState.getStartFromSettings(),
FeedRangeContinuation.create(
firstState.getContainerRid(),
(FeedRangeEpkImpl)FeedRange.forFullRange(),
continuationTokens
)
);
}
private static void validateConsistency(ChangeFeedState candidate, ChangeFeedState expected) {
String containerRid = candidate.getContainerRid();
if (Strings.isNullOrEmpty(containerRid) || !containerRid.equals(expected.getContainerRid())) {
final String message = String.format(
"The container %s for the reference change feed status is different from the current container %s.",
expected.getContainerRid(),
containerRid);
throw new IllegalArgumentException(message);
}
ChangeFeedMode mode = candidate.getMode();
if (mode == null || !mode.equals(expected.getMode())) {
final String message = String.format(
"The mode %s for the reference change feed status is different from the current mode %s.",
expected.getMode(),
mode);
throw new IllegalArgumentException(message);
}
ChangeFeedStartFromInternal startFrom = candidate.getStartFromSettings();
if (startFrom == null || !startFrom.toJson().equals(expected.getStartFromSettings().toJson())) {
final String message = String.format(
"The mode '%s' for the reference change feed status is different from the current mode '%s'.",
expected.getStartFromSettings().toJson(),
startFrom != null ? startFrom.toJson() : "null");
throw new IllegalArgumentException(message);
}
}
private static class ContinuationTokenRangeComparator
implements Comparator<CompositeContinuationToken>, Serializable {
public static final ContinuationTokenRangeComparator SINGLETON_INSTANCE =
new ContinuationTokenRangeComparator();
private static final long serialVersionUID = 1L;
private ContinuationTokenRangeComparator() {
}
@Override
public int compare(CompositeContinuationToken left, CompositeContinuationToken right) {
checkNotNull(left, "Argument 'left' must not be null.");
checkNotNull(right, "Argument 'right' must not be null.");
return MIN_RANGE_COMPARATOR.compare(left.getRange(), right.getRange());
}
}
}