OrderByDocumentQueryExecutionContext.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceId;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Undefined;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.Utils.ValueHolder;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonFilters;
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonWithDefinedFilters;
import com.azure.cosmos.implementation.query.orderbyquery.ComparisonWithUndefinedFilters;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
/**
* While this class is public, it is not part of our published public APIs.
* It is meant to be used internally only by our SDK.
*/
public class OrderByDocumentQueryExecutionContext<T extends Resource>
extends ParallelDocumentQueryExecutionContextBase<T> {
private final static String FormatPlaceHolder = "{documentdb-formattableorderbyquery-filter}";
private final static String True = "true";
private final String collectionRid;
private final OrderbyRowComparer<T> consumeComparer;
private final RequestChargeTracker tracker;
private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
List<ClientSideRequestStatistics> clientSideRequestStatisticsList;
private Flux<OrderByRowResult<T>> orderByObservable;
private final Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap;
/**
 * Creates an ORDER BY execution context with fresh, empty bookkeeping state.
 * The constructor only stores its arguments and allocates the trackers and maps;
 * the document producers are set up later by {@code initialize}.
 *
 * @param consumeComparer comparer handed to the ordered merge and to every document producer
 * @param collectionRid   resource id of the collection being queried
 */
private OrderByDocumentQueryExecutionContext(
    DiagnosticsClientContext diagnosticsClientContext,
    IDocumentQueryClient client,
    ResourceType resourceTypeEnum,
    Class<T> klass,
    SqlQuerySpec query,
    CosmosQueryRequestOptions cosmosQueryRequestOptions,
    String resourceLink,
    String rewrittenQuery,
    boolean isContinuationExpected,
    boolean getLazyFeedResponse,
    OrderbyRowComparer<T> consumeComparer,
    String collectionRid,
    UUID correlatedActivityId) {
    super(
        diagnosticsClientContext,
        client,
        resourceTypeEnum,
        klass,
        query,
        cosmosQueryRequestOptions,
        resourceLink,
        rewrittenQuery,
        isContinuationExpected,
        getLazyFeedResponse,
        correlatedActivityId);

    // Values supplied by the caller.
    this.consumeComparer = consumeComparer;
    this.collectionRid = collectionRid;

    // Per-query accumulators, all starting out empty.
    this.targetRangeToOrderByContinuationTokenMap = new HashMap<>();
    this.clientSideRequestStatisticsList = new ArrayList<>();
    this.queryMetricMap = new ConcurrentHashMap<>();
    this.tracker = new RequestChargeTracker();
}
/**
 * Builds and initializes an ORDER BY execution context from the pipeline parameters.
 *
 * @param diagnosticsClientContext diagnostics context forwarded to the new context
 * @param client                   query client forwarded to the new context
 * @param initParams               pipeline parameters (query, options, feed ranges, query info, ...)
 * @return a {@link Flux} emitting the initialized context, or an error {@link Flux} carrying the
 *         {@link CosmosException} thrown while initializing
 */
public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
    DiagnosticsClientContext diagnosticsClientContext,
    IDocumentQueryClient client,
    PipelinedDocumentQueryParams<T> initParams) {

    OrderByDocumentQueryExecutionContext<T> executionContext =
        new OrderByDocumentQueryExecutionContext<>(
            diagnosticsClientContext,
            client,
            initParams.getResourceTypeEnum(),
            initParams.getResourceType(),
            initParams.getQuery(),
            initParams.getCosmosQueryRequestOptions(),
            initParams.getResourceLink(),
            initParams.getQueryInfo().getRewrittenQuery(),
            initParams.isContinuationExpected(),
            initParams.isGetLazyResponseFeed(),
            new OrderbyRowComparer<T>(initParams.getQueryInfo().getOrderBy()),
            initParams.getCollectionRid(),
            initParams.getCorrelatedActivityId());
    executionContext.setTop(initParams.getTop());

    try {
        executionContext.initialize(
            initParams.getFeedRanges(),
            initParams.getQueryInfo().getOrderBy(),
            initParams.getQueryInfo().getOrderByExpressions(),
            initParams.getInitialPageSize(),
            ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(
                initParams.getCosmosQueryRequestOptions()));
    } catch (CosmosException initializationFailure) {
        // Only CosmosException is converted into an error signal; anything else propagates to the caller.
        return Flux.error(initializationFailure);
    }

    return Flux.just(executionContext);
}
/**
 * Sets up the document producers for every feed range and builds the merged ORDER BY stream.
 * <p>
 * Without a continuation token every range starts from the beginning and the filter placeholder
 * in the query text is replaced by {@code true}. With a continuation token the token is validated
 * (JSON shape, non-empty range, parseable rid) and the ranges to the left of, at, and to the right
 * of the token's range are each initialized with their own resume filter.
 *
 * @param feedRanges         ranges the query has to visit
 * @param sortOrders         sort order of each ORDER BY column
 * @param orderByExpressions expression text of each ORDER BY column
 * @param initialPageSize    page size handed to the base-class initialization
 * @param continuationToken  serialized ORDER BY continuation token, or {@code null} for a fresh query
 * @throws CosmosException if the continuation token fails validation
 */
private void initialize(
    List<FeedRangeEpkImpl> feedRanges, List<SortOrder> sortOrders,
    Collection<String> orderByExpressions,
    int initialPageSize,
    String continuationToken) throws CosmosException {
    if (continuationToken != null) {
        // The token must at least deserialize into an ORDER BY continuation token.
        ValueHolder<OrderByContinuationToken> parsedTokenHolder = new ValueHolder<>();
        boolean wellFormed = OrderByContinuationToken.tryParse(continuationToken, parsedTokenHolder);
        if (!wellFormed) {
            String message = String.format("INVALID JSON in continuation token %s for OrderBy~Context",
                continuationToken);
            throw BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST, message);
        }
        OrderByContinuationToken resumeToken = parsedTokenHolder.v;

        // The range carried by the token must not be empty.
        CompositeContinuationToken backendToken = resumeToken.getCompositeContinuationToken();
        if (backendToken.getRange().isEmpty()) {
            String message = String.format("INVALID RANGE in the continuation token %s for OrderBy~Context.",
                continuationToken);
            throw BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST, message);
        }

        // Fail early when the rid stored in the token cannot be parsed.
        Pair<Boolean, ResourceId> ridParseResult = ResourceId.tryParse(resumeToken.getRid());
        if (!ridParseResult.getLeft()) {
            throw new BadRequestException(String.format(
                "INVALID Rid in the continuation token %s for OrderBy~Context.",
                resumeToken.getCompositeContinuationToken().getToken()));
        }

        // The token passed validation: derive one filter per group of ranges.
        FormattedFilterInfo resumeFilters = this.getFormattedFilters(
            orderByExpressions,
            resumeToken.getOrderByItems(),
            sortOrders,
            resumeToken.getInclusive());
        PartitionMapper.PartitionMapping<OrderByContinuationToken> rangeGroups =
            PartitionMapper.getPartitionMapping(feedRanges, Collections.singletonList(resumeToken));

        // Keep the left / target / right order of these three calls.
        initializeWithTokenAndFilter(
            rangeGroups.getMappingLeftOfTarget(),
            initialPageSize,
            resumeFilters.filterForRangesLeftOfTheTargetRange);
        initializeWithTokenAndFilter(
            rangeGroups.getTargetMapping(),
            initialPageSize,
            resumeFilters.filterForTargetRange);
        initializeWithTokenAndFilter(
            rangeGroups.getMappingRightOfTarget(),
            initialPageSize,
            resumeFilters.filterForRangesRightOfTheTargetRange);
    } else {
        // Fresh query: null continuation for every range and a "true" filter.
        Map<FeedRangeEpkImpl, String> continuationByRange = new HashMap<>();
        for (FeedRangeEpkImpl range : feedRanges) {
            continuationByRange.put(range, null);
        }
        SqlQuerySpec unfilteredQuery = new SqlQuerySpec(
            querySpec.getQueryText().replace(FormatPlaceHolder, True),
            querySpec.getParameters());
        super.initialize(collectionRid, continuationByRange, initialPageSize, unfilteredQuery);
    }

    orderByObservable = OrderByUtils.orderedMerge(
        resourceType,
        consumeComparer,
        tracker,
        documentProducers,
        queryMetricMap,
        targetRangeToOrderByContinuationTokenMap,
        clientSideRequestStatisticsList);
}
/**
 * Initializes one group of ranges for a resumed query.
 * <p>
 * Each range is recorded together with its continuation token in
 * {@code targetRangeToOrderByContinuationTokenMap} and then handed to the base-class
 * initialization on its own, with a {@code null} backend continuation and with the filter
 * placeholder of the query text replaced by {@code filter}.
 *
 * @param rangeToTokenMapping ranges of this group and the token associated with each
 * @param initialPageSize     page size handed to the base-class initialization
 * @param filter              filter text substituted for the placeholder in the query
 */
private void initializeWithTokenAndFilter(Map<FeedRangeEpkImpl, OrderByContinuationToken> rangeToTokenMapping,
    int initialPageSize,
    String filter) {
    // The substituted text is identical for every range of the group.
    String filteredQueryText = querySpec.getQueryText().replace(FormatPlaceHolder, filter);

    for (Map.Entry<FeedRangeEpkImpl, OrderByContinuationToken> rangeAndToken : rangeToTokenMapping.entrySet()) {
        FeedRangeEpkImpl range = rangeAndToken.getKey();
        targetRangeToOrderByContinuationTokenMap.put(range, rangeAndToken.getValue());

        Map<FeedRangeEpkImpl, String> singleRangeContinuation = new HashMap<>();
        singleRangeContinuation.put(range, null);

        super.initialize(
            collectionRid,
            singleRangeContinuation,
            initialPageSize,
            new SqlQuerySpec(filteredQueryText, querySpec.getParameters()));
    }
}
/**
 * Builds the three resume filters (ranges left of the target range, the target range itself,
 * ranges right of the target range) for an ORDER BY query continued from a token.
 * <p>
 * When a cross partition query is resumed, only the continuation token of the partition we left
 * off on is available. The data has a composite sort order (query sort order, partition key
 * range id), so range filters can be generated instead of per-partition tokens.
 * <p>
 * <b>Single ORDER BY column.</b> Suppose the query is
 * {@code SELECT * FROM c ORDER BY c.string ASC} and we left off on partition N with value "B":
 * all partitions to the left have finished reading "B", partition N is still reading "B", and all
 * partitions to the right have yet to read a "B". The filters are therefore
 * {@code > "B"}, {@code >= "B"} and {@code >= "B"}; for DESC they are
 * {@code < "B"}, {@code <= "B"} and {@code <= "B"}. For the target range the comparison is only
 * inclusive when {@code inclusive} is set, otherwise the strict operator is used.
 * <p>
 * <b>Multiple ORDER BY columns.</b> Suppose the query is
 * {@code SELECT * FROM c ORDER BY c.string ASC, c.number ASC} and we left off at ("A", 1).
 * A document comes later in the sort order if its first column is greater, or if the first column
 * ties and the second column is greater, and so on. The filters are therefore
 * {@code (> "A") OR (= "A" AND > 1)}, {@code (> "A") OR (= "A" AND >= 1)},
 * {@code (> "A") OR (= "A" AND >= 1)}. Applying the single-column rule to every column
 * ({@code > "A" AND > 1}) would miss documents. In general every prefix of the ORDER BY columns
 * is examined: all but the last column of a prefix must tie, the last column of a shorter prefix
 * uses the strict operator, and the last column of the full prefix follows the single-column
 * rule. All prefixes are OR-ed together.
 * <p>
 * In both cases the last column of a prefix additionally gets {@code OR <IS_xxx>(expression)}
 * clauses for the type-check functions returned by {@code IsSystemFunctions.getIsDefinedFunctions}.
 *
 * @param orderByExpressionCollection expression text of each ORDER BY column
 * @param orderByItems                values of the ORDER BY columns stored in the token
 * @param sortOrderCollection         sort order of each ORDER BY column
 * @param inclusive                   whether the target range filter includes the token's value
 * @return the three filters
 * @throws IllegalArgumentException if the three inputs do not have the same number of elements
 */
private OrderByDocumentQueryExecutionContext<T>.FormattedFilterInfo getFormattedFilters(
    Collection<String> orderByExpressionCollection,
    QueryItem[] orderByItems,
    Collection<SortOrder> sortOrderCollection,
    boolean inclusive) {
    SortOrder[] sortOrders = sortOrderCollection.toArray(new SortOrder[0]);
    String[] expressions = orderByExpressionCollection.toArray(new String[0]);

    if (expressions.length != sortOrders.length) {
        throw new IllegalArgumentException("expressions.size() != sortOrders.size()");
    }
    if (expressions.length != orderByItems.length) {
        throw new IllegalArgumentException("expressions.size() != orderByItems.length");
    }

    StringBuilder left = new StringBuilder();
    StringBuilder target = new StringBuilder();
    StringBuilder right = new StringBuilder();
    int columnCount = expressions.length;

    if (columnCount == 1) {
        String expression = expressions[0];
        SortOrder sortOrder = sortOrders[0];
        Object rawItem = orderByItems[0].getItem();

        this.appendToBuilders(left, target, right, "(");
        ComparisonFilters filters = this.comparisonFiltersFor(expression, rawItem);
        appendResumeComparisons(left, target, right, filters, sortOrder, inclusive);
        // Include all the types that come later (ASC) or earlier (DESC) in the type ordering.
        this.appendToBuilders(left, target, right, typeOrderingSuffix(expression, rawItem, sortOrder));
        this.appendToBuilders(left, target, right, ")");
    } else {
        for (int prefixLength = 1; prefixLength <= columnCount; prefixLength++) {
            boolean isFullPrefix = prefixLength == columnCount;
            this.appendToBuilders(left, target, right, "(");

            for (int column = 0; column < prefixLength; column++) {
                boolean isLastColumnOfPrefix = column == prefixLength - 1;
                String expression = expressions[column];
                SortOrder sortOrder = sortOrders[column];
                Object rawItem = orderByItems[column].getItem();

                this.appendToBuilders(left, target, right, "(");
                ComparisonFilters filters = this.comparisonFiltersFor(expression, rawItem);

                if (!isLastColumnOfPrefix) {
                    // Earlier columns of the prefix have to tie with the token's value.
                    left.append(filters.equalTo());
                    target.append(filters.equalTo());
                    right.append(filters.equalTo());
                } else {
                    if (isFullPrefix) {
                        // The full prefix follows the single-column rules.
                        appendResumeComparisons(left, target, right, filters, sortOrder, inclusive);
                    } else {
                        // A shorter prefix is a tie breaker: strictly after the token's value.
                        appendStrictComparison(left, filters, sortOrder);
                        appendStrictComparison(target, filters, sortOrder);
                        appendStrictComparison(right, filters, sortOrder);
                    }
                    this.appendToBuilders(left, target, right,
                        typeOrderingSuffix(expression, rawItem, sortOrder));
                }

                this.appendToBuilders(left, target, right, ")");
                if (!isLastColumnOfPrefix) {
                    this.appendToBuilders(left, target, right, " AND ");
                }
            }

            this.appendToBuilders(left, target, right, ")");
            if (!isFullPrefix) {
                this.appendToBuilders(left, target, right, " OR ");
            }
        }
    }

    return new FormattedFilterInfo(left.toString(), target.toString(), right.toString());
}

/**
 * Picks the comparison-filter factory for one ORDER BY column.
 * An undefined value cannot be compared with ordinary operators (the engine evaluates
 * {@code expression > undefined} to undefined rather than true or false), so it gets the
 * dedicated {@link ComparisonWithUndefinedFilters}.
 */
private ComparisonFilters comparisonFiltersFor(String expression, Object rawItem) {
    String literal = this.getOrderByItemString(rawItem);
    if (rawItem == Undefined.value()) {
        return new ComparisonWithUndefinedFilters(expression);
    }
    return new ComparisonWithDefinedFilters(expression, literal);
}

/** Appends the strict operator for the sort order: less-than for DESC, greater-than otherwise. */
private static void appendStrictComparison(StringBuilder builder, ComparisonFilters filters, SortOrder sortOrder) {
    builder.append(sortOrder == SortOrder.Descending ? filters.lessThan() : filters.greaterThan());
}

/** Appends the inclusive operator for the sort order: less-or-equal for DESC, greater-or-equal otherwise. */
private static void appendInclusiveComparison(StringBuilder builder, ComparisonFilters filters, SortOrder sortOrder) {
    builder.append(sortOrder == SortOrder.Descending
        ? filters.lessThanOrEqualTo()
        : filters.greaterThanOrEqualTo());
}

/**
 * Appends the single-column resume comparisons: strict for the left ranges, inclusive for the
 * right ranges, and inclusive or strict for the target range depending on {@code inclusive}.
 */
private static void appendResumeComparisons(
    StringBuilder left,
    StringBuilder target,
    StringBuilder right,
    ComparisonFilters filters,
    SortOrder sortOrder,
    boolean inclusive) {
    appendStrictComparison(left, filters, sortOrder);
    if (inclusive) {
        appendInclusiveComparison(target, filters, sortOrder);
    } else {
        appendStrictComparison(target, filters, sortOrder);
    }
    appendInclusiveComparison(right, filters, sortOrder);
}

/** Builds the {@code " OR FUNC(expression)"} clauses for the type-check functions of the item's type. */
private static String typeOrderingSuffix(String expression, Object rawItem, SortOrder sortOrder) {
    List<String> typeCheckFunctions = IsSystemFunctions.getIsDefinedFunctions(
        ItemTypeHelper.getOrderByItemType(rawItem),
        sortOrder == SortOrder.Ascending);
    StringBuilder suffix = new StringBuilder();
    for (String function : typeCheckFunctions) {
        suffix.append(" OR ");
        suffix.append(String.format("%s(%s)", function, expression));
    }
    return suffix.toString();
}
/**
 * Renders an ORDER BY item value as the text that is spliced into a resume filter.
 * <ul>
 * <li>{@code null} becomes the text {@code null};</li>
 * <li>a {@link String} becomes a double-quoted literal in which every backslash and every double
 * quote is escaped with a backslash;</li>
 * <li>any other value is rendered with {@code toString()}.</li>
 * </ul>
 *
 * @param orderbyRawItem the raw item value, may be {@code null}
 * @return the text to embed in the filter
 */
private String getOrderByItemString(Object orderbyRawItem) {
    if (orderbyRawItem == null) {
        return "null";
    }
    if (!(orderbyRawItem instanceof String)) {
        return orderbyRawItem.toString();
    }
    // String.replace is literal. The former replaceAll("\"", "\\\"") was a no-op: in a regex
    // replacement string the backslash only escapes the following quote, so quotes were emitted
    // unescaped and a value containing '"' produced a malformed literal.
    // Backslashes are escaped first so the ones added for quotes are not doubled again.
    String escaped = ((String) orderbyRawItem)
        .replace("\\", "\\\\")
        .replace("\"", "\\\"");
    return "\"" + escaped + "\"";
}
/**
 * Appends the same text to the left, target and right filter builders.
 *
 * @param appendText text appended to all three builders
 */
private void appendToBuilders(StringBuilder leftBuilder, StringBuilder targetBuilder, StringBuilder rightBuilder, String appendText) {
    this.appendToBuilders(
        leftBuilder,
        targetBuilder,
        rightBuilder,
        appendText,
        appendText,
        appendText);
}

/**
 * Appends an individual text to each of the left, target and right filter builders.
 *
 * @param leftAppendText   text appended to {@code leftBuilder}
 * @param targetAppendText text appended to {@code targetBuilder}
 * @param rightAppendText  text appended to {@code rightBuilder}
 */
private void appendToBuilders(
    StringBuilder leftBuilder,
    StringBuilder targetBuilder,
    StringBuilder rightBuilder,
    String leftAppendText,
    String targetAppendText,
    String rightAppendText) {
    // Left first, then target, then right.
    leftBuilder.append(leftAppendText);
    targetBuilder.append(targetAppendText);
    rightBuilder.append(rightAppendText);
}
/**
 * Names of the type-check system functions used in resume filters, together with the order in
 * which they are listed. The integer constants in {@code SortOrder} are positions in
 * {@code systemFunctionSortOrder}.
 */
static class IsSystemFunctions {
    static final String Defined = "IS_DEFINED";
    static final String NotDefined = "NOT IS_DEFINED";
    static final String Null = "IS_NULL";
    static final String Boolean = "IS_BOOLEAN";
    static final String Number = "IS_NUMBER";
    static final String IsString = "IS_STRING";
    static final String Array = "IS_ARRAY";
    static final String Object = "IS_OBJECT";

    // Position i of this list corresponds to the SortOrder constant with value i.
    static final List<String> systemFunctionSortOrder = Arrays.asList(
        IsSystemFunctions.NotDefined,
        IsSystemFunctions.Null,
        IsSystemFunctions.Boolean,
        IsSystemFunctions.Number,
        IsSystemFunctions.IsString,
        IsSystemFunctions.Array,
        IsSystemFunctions.Object);

    final List<String> extendedTypesSystemFunctionSortOrder = Arrays.asList(
        IsSystemFunctions.NotDefined,
        IsSystemFunctions.Defined);

    /** Positions of each item type within {@code systemFunctionSortOrder}. */
    private static class SortOrder {
        public static final int Undefined = 0;
        public static final int Null = 1;
        public static final int Boolean = 2;
        public static final int Number = 3;
        public static final int String = 4;
        public static final int Array = 5;
        public static final int Object = 6;
    }

    /** Positions within {@code extendedTypesSystemFunctionSortOrder}. */
    private static class ExtendedTypesSortOrder {
        public static final int Undefined = 0;
        public static final int Defined = 1;
    }

    /**
     * Returns the type-check functions listed after (ascending) or before (descending) the
     * position of the given item type.
     *
     * @param itemtype    type of the ORDER BY item
     * @param isAscending {@code true} to take the functions after the type's position,
     *                    {@code false} to take the ones before it
     * @return a view of the matching part of {@code systemFunctionSortOrder}, or {@code null}
     *         when the item type matches none of the handled cases
     */
    public static List<String> getIsDefinedFunctions(ItemType itemtype, boolean isAscending) {
        final int position;
        switch (itemtype) {
            case NoValue:
                position = SortOrder.Undefined;
                break;
            case Null:
                position = SortOrder.Null;
                break;
            case Boolean:
                position = SortOrder.Boolean;
                break;
            case Number:
                position = SortOrder.Number;
                break;
            case String:
                position = SortOrder.String;
                break;
            case ArrayNode:
                position = SortOrder.Array;
                break;
            case ObjectNode:
                position = SortOrder.Object;
                break;
            default:
                return null;
        }
        return getIsDefinedFunctionsInternal(position, isAscending);
    }

    /** Slices {@code systemFunctionSortOrder} strictly after {@code index} (ascending) or strictly before it. */
    private static List<String> getIsDefinedFunctionsInternal(int index, boolean isAscending) {
        if (isAscending) {
            return systemFunctionSortOrder.subList(index + 1, systemFunctionSortOrder.size());
        }
        return systemFunctionSortOrder.subList(0, index);
    }

    // For future use
    List<String> getExtendedTypesIsDefinedFunctions(int index, boolean isAscending) {
        if (isAscending) {
            return extendedTypesSystemFunctionSortOrder.subList(
                index + 1,
                extendedTypesSystemFunctionSortOrder.size());
        }
        return extendedTypesSystemFunctionSortOrder.subList(0, index);
    }
}
/**
 * Creates the ORDER BY flavour of a document producer for one target range.
 * The producer receives this context's comparer, client, resource type, correlated activity id,
 * {@code top} value and the shared range-to-continuation-token map in addition to the arguments
 * passed in. {@code querySpecForInit} and {@code commonRequestHeaders} are not forwarded.
 * <p>
 * NOTE(review): {@code collectionRid} is supplied for two constructor arguments; confirm that
 * both positions of {@code OrderByDocumentProducer} expect the collection rid.
 */
@Override
protected OrderByDocumentProducer<T> createDocumentProducer(
    String collectionRid,
    PartitionKeyRange targetRange,
    String continuationToken,
    int initialPageSize,
    CosmosQueryRequestOptions cosmosQueryRequestOptions,
    SqlQuerySpec querySpecForInit,
    Map<String, String> commonRequestHeaders,
    TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc,
    Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
    Callable<DocumentClientRetryPolicy> createRetryPolicyFunc, FeedRangeEpkImpl feedRange) {
    OrderByDocumentProducer<T> producer = new OrderByDocumentProducer<T>(
        this.consumeComparer,
        this.client,
        collectionRid,
        cosmosQueryRequestOptions,
        createRequestFunc,
        executeFunc,
        targetRange,
        feedRange,
        collectionRid,
        createRetryPolicyFunc,
        this.resourceType,
        this.correlatedActivityId,
        initialPageSize,
        continuationToken,
        this.top,
        this.targetRangeToOrderByContinuationTokenMap);
    return producer;
}
/**
 * Turns the merged stream of ORDER BY rows into a stream of result pages.
 * <p>
 * Rows are grouped into pages of at most {@code maxPageSize} items. Each page is emitted only
 * once the following page is known, because its continuation token is derived from the first row
 * of the following page; the last page gets a {@code null} continuation token.
 */
private static class ItemToPageTransformer<T extends Resource>
    implements Function<Flux<OrderByRowResult<T>>, Flux<FeedResponse<T>>> {
    private final static int DEFAULT_PAGE_SIZE = 100;
    private final RequestChargeTracker tracker;
    private final int maxPageSize;
    private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
    private final Function<OrderByRowResult<T>, String> orderByContinuationTokenCallback;
    private final List<ClientSideRequestStatistics> clientSideRequestStatisticsList;
    // Page seen most recently by pairWithPreviousPage; null until the first page arrives.
    private volatile FeedResponse<OrderByRowResult<T>> previousPage;

    /**
     * @param tracker                          source of the request charge reported on each page
     * @param maxPageSize                      maximum rows per page; non-positive values fall
     *                                         back to {@code DEFAULT_PAGE_SIZE}
     * @param queryMetricsMap                  query metrics copied onto every page
     * @param orderByContinuationTokenCallback produces the continuation token for a given row
     * @param clientSideRequestStatisticsList  statistics attached to the diagnostics of each page
     */
    public ItemToPageTransformer(
        RequestChargeTracker tracker,
        int maxPageSize,
        ConcurrentMap<String, QueryMetrics> queryMetricsMap,
        Function<OrderByRowResult<T>, String> orderByContinuationTokenCallback,
        List<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
        if (maxPageSize > 0) {
            this.maxPageSize = maxPageSize;
        } else {
            this.maxPageSize = DEFAULT_PAGE_SIZE;
        }
        this.tracker = tracker;
        this.queryMetricMap = queryMetricsMap;
        this.orderByContinuationTokenCallback = orderByContinuationTokenCallback;
        this.clientSideRequestStatisticsList = clientSideRequestStatisticsList;
        this.previousPage = null;
    }

    /** Builds response headers that contain only the request charge. */
    private static Map<String, String> headerResponse(
        double requestCharge) {
        String chargeText = String.valueOf(requestCharge);
        return Utils.immutableMapOf(HttpConstants.HttpHeaders.REQUEST_CHARGE, chargeText);
    }

    /** Returns a copy of {@code page} whose continuation header is set to the given token (may be null). */
    private FeedResponse<OrderByRowResult<T>> addOrderByContinuationToken(
        FeedResponse<OrderByRowResult<T>> page,
        String orderByContinuationToken) {
        Map<String, String> headersWithToken = new HashMap<>(page.getResponseHeaders());
        headersWithToken.put(HttpConstants.HttpHeaders.CONTINUATION, orderByContinuationToken);
        return BridgeInternal.createFeedResponseWithQueryMetrics(
            page.getResults(),
            headersWithToken,
            BridgeInternal.queryMetricsFromFeedResponse(page),
            ModelBridgeInternal.getQueryPlanDiagnosticsContext(page),
            false,
            false,
            page.getCosmosDiagnostics());
    }

    @Override
    public Flux<FeedResponse<T>> apply(Flux<OrderByRowResult<T>> source) {
        return source
            // Cut the row stream into windows of at most maxPageSize rows and collect each one.
            .window(maxPageSize)
            .map(Flux::collectList)
            // Flatten the collected lists with concurrency 1.
            .flatMap(collectedWindow -> collectedWindow, 1)
            .map(this::toChargedPage)
            // A trailing empty page lets the last real page find out that nothing follows it.
            .concatWith(Flux.defer(this::endOfStreamMarker))
            // 1, 2, 3, empty -> (null, 1), (1, 2), (2, 3), (3, empty)
            .map(this::pairWithPreviousPage)
            // Drop the leading (null, 1) pair.
            .skip(1)
            .map(this::stampContinuation)
            .map(this::unwrapPayloads)
            .switchIfEmpty(Flux.defer(this::emptyResultPage));
    }

    /** Wraps a list of rows into a page carrying the request charge and the query metrics. */
    private FeedResponse<OrderByRowResult<T>> toChargedPage(List<OrderByRowResult<T>> rows) {
        FeedResponse<OrderByRowResult<T>> page = BridgeInternal.createFeedResponse(
            rows,
            headerResponse(tracker.getAndResetCharge()));
        for (Map.Entry<String, QueryMetrics> metric : queryMetricMap.entrySet()) {
            BridgeInternal.putQueryMetricsIntoMap(page, metric.getKey(), metric.getValue());
        }
        return page;
    }

    /** Emits the empty page that marks the end of the stream. */
    private Flux<FeedResponse<OrderByRowResult<T>>> endOfStreamMarker() {
        FeedResponse<OrderByRowResult<T>> marker =
            BridgeInternal.createFeedResponse(Utils.immutableListOf(), null);
        return Flux.just(marker);
    }

    /** Pairs the incoming page with the page seen before it and remembers the incoming one. */
    private ImmutablePair<FeedResponse<OrderByRowResult<T>>, FeedResponse<OrderByRowResult<T>>> pairWithPreviousPage(
        FeedResponse<OrderByRowResult<T>> incoming) {
        ImmutablePair<FeedResponse<OrderByRowResult<T>>, FeedResponse<OrderByRowResult<T>>> pair =
            new ImmutablePair<>(this.previousPage, incoming);
        this.previousPage = incoming;
        return pair;
    }

    /**
     * Returns the left page of the pair with its continuation token set: {@code null} when the
     * right page is empty, otherwise the token produced for the first row of the right page.
     */
    private FeedResponse<OrderByRowResult<T>> stampContinuation(
        ImmutablePair<FeedResponse<OrderByRowResult<T>>, FeedResponse<OrderByRowResult<T>>> currentAndNext) {
        FeedResponse<OrderByRowResult<T>> current = currentAndNext.left;
        List<OrderByRowResult<T>> upcomingRows = currentAndNext.right.getResults();

        String token = null;
        if (!upcomingRows.isEmpty()) {
            token = this.orderByContinuationTokenCallback.apply(upcomingRows.get(0));
        }
        return this.addOrderByContinuationToken(current, token);
    }

    /** Converts a page of row results into a page of payloads and attaches the client statistics. */
    private FeedResponse<T> unwrapPayloads(FeedResponse<OrderByRowResult<T>> rowPage) {
        List<OrderByRowResult<T>> rows = rowPage.getResults();
        List<T> payloads = new ArrayList<T>(rows.size());
        for (OrderByRowResult<T> row : rows) {
            payloads.add(row.getPayload());
        }
        FeedResponse<T> payloadPage = BridgeInternal.createFeedResponseWithQueryMetrics(
            payloads,
            rowPage.getResponseHeaders(),
            BridgeInternal.queryMetricsFromFeedResponse(rowPage),
            ModelBridgeInternal.getQueryPlanDiagnosticsContext(rowPage),
            false,
            false,
            rowPage.getCosmosDiagnostics());
        BridgeInternal.addClientSideDiagnosticsToFeed(
            payloadPage.getCosmosDiagnostics(),
            clientSideRequestStatisticsList);
        return payloadPage;
    }

    /** Emits a single empty page, used when the query produced no results at all. */
    private Flux<FeedResponse<T>> emptyResultPage() {
        FeedResponse<T> emptyPage = BridgeInternal.createFeedResponseWithQueryMetrics(
            Utils.immutableListOf(),
            headerResponse(tracker.getAndResetCharge()),
            queryMetricMap,
            null,
            false,
            false,
            null);
        BridgeInternal.addClientSideDiagnosticsToFeed(
            emptyPage.getCosmosDiagnostics(),
            clientSideRequestStatisticsList);
        return Flux.just(emptyPage);
    }
}
/**
 * Drains the merged ORDER BY stream as pages of at most {@code maxPageSize} items.
 * <p>
 * To maintain the continuation token for the user, draining follows two constraints:
 * <ol>
 * <li>always drain from the partition that has the highest priority item first;</li>
 * <li>if several partitions have the same priority item, drain from the left-most one first.
 * Otherwise we would need to keep track of how many of each item we drained from each
 * partition (just like parallel queries).</li>
 * </ol>
 * Consider three numbered partitions that store letters, where every item is written as
 * {@code <item stored in partition, partition number>} so that duplicates across partitions are
 * distinct while duplicates within a partition are indistinguishable:
 * <pre>
 * |-------| |-------| |-------|
 * | &lt;a,1&gt; | | &lt;a,2&gt; | | &lt;a,3&gt; |
 * | &lt;a,1&gt; | | &lt;b,2&gt; | | &lt;c,3&gt; |
 * | &lt;a,1&gt; | | &lt;b,2&gt; | | &lt;c,3&gt; |
 * | &lt;d,1&gt; | | &lt;c,2&gt; | | &lt;c,3&gt; |
 * | &lt;d,1&gt; | | &lt;e,2&gt; | | &lt;f,3&gt; |
 * | &lt;e,1&gt; | | &lt;h,2&gt; | | &lt;j,3&gt; |
 * | &lt;f,1&gt; | | &lt;i,2&gt; | | &lt;k,3&gt; |
 * |-------| |-------| |-------|
 * </pre>
 * The correct drain order in this case is:
 * <pre>
 * &lt;a,1&gt;,&lt;a,1&gt;,&lt;a,1&gt;,&lt;a,2&gt;,&lt;a,3&gt;,&lt;b,2&gt;,&lt;b,2&gt;,&lt;c,2&gt;,&lt;c,3&gt;,&lt;c,3&gt;,&lt;c,3&gt;,
 * &lt;d,1&gt;,&lt;d,1&gt;,&lt;e,1&gt;,&lt;e,2&gt;,&lt;f,1&gt;,&lt;f,3&gt;,&lt;h,2&gt;,&lt;i,2&gt;,&lt;j,3&gt;,&lt;k,3&gt;
 * </pre>
 * In more mathematical terms: {@code <x, y>} always comes before {@code <z, y>} where x &lt; z,
 * and {@code <i, j>} always comes before {@code <i, k>} where j &lt; k.
 *
 * @param maxPageSize maximum number of items per emitted page
 * @return the stream of result pages
 */
@Override
public Flux<FeedResponse<T>> drainAsync(
    int maxPageSize) {
    ItemToPageTransformer<T> pageAssembler = new ItemToPageTransformer<T>(
        this.tracker,
        maxPageSize,
        this.queryMetricMap,
        this::getContinuationToken,
        this.clientSideRequestStatisticsList);
    return this.orderByObservable.transformDeferred(pageAssembler);
}
/**
 * Executes the query by draining it with the max item count taken from the query request options.
 *
 * @return the stream of result pages produced by {@link #drainAsync(int)}
 */
@Override
public Flux<FeedResponse<T>> executeAsync() {
    int pageSize = ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions);
    return drainAsync(pageSize);
}
/**
 * Serializes the ORDER BY continuation token that resumes the query at the given row.
 * The token combines the row's source range and backend continuation token, the row's ORDER BY
 * item values and its resource id, and is always marked as inclusive.
 *
 * @param orderByRowResult row at which a resumed query should start
 * @return the JSON form of the continuation token
 */
private String getContinuationToken(
    OrderByRowResult<T> orderByRowResult) {
    String resourceId = orderByRowResult.getResourceId();

    // Backend token plus the range it belongs to.
    String sourceBackendToken = orderByRowResult.getSourceBackendContinuationToken();
    Range<String> sourceRange = orderByRowResult.getSourceRange().getRange();
    CompositeContinuationToken compositeToken =
        new CompositeContinuationToken(sourceBackendToken, sourceRange);

    // ORDER BY values of the row, as an array.
    QueryItem[] orderByValues = orderByRowResult.getOrderByItems().toArray(new QueryItem[0]);

    OrderByContinuationToken resumeToken =
        new OrderByContinuationToken(compositeToken, orderByValues, resourceId, true);
    return resumeToken.toJson();
}
/**
 * Immutable holder for the three resume filters: the one for ranges left of the target range,
 * the one for the target range, and the one for ranges right of the target range.
 */
private final class FormattedFilterInfo {
    private final String filterForRangesLeftOfTheTargetRange;
    private final String filterForTargetRange;
    private final String filterForRangesRightOfTheTargetRange;

    /**
     * @throws IllegalArgumentException if any of the three filters is {@code null}
     */
    public FormattedFilterInfo(
        String filterForRangesLeftOfTheTargetRange,
        String filterForTargetRange,
        String filterForRangesRightOfTheTargetRange) {
        // Each argument is checked right before it is stored; the checks run left, target, right.
        if (filterForRangesLeftOfTheTargetRange == null) {
            throw new IllegalArgumentException("filterForRangesLeftOfTheTargetRange must not be null.");
        }
        this.filterForRangesLeftOfTheTargetRange = filterForRangesLeftOfTheTargetRange;

        if (filterForTargetRange == null) {
            throw new IllegalArgumentException("filterForTargetRange must not be null.");
        }
        this.filterForTargetRange = filterForTargetRange;

        if (filterForRangesRightOfTheTargetRange == null) {
            throw new IllegalArgumentException("filterForRangesRightOfTheTargetRange must not be null.");
        }
        this.filterForRangesRightOfTheTargetRange = filterForRangesRightOfTheTargetRange;
    }

    /**
     * @return the filter for the ranges left of the target range
     */
    public String getFilterForRangesLeftOfTheTargetRange() {
        return this.filterForRangesLeftOfTheTargetRange;
    }

    /**
     * @return the filter for the target range
     */
    public String getFilterForTargetRange() {
        return this.filterForTargetRange;
    }

    /**
     * @return the filter for the ranges right of the target range
     */
    public String getFilterForRangesRightOfTheTargetRange() {
        return this.filterForRangesRightOfTheTargetRange;
    }
}
}