PipelinedDocumentQueryExecutionContext.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import reactor.core.publisher.Flux;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
*/
public class PipelinedDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionContext<T> {
private IDocumentQueryExecutionComponent<T> component;
private int actualPageSize;
private UUID correlatedActivityId;
private QueryInfo queryInfo;
private PipelinedDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int actualPageSize,
UUID correlatedActivityId, QueryInfo queryInfo) {
this.component = component;
this.actualPageSize = actualPageSize;
this.correlatedActivityId = correlatedActivityId;
this.queryInfo = queryInfo;
// this.executeNextSchedulingMetrics = new SchedulingStopwatch();
// this.executeNextSchedulingMetrics.Ready();
// DefaultTrace.TraceVerbose(string.Format(
// CultureInfo.InvariantCulture,
// "{0} Pipelined~Context, actual page size: {1}",
// DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
// this.actualPageSize));
}
public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T>> createAsync(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
PipelinedDocumentQueryParams<T> initParams) {
// Use nested callback pattern to unwrap the continuation token and query params at each level.
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createBaseComponentFunction;
QueryInfo queryInfo = initParams.getQueryInfo();
UUID correlatedActivityId = initParams.getCorrelatedActivityId();
CosmosQueryRequestOptions cosmosQueryRequestOptions = initParams.getCosmosQueryRequestOptions();
if (queryInfo.hasOrderBy()) {
createBaseComponentFunction = (continuationToken, documentQueryParams) -> {
CosmosQueryRequestOptions orderByCosmosQueryRequestOptions = ModelBridgeInternal.createQueryRequestOptions(cosmosQueryRequestOptions);
ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken);
initParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions);
return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams);
};
} else {
createBaseComponentFunction = (continuationToken, documentQueryParams) -> {
CosmosQueryRequestOptions parallelCosmosQueryRequestOptions = ModelBridgeInternal.createQueryRequestOptions(cosmosQueryRequestOptions);
ModelBridgeInternal.setQueryRequestOptionsContinuationToken(parallelCosmosQueryRequestOptions, continuationToken);
initParams.setCosmosQueryRequestOptions(parallelCosmosQueryRequestOptions);
return ParallelDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams);
};
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createAggregateComponentFunction;
if (queryInfo.hasAggregates() && !queryInfo.hasGroupBy()) {
createAggregateComponentFunction =
(continuationToken, documentQueryParams) ->
AggregateDocumentQueryExecutionContext.createAsync(createBaseComponentFunction,
queryInfo.getAggregates(),
queryInfo.getGroupByAliasToAggregateType(),
queryInfo.getGroupByAliases(),
queryInfo.hasSelectValue(),
continuationToken,
documentQueryParams);
} else {
createAggregateComponentFunction = createBaseComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createDistinctComponentFunction;
if (queryInfo.hasDistinct()) {
createDistinctComponentFunction =
(continuationToken, documentQueryParams) ->
DistinctDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction,
queryInfo.getDistinctQueryType(),
continuationToken,
documentQueryParams);
} else {
createDistinctComponentFunction = createAggregateComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createGroupByComponentFunction;
if (queryInfo.hasGroupBy()) {
createGroupByComponentFunction =
(continuationToken, documentQueryParams) ->
GroupByDocumentQueryExecutionContext.createAsync(createDistinctComponentFunction,
continuationToken,
queryInfo.getGroupByAliasToAggregateType(),
queryInfo.getGroupByAliases(),
queryInfo.hasSelectValue(),
documentQueryParams);
} else{
createGroupByComponentFunction = createDistinctComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createSkipComponentFunction;
if (queryInfo.hasOffset()) {
createSkipComponentFunction =
(continuationToken, documentQueryParams) ->
SkipDocumentQueryExecutionContext.createAsync(createGroupByComponentFunction,
queryInfo.getOffset(),
continuationToken,
documentQueryParams);
} else {
createSkipComponentFunction = createGroupByComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createTopComponentFunction;
if (queryInfo.hasTop()) {
createTopComponentFunction =
(continuationToken, documentQueryParams) ->
TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction,
queryInfo.getTop(),
queryInfo.getTop(),
continuationToken,
documentQueryParams);
} else {
createTopComponentFunction = createSkipComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createTakeComponentFunction;
if (queryInfo.hasLimit()) {
createTakeComponentFunction = (continuationToken, documentQueryParams) -> {
int totalLimit = queryInfo.getLimit();
if (queryInfo.hasOffset()) {
// This is being done to match the limit from rewritten query
totalLimit = queryInfo.getOffset() + queryInfo.getLimit();
}
return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction,
queryInfo.getLimit(),
totalLimit,
continuationToken,
documentQueryParams);
};
} else {
createTakeComponentFunction = createTopComponentFunction;
}
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createDCountComponentFunction;
if (queryInfo.hasDCount()) {
createDCountComponentFunction = (continuationToken, documentQueryParams) -> {
return DCountDocumentQueryExecutionContext.createAsync(createTakeComponentFunction,
queryInfo,
continuationToken,
documentQueryParams);
};
} else {
createDCountComponentFunction = createTakeComponentFunction;
}
int actualPageSize = Utils.getValueOrDefault(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions),
ParallelQueryConfig.ClientInternalPageSize);
if (actualPageSize == -1) {
actualPageSize = Integer.MAX_VALUE;
}
int pageSize = Math.min(actualPageSize, Utils.getValueOrDefault(queryInfo.getTop(), (actualPageSize)));
return createDCountComponentFunction.apply(ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions), initParams)
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId, queryInfo));
}
public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T>> createReadManyAsync(
DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient queryClient, String collectionResourceId, SqlQuerySpec sqlQuery,
Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap, CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceId, String collectionLink, UUID activityId, Class<T> klass,
ResourceType resourceTypeEnum) {
Flux<IDocumentQueryExecutionComponent<T>> documentQueryExecutionComponentFlux =
ParallelDocumentQueryExecutionContext.createReadManyQueryAsync(diagnosticsClientContext, queryClient,
collectionResourceId, sqlQuery,
rangeQueryMap,
cosmosQueryRequestOptions, resourceId,
collectionLink, activityId, klass,
resourceTypeEnum);
// TODO: Making pagesize -1. Should be reviewed
return documentQueryExecutionComponentFlux.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, -1,
activityId, null));
}
@Override
public Flux<FeedResponse<T>> executeAsync() {
// TODO Auto-generated method stub
// TODO add more code here
return this.component.drainAsync(actualPageSize);
}
public QueryInfo getQueryInfo() {
return this.queryInfo;
}
}