RoutingMapProviderHelper.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.routing;
import com.azure.cosmos.implementation.IRoutingMapProvider;
import com.azure.cosmos.implementation.PartitionKeyRange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.TreeSet;
/**
* Provide utility functionality to route request in direct connectivity mode in the Azure Cosmos DB database service.
*/
public final class RoutingMapProviderHelper {
private static final Range.MaxComparator<String> MAX_COMPARATOR = new Range.MaxComparator<String>();
private static String max(String left, String right) {
return left.compareTo(right) < 0 ? right : left;
}
private static <T extends Comparable<T>> boolean isSortedAndNonOverlapping(List<Range<T>> list) {
for (int i = 1; i < list.size(); i++) {
Range<T> previousRange = list.get(i - 1);
Range<T> currentRange = list.get(i);
int compareResult = previousRange.getMax().compareTo(currentRange.getMin());
if (compareResult > 0) {
return false;
} else if (compareResult == 0 && previousRange.isMaxInclusive() && currentRange.isMinInclusive()) {
return false;
}
}
return true;
}
public static Collection<PartitionKeyRange> getOverlappingRanges(RoutingMapProvider routingMapProvider,
String collectionSelfLink, List<Range<String>> sortedRanges) {
if (!isSortedAndNonOverlapping(sortedRanges)) {
throw new IllegalArgumentException("sortedRanges");
}
List<PartitionKeyRange> targetRanges = new ArrayList<PartitionKeyRange>();
int currentProvidedRange = 0;
while (currentProvidedRange < sortedRanges.size()) {
if (sortedRanges.get(currentProvidedRange).isEmpty()) {
currentProvidedRange++;
continue;
}
Range<String> queryRange;
if (!targetRanges.isEmpty()) {
String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(),
sortedRanges.get(currentProvidedRange).getMin());
boolean leftInclusive = left.compareTo(sortedRanges.get(currentProvidedRange).getMin()) == 0
? sortedRanges.get(currentProvidedRange).isMinInclusive() : false;
queryRange = new Range<String>(left, sortedRanges.get(currentProvidedRange).getMax(), leftInclusive,
sortedRanges.get(currentProvidedRange).isMaxInclusive());
} else {
queryRange = sortedRanges.get(currentProvidedRange);
}
targetRanges.addAll(routingMapProvider.getOverlappingRanges(collectionSelfLink, queryRange, false));
Range<String> lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange();
while (currentProvidedRange < sortedRanges.size()
&& MAX_COMPARATOR.compare(sortedRanges.get(currentProvidedRange), lastKnownTargetRange) <= 0) {
currentProvidedRange++;
}
}
return targetRanges;
}
public static Mono<List<PartitionKeyRange>> getOverlappingRanges(
IRoutingMapProvider routingMapProvider,
String resourceId, List<Range<String>> sortedRanges) {
if (routingMapProvider == null){
throw new IllegalArgumentException("routingMapProvider");
}
if (sortedRanges == null) {
throw new IllegalArgumentException("sortedRanges");
}
// Removing duplicates from sortedranges to check for nonoverlap
TreeSet<Range<String>> distinctSortedRanges = new TreeSet<>(new Range.MinComparator<>());
distinctSortedRanges.addAll(sortedRanges);
if (!isSortedAndNonOverlapping(new ArrayList<>(distinctSortedRanges))) {
throw new IllegalArgumentException("sortedRanges");
}
List<PartitionKeyRange> targetRanges = new ArrayList<>();
final ListIterator<Range<String>> iterator = sortedRanges.listIterator();
return Flux.defer(() -> {
if (!iterator.hasNext()) {
return Flux.empty();
}
Range<String> queryRange;
Range<String> sortedRange = iterator.next();
if (!targetRanges.isEmpty()) {
String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(),
sortedRange.getMin());
boolean leftInclusive = left.compareTo(sortedRange.getMin()) == 0 && sortedRange.isMinInclusive();
queryRange = new Range<String>(left, sortedRange.getMax(), leftInclusive,
sortedRange.isMaxInclusive());
} else {
queryRange = sortedRange;
}
return routingMapProvider.tryGetOverlappingRangesAsync(null, resourceId, queryRange, false, null)
.map(ranges -> ranges.v)
.map(targetRanges::addAll)
.flatMap(aBoolean -> {
if (!targetRanges.isEmpty()) {
Range<String> lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange();
while (iterator.hasNext()) {
Range<String> value = iterator.next();
if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) {
// Since we already moved forward on iterator to check above condition, we
// go to previous when it fails so the the value is not skipped on iteration
iterator.previous();
break;
}
}
}
return Mono.just(targetRanges);
}).flux();
}).repeat(sortedRanges.size())
.takeUntil(stringRange -> !iterator.hasNext())
.last()
.single();
}
}