// FeedRangeInternal.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.feedranges;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.IRoutingMapProvider;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.routing.HexConvert;
import com.azure.cosmos.implementation.routing.Int128;
import com.azure.cosmos.implementation.routing.NumberPartitionKeyComponent;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.PartitionKeyDefinitionVersion;
import com.azure.cosmos.models.PartitionKind;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@JsonDeserialize(using = FeedRangeInternalDeserializer.class)
public abstract class FeedRangeInternal extends JsonSerializable implements FeedRange {
/** Logger used to report feed range JSON payloads that fail to parse. */
private static final Logger LOGGER = LoggerFactory.getLogger(FeedRangeInternal.class);

/**
 * Bit pattern with only the most significant bit set (2^63 when read as an unsigned
 * 64-bit value, {@link Long#MIN_VALUE} when read as a signed long). Used when decoding
 * a double from its unsigned 64-bit encoding.
 */
private static final long UINT64_TO_DOUBLE_MASK = Long.parseUnsignedLong("9223372036854775808");

/**
 * Largest unsigned 32-bit value (2^32 - 1). Used as the default upper bound of the
 * numeric space when splitting ranges for V1 hash partitioning.
 */
private static final long UINT_MAX_VALUE = Long.parseUnsignedLong("4294967295");
/**
 * Converts a public {@link FeedRange} into its internal representation.
 * <p>
 * An instance that already is a {@link FeedRangeInternal} is returned unchanged. Any
 * other implementation is converted by passing its {@code toString()} result to
 * {@link #fromBase64EncodedJsonString(String)}.
 *
 * @param feedRange the feed range to convert; must not be null
 * @return the internal feed range
 */
public static FeedRangeInternal convert(final FeedRange feedRange) {
    checkNotNull(feedRange, "Argument 'feedRange' must not be null");

    return feedRange instanceof FeedRangeInternal
        ? (FeedRangeInternal) feedRange
        : fromBase64EncodedJsonString(feedRange.toString());
}
/**
 * Creates a feed range from a previously obtained string representation.
 * <p>
 * The input is decoded with the URL-safe Base64 decoder, interpreted as UTF-8 text and
 * then handed to {@link #tryParse(String)}.
 *
 * @param base64EncodedJson a string representation of a feed range; must not be null
 * @return the parsed feed range
 * @throws IllegalArgumentException if the decoded text cannot be parsed as a feed range
 */
public static FeedRangeInternal fromBase64EncodedJsonString(String base64EncodedJson) {
    checkNotNull(base64EncodedJson, "Argument 'base64EncodedJson' must not be null");

    final byte[] decodedBytes = Base64.getUrlDecoder().decode(base64EncodedJson);
    final String decodedJson = new String(decodedBytes, StandardCharsets.UTF_8);

    final FeedRangeInternal feedRange = FeedRangeInternal.tryParse(decodedJson);
    if (feedRange != null) {
        return feedRange;
    }

    throw new IllegalArgumentException(
        String.format(
            "The provided string '%s' does not represent any known format.",
            decodedJson));
}
/**
 * Resolves the effective range covered by this feed range.
 * <p>
 * The emitted range is not required to be normalized;
 * {@link #getNormalizedEffectiveRange} applies {@link #normalizeRange(Range)} to the
 * result of this method.
 *
 * @param routingMapProvider routing map provider made available to the implementation
 * @param metadataDiagnosticsCtx diagnostics context made available to the implementation
 * @param collectionResolutionMono deferred resolution of the target collection
 * @return a {@link Mono} emitting the effective range
 */
protected abstract Mono<Range<String>> getEffectiveRange(
IRoutingMapProvider routingMapProvider,
MetadataDiagnosticsContext metadataDiagnosticsCtx,
Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono);
/**
 * Rewrites a range so that its minimum is inclusive and its maximum is exclusive.
 * <p>
 * A range that already has that shape is returned as-is. Otherwise an exclusive minimum
 * is decremented by one and an inclusive maximum is incremented by one (both via
 * {@code addToEffectivePartitionKey}), and a new range is built from the adjusted bounds.
 *
 * @param range the range to normalize
 * @return {@code range} itself when already min-inclusive/max-exclusive, otherwise a new
 * min-inclusive/max-exclusive range
 */
public static Range<String> normalizeRange(Range<String> range) {
    final boolean minInclusive = range.isMinInclusive();
    final boolean maxInclusive = range.isMaxInclusive();

    if (minInclusive && !maxInclusive) {
        return range;
    }

    final String normalizedMin = minInclusive
        ? range.getMin()
        : addToEffectivePartitionKey(range.getMin(), -1);

    final String normalizedMax = maxInclusive
        ? addToEffectivePartitionKey(range.getMax(), 1)
        : range.getMax();

    return new Range<>(normalizedMin, normalizedMax, true, false);
}
/**
 * Resolves the effective range of this feed range and normalizes it, so the emitted
 * range always has an inclusive minimum and an exclusive maximum.
 *
 * @param routingMapProvider forwarded to {@code getEffectiveRange}
 * @param metadataDiagnosticsCtx forwarded to {@code getEffectiveRange}
 * @param collectionResolutionMono forwarded to {@code getEffectiveRange}
 * @return a {@link Mono} emitting the normalized effective range
 */
public Mono<Range<String>> getNormalizedEffectiveRange(
    IRoutingMapProvider routingMapProvider,
    MetadataDiagnosticsContext metadataDiagnosticsCtx,
    Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono) {

    final Mono<Range<String>> effectiveRangeMono =
        this.getEffectiveRange(routingMapProvider, metadataDiagnosticsCtx, collectionResolutionMono);

    return effectiveRangeMono.map(effectiveRange -> normalizeRange(effectiveRange));
}
/**
 * Adds {@code +1} or {@code -1} to a hex-encoded effective partition key, treating the
 * decoded bytes as one big-endian unsigned number.
 * <p>
 * The carry (or borrow) is propagated from the last byte towards the first. If it runs
 * past the first byte it is dropped, i.e. every byte wraps around. An empty key is
 * returned unchanged because there is no byte to modify.
 *
 * @param effectivePartitionKey hex-encoded key with an even number of characters
 * @param value the step to apply; only {@code 1} and {@code -1} are accepted
 * @return the hex encoding of the adjusted key as produced by {@code HexConvert.bytesToHex}
 */
private static String addToEffectivePartitionKey(
    String effectivePartitionKey,
    int value) {

    checkArgument(
        value == 1 || value == -1,
        "Argument 'value' has invalid value - only 1 and -1 are allowed");

    final byte[] keyBytes = hexBinaryToByteArray(effectivePartitionKey);

    // A byte holding `saturated` cannot absorb the step; it wraps to `wrapped` and the
    // carry/borrow moves on to the next more significant byte.
    final boolean increment = value == 1;
    final int saturated = increment ? 0xff : 0x00;
    final byte wrapped = increment ? (byte) 0 : (byte) 255;

    int index = keyBytes.length - 1;
    while (index >= 0) {
        final int unsignedByte = keyBytes[index] & 0xff;
        if (unsignedByte != saturated) {
            keyBytes[index] = (byte) (unsignedByte + value);
            break;
        }
        keyBytes[index] = wrapped;
        index--;
    }

    return HexConvert.bytesToHex(keyBytes);
}
/**
 * Resolves the partition key ranges associated with this feed range.
 *
 * @param routingMapProvider routing map provider made available to the implementation
 * @param request the service request the lookup is performed for
 * @param collectionResolutionMono deferred resolution of the target collection
 * @return a {@link Mono} emitting a list of strings describing the partition key ranges
 * (NOTE(review): presumably the partition key range ids - confirm against implementations)
 */
public abstract Mono<List<String>> getPartitionKeyRanges(
IRoutingMapProvider routingMapProvider,
RxDocumentServiceRequest request,
Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono);
/**
 * Prepares a service request so that it is filtered to this feed range.
 * NOTE(review): the contract is inferred from the method name - which headers are set,
 * and whether the passed request is mutated or a new one is emitted, is decided by the
 * implementations; confirm there.
 *
 * @param routingMapProvider routing map provider made available to the implementation
 * @param request the service request to prepare
 * @param collectionResolutionMono deferred resolution of the target collection
 * @return a {@link Mono} emitting the prepared request
 */
public abstract Mono<RxDocumentServiceRequest> populateFeedRangeFilteringHeaders(
IRoutingMapProvider routingMapProvider,
RxDocumentServiceRequest request,
Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono);
/**
 * Populates the property bag of this feed range by delegating to
 * {@link #setProperties(JsonSerializable, boolean)} with this instance as the target and
 * {@code populateProperties} set to {@code false}, so that {@code setProperties} does not
 * call {@code super.populatePropertyBag()} in turn.
 */
public void populatePropertyBag() {
setProperties(this, false);
}
/**
 * Returns the string representation of this feed range: its JSON form encoded as UTF-8
 * and then with the URL-safe Base64 encoder. The result can be turned back into a feed
 * range with {@link #fromBase64EncodedJsonString(String)}.
 *
 * @return the Base64-encoded JSON, or an empty string when {@code toJson()} yields null
 */
@Override
public String toString() {
    final String json = this.toJson();

    return json == null
        ? ""
        : Base64.getUrlEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
}
/**
 * Counterpart of {@link #setProperties(JsonSerializable, boolean)}.
 * NOTE(review): presumably removes the feed-range specific properties from the given
 * serializable - inferred from the name only; confirm against the implementations.
 *
 * @param serializable the object to remove the properties from
 */
public abstract void removeProperties(JsonSerializable serializable);
/**
 * Base implementation of the property population hook.
 * <p>
 * This implementation does not touch {@code serializable}; it only triggers
 * {@code super.populatePropertyBag()} when asked to.
 *
 * @param serializable the target object; unused by this base implementation
 * @param populateProperties whether {@code super.populatePropertyBag()} should be invoked
 */
public void setProperties(
    JsonSerializable serializable,
    boolean populateProperties) {

    if (!populateProperties) {
        return;
    }

    super.populatePropertyBag();
}
/**
 * Attempts to deserialize a feed range from its JSON text.
 * <p>
 * An {@link IOException} raised while reading the JSON is logged at debug level and
 * reported to the caller as a {@code null} result instead of being rethrown.
 *
 * @param jsonString the JSON text to parse; must not be null
 * @return the value produced by the object mapper, or {@code null} if reading the JSON
 * failed with an {@link IOException}
 */
public static FeedRangeInternal tryParse(final String jsonString) {
    checkNotNull(jsonString, "Argument 'jsonString' must not be null");

    final ObjectMapper objectMapper = Utils.getSimpleObjectMapper();

    FeedRangeInternal parsed;
    try {
        parsed = objectMapper.readValue(jsonString, FeedRangeInternal.class);
    } catch (final IOException parseFailure) {
        LOGGER.debug("Failed to parse feed range JSON {}", jsonString, parseFailure);
        parsed = null;
    }

    return parsed;
}
/**
 * Attempts to split this feed range into {@code targetedSplitCount} sub-ranges.
 * <p>
 * The normalized effective range is returned as a single element list when
 * {@code targetedSplitCount} is at most 1, when the range is a single value, when the
 * collection is not hash partitioned, or when the partition key definition version is
 * neither V1 nor V2. A missing version is treated as V1.
 *
 * @param routingMapProvider forwarded to {@code getNormalizedEffectiveRange}
 * @param metadataDiagnosticsCtx forwarded to {@code getNormalizedEffectiveRange}
 * @param collectionResolutionMono deferred resolution of the target collection; must emit
 * a holder with a non-null collection
 * @param targetedSplitCount the desired number of sub-ranges
 * @return a {@link Mono} emitting the resulting feed ranges; the mapping fails with an
 * {@link IllegalStateException} when the collection has not been resolved
 */
public Mono<List<FeedRangeEpkImpl>> trySplit(
    IRoutingMapProvider routingMapProvider,
    MetadataDiagnosticsContext metadataDiagnosticsCtx,
    Mono<Utils.ValueHolder<DocumentCollection>> collectionResolutionMono,
    int targetedSplitCount) {

    final Mono<Range<String>> normalizedRangeMono = this.getNormalizedEffectiveRange(
        routingMapProvider,
        metadataDiagnosticsCtx,
        collectionResolutionMono);

    return Mono
        .zip(normalizedRangeMono, collectionResolutionMono)
        .map(rangeAndCollection -> {
            final Range<String> range = rangeAndCollection.getT1();
            final DocumentCollection collection = rangeAndCollection.getT2().v;

            if (collection == null) {
                throw new IllegalStateException("Collection should have been resolved.");
            }

            final PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();

            // Splitting ranges into sub ranges is only possible for hash partitioning.
            final boolean splittable = targetedSplitCount > 1
                && !range.isSingleValue()
                && partitionKeyDefinition.getKind() == PartitionKind.HASH;

            if (splittable) {
                PartitionKeyDefinitionVersion version = partitionKeyDefinition.getVersion();
                if (version == null) {
                    version = PartitionKeyDefinitionVersion.V1;
                }

                if (version == PartitionKeyDefinitionVersion.V1) {
                    return trySplitWithHashV1(range, targetedSplitCount);
                }

                if (version == PartitionKeyDefinitionVersion.V2) {
                    return trySplitWithHashV2(range, targetedSplitCount);
                }
            }

            return Collections.singletonList(new FeedRangeEpkImpl(range));
        });
}
/**
 * Splits a range into sub-ranges of (roughly) equal width in the numeric V1 hash space.
 * <p>
 * The bounds are decoded with {@code fromHexEncodedBinaryString}; the minimum and maximum
 * effective partition key sentinels map to {@code 0} and {@code UINT_MAX_VALUE}. Split
 * points are placed at {@code min + i * ((max - min) / targetedSplitCount)}.
 * <p>
 * The first sub-range keeps the original minimum and its inclusiveness, and the last
 * keeps the original maximum and its inclusiveness. All inner boundaries are
 * min-inclusive / max-exclusive.
 * <p>
 * When the range is narrower than {@code targetedSplitCount} the integer step would be 0
 * and every split point would collapse onto the minimum, yielding empty sub-ranges. In
 * that case the range is returned unsplit, mirroring the guard in
 * {@code trySplitWithHashV2}.
 *
 * @param effectiveRange the range to split
 * @param targetedSplitCount the desired number of sub-ranges
 * @return the sub-ranges in ascending order, or a single element list with
 * {@code effectiveRange} when it is too narrow to be split
 */
static List<FeedRangeEpkImpl> trySplitWithHashV1(
    Range<String> effectiveRange,
    int targetedSplitCount) {

    long min = 0;
    long max = UINT_MAX_VALUE;

    if (!effectiveRange.getMin().equalsIgnoreCase(
        PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey)) {

        min = fromHexEncodedBinaryString(effectiveRange.getMin());
    }

    if (!effectiveRange.getMax().equalsIgnoreCase(
        PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey)) {

        max = fromHexEncodedBinaryString(effectiveRange.getMax());
    }

    final long diff = max - min;

    // Too narrow to produce distinct split points - hand back the range unsplit instead of
    // emitting degenerate [min, min) sub-ranges.
    if (diff < targetedSplitCount) {
        return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
    }

    String minRange = effectiveRange.getMin();
    List<FeedRangeEpkImpl> splitFeedRanges = new ArrayList<>(targetedSplitCount);

    for (int i = 1; i < targetedSplitCount; i++) {
        long splitPoint = min + (i * (diff / targetedSplitCount));
        String maxRange = PartitionKeyInternalHelper.toHexEncodedBinaryString(
            new NumberPartitionKeyComponent[] {
                new NumberPartitionKeyComponent(splitPoint)
            });

        splitFeedRanges.add(
            new FeedRangeEpkImpl(
                new Range<>(
                    minRange,
                    maxRange,
                    // only the very first sub-range inherits the original min inclusiveness
                    i > 1 || effectiveRange.isMinInclusive(),
                    false)));

        minRange = maxRange;
    }

    // The last sub-range ends at the original maximum and inherits its inclusiveness.
    splitFeedRanges.add(
        new FeedRangeEpkImpl(
            new Range<>(
                minRange,
                effectiveRange.getMax(),
                true,
                effectiveRange.isMaxInclusive())));

    return splitFeedRanges;
}
/**
 * Splits a range into sub-ranges of (roughly) equal width in the 128-bit V2 hash space.
 * <p>
 * The bounds are decoded from hex into {@code Int128} values; the minimum and maximum
 * effective partition key sentinels map to {@code 0} and
 * {@code PartitionKeyInternalHelper.MaxHashV2Value}. If the range is narrower than
 * {@code targetedSplitCount} it is returned unsplit. Otherwise split points are placed at
 * {@code min + i * ((max - min) / targetedSplitCount)}.
 * <p>
 * The first sub-range keeps the original minimum and its inclusiveness, and the last
 * keeps the original maximum and its inclusiveness. All inner boundaries are
 * min-inclusive / max-exclusive.
 *
 * @param effectiveRange the range to split
 * @param targetedSplitCount the desired number of sub-ranges
 * @return the sub-ranges in ascending order, or a single element list with
 * {@code effectiveRange} when it is too narrow to be split
 */
static List<FeedRangeEpkImpl> trySplitWithHashV2(
    Range<String> effectiveRange,
    int targetedSplitCount) {

    final String minHex = effectiveRange.getMin();
    final String maxHex = effectiveRange.getMax();

    final Int128 lowerBound =
        minHex.equalsIgnoreCase(PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey)
            ? new Int128(0)
            : new Int128(hexBinaryToByteArray(minHex));

    final Int128 upperBound =
        maxHex.equalsIgnoreCase(PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey)
            ? PartitionKeyInternalHelper.MaxHashV2Value
            : new Int128(hexBinaryToByteArray(maxHex));

    final Int128 width = Int128.subtract(upperBound, lowerBound);
    final Int128 splitCount = new Int128(targetedSplitCount);

    if (Int128.lt(width, splitCount)) {
        return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
    }

    final List<FeedRangeEpkImpl> result = new ArrayList<>(targetedSplitCount);
    String currentMin = minHex;

    for (int part = 1; part < targetedSplitCount; part++) {
        final Int128 offset = Int128.multiply(new Int128(part), Int128.div(width, splitCount));
        final String currentMax = HexConvert.bytesToHex(Int128.add(lowerBound, offset).bytes());

        // only the very first sub-range inherits the original min inclusiveness
        final boolean currentMinInclusive = part > 1 || effectiveRange.isMinInclusive();

        result.add(
            new FeedRangeEpkImpl(
                new Range<>(currentMin, currentMax, currentMinInclusive, false)));

        currentMin = currentMax;
    }

    // The last sub-range ends at the original maximum and inherits its inclusiveness.
    result.add(
        new FeedRangeEpkImpl(
            new Range<>(currentMin, maxHex, true, effectiveRange.isMaxInclusive())));

    return result;
}
/**
 * Decodes a double from its unsigned 64-bit representation carried in a {@code long}.
 * <p>
 * Values below the mask (most significant bit clear) are negated; values at or above the
 * mask have the most significant bit flipped. The result is reinterpreted as the raw IEEE
 * 754 bits of a double.
 * NOTE(review): presumably the inverse of the number encoding used when building
 * effective partition keys - confirm against {@code NumberPartitionKeyComponent}.
 *
 * @param value the encoded value, to be read as an unsigned 64-bit integer
 * @return the decoded double
 */
private static double decodeDoubleFromUInt64Long(long value) {
    // `value` is an unsigned quantity, so the comparison has to be unsigned as well: the
    // mask equals Long.MIN_VALUE, hence a signed `value < mask` could never be true and the
    // negation branch would be unreachable.
    final long rawBits = Long.compareUnsigned(value, UINT64_TO_DOUBLE_MASK) < 0
        ? -value
        : value ^ UINT64_TO_DOUBLE_MASK;

    return Double.longBitsToDouble(rawBits);
}
/**
 * Decodes the number stored in a hex-encoded binary string and returns it truncated to a
 * {@code long}.
 * <p>
 * Layout read by this method: byte 0 must be the marker {@code 5}; byte 1 supplies the
 * top 8 bits of a 64-bit payload; every following byte supplies 7 more payload bits in its
 * upper bits, while its lowest bit flags whether another byte follows. The assembled
 * payload is decoded with {@code decodeDoubleFromUInt64Long}.
 *
 * @param hexBinary the hex-encoded binary string
 * @return the decoded value cast to {@code long}
 * @throws IllegalStateException if the string is shorter than two bytes, does not start
 * with the marker byte, or ends before a terminating chunk was read
 */
static long fromHexEncodedBinaryString(String hexBinary) {
    final byte[] encoded = hexBinaryToByteArray(hexBinary);

    if (encoded.length < 2 || encoded[0] != 5) {
        throw new IllegalStateException("Invalid hex-byteString");
    }

    // First chunk: a full byte placed in the 8 most significant bits.
    int shift = 56;
    long bits = (encoded[1] & 0xFFL) << shift;

    // Remaining chunks: 7 data bits each, lowest bit is the continuation flag.
    int index = 2;
    boolean moreChunks;
    do {
        if (index >= encoded.length) {
            throw new IllegalStateException("Incorrect byte string without termination");
        }

        final int chunk = encoded[index++] & 0xFF;
        shift -= 7;
        bits |= ((long) (chunk >>> 1)) << shift;
        moreChunks = (chunk & 0x01) != 0;
    } while (moreChunks);

    return (long) decodeDoubleFromUInt64Long(bits);
}
/**
 * Converts a hex string into the bytes it encodes, two characters per byte with the
 * first character of each pair being the high nibble.
 *
 * @param hexBinary the hex string; must not be null, must have an even number of
 * characters and must only contain hexadecimal digits
 * @return the decoded bytes; an empty array for an empty string
 * @throws IllegalArgumentException if the string has an odd length or contains a
 * character that is not a hexadecimal digit
 */
private static byte[] hexBinaryToByteArray(String hexBinary) {
    checkNotNull(hexBinary, "Argument 'hexBinary' must not be null.");

    final int len = hexBinary.length();

    checkArgument(
        (len & 0x01) == 0,
        "Argument 'hexBinary' must not have odd number of characters.");

    final byte[] blob = new byte[len / 2];
    for (int i = 0; i < len; i += 2) {
        final int highNibble = Character.digit(hexBinary.charAt(i), 16);
        final int lowNibble = Character.digit(hexBinary.charAt(i + 1), 16);

        // Character.digit yields -1 for anything that is not a hex digit; without this check
        // the -1 would silently be folded into a meaningless byte value.
        checkArgument(
            highNibble >= 0 && lowNibble >= 0,
            "Argument 'hexBinary' must only contain hexadecimal characters.");

        blob[i / 2] = (byte) ((highNibble << 4) + lowNibble);
    }

    return blob;
}
}