DocumentQueryExecutionContextFactory.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
*/
public class DocumentQueryExecutionContextFactory {
private final static int PageSizeFactorForTop = 5;
private static final Logger logger = LoggerFactory.getLogger(DocumentQueryExecutionContextFactory.class);
// Limiting cache size to 1000 for now. Can be updated in future based on need
private static final int MAX_CACHE_SIZE = 1000;
private static Mono<Utils.ValueHolder<DocumentCollection>> resolveCollection(DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
String resourceLink) {
RxCollectionCache collectionCache = client.getCollectionCache();
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(diagnosticsClientContext,
OperationType.Query,
resourceTypeEnum,
resourceLink, null
// TODO AuthorizationTokenType.INVALID)
); //this request doesnt actually go to server
return collectionCache.resolveCollectionAsync(null, request);
}
private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> getPartitionKeyRangesAndQueryInfo(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
SqlQuerySpec query,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
DocumentCollection collection,
DefaultDocumentQueryExecutionContext<T> queryExecutionContext, boolean queryPlanCachingEnabled,
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {
// The partitionKeyRangeIdInternal is no more a public API on
// FeedOptions, but have the below condition
// for handling ParallelDocumentQueryTest#partitionKeyRangeId
if (cosmosQueryRequestOptions != null &&
!StringUtils.isEmpty(ModelBridgeInternal.getPartitionKeyRangeIdInternal(cosmosQueryRequestOptions))) {
Mono<List<PartitionKeyRange>> partitionKeyRanges = queryExecutionContext
.getTargetPartitionKeyRangesById(
collection.getResourceId(),
ModelBridgeInternal.getPartitionKeyRangeIdInternal(cosmosQueryRequestOptions));
return partitionKeyRanges.map(pkRanges -> {
List<Range<String>> ranges =
pkRanges.stream().map(PartitionKeyRange::toRange).collect(Collectors.toList());
return Pair.of(ranges, QueryInfo.EMPTY);
});
}
Instant startTime = Instant.now();
Mono<PartitionedQueryExecutionInfo> queryExecutionInfoMono;
if (ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor()
.isQueryPlanRetrievalDisallowed(cosmosQueryRequestOptions)) {
Instant endTime = Instant.now(); // endTime for query plan diagnostics
return getTargetRangesFromEmptyQueryPlan(
cosmosQueryRequestOptions,
collection,
queryExecutionContext,
startTime,
endTime);
}
if (queryPlanCachingEnabled &&
isScopedToSinglePartition(cosmosQueryRequestOptions) &&
queryPlanCache.containsKey(query.getQueryText())) {
Instant endTime = Instant.now(); // endTime for query plan diagnostics
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlanCache.get(query.getQueryText());
if (partitionedQueryExecutionInfo != null) {
logger.debug("Skipping query plan round trip by using the cached plan");
return getTargetRangesFromQueryPlan(cosmosQueryRequestOptions, collection, queryExecutionContext,
partitionedQueryExecutionInfo, startTime, endTime);
}
}
queryExecutionInfoMono =
QueryPlanRetriever.getQueryPlanThroughGatewayAsync(
diagnosticsClientContext,
client,
query,
resourceLink,
cosmosQueryRequestOptions != null ? cosmosQueryRequestOptions.getPartitionKey() : null);
return queryExecutionInfoMono.flatMap(
partitionedQueryExecutionInfo -> {
Instant endTime = Instant.now();
if (queryPlanCachingEnabled && isScopedToSinglePartition(cosmosQueryRequestOptions)) {
tryCacheQueryPlan(query, partitionedQueryExecutionInfo, queryPlanCache);
}
return getTargetRangesFromQueryPlan(cosmosQueryRequestOptions, collection, queryExecutionContext,
partitionedQueryExecutionInfo, startTime, endTime);
});
}
private static <T extends Resource> Mono<Pair<List<Range<String>>, QueryInfo>> getTargetRangesFromQueryPlan(
CosmosQueryRequestOptions cosmosQueryRequestOptions, DocumentCollection collection,
DefaultDocumentQueryExecutionContext<T> queryExecutionContext,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, Instant planFetchStartTime,
Instant planFetchEndTime) {
QueryInfo queryInfo =
partitionedQueryExecutionInfo.getQueryInfo();
queryInfo.setQueryPlanDiagnosticsContext(new QueryInfo.QueryPlanDiagnosticsContext(planFetchStartTime,
planFetchEndTime,
partitionedQueryExecutionInfo.getQueryPlanRequestTimeline()));
List<Range<String>> queryRanges =
partitionedQueryExecutionInfo.getQueryRanges();
if (isScopedToSinglePartition(cosmosQueryRequestOptions)) {
PartitionKeyInternal internalPartitionKey =
BridgeInternal.getPartitionKeyInternal(cosmosQueryRequestOptions.getPartitionKey());
Range<String> range = Range.getPointRange(
internalPartitionKey.getEffectivePartitionKeyString(internalPartitionKey,collection.getPartitionKey()));
queryRanges = Collections.singletonList(range);
}
if (cosmosQueryRequestOptions != null && cosmosQueryRequestOptions.getFeedRange() != null) {
FeedRange userProvidedFeedRange = cosmosQueryRequestOptions.getFeedRange();
return queryExecutionContext.getTargetRange(collection.getResourceId(),
FeedRangeInternal.convert(userProvidedFeedRange))
.map(range -> Pair.of(Collections.singletonList(range),
partitionedQueryExecutionInfo.getQueryInfo()));
}
return
queryExecutionContext.getTargetPartitionKeyRanges(collection.getResourceId(), queryRanges)
.map(pkRanges -> {
List<Range<String>> ranges =
pkRanges.stream().map(PartitionKeyRange::toRange).collect(Collectors.toList());
return Pair.of(
ranges,
partitionedQueryExecutionInfo.getQueryInfo());
});
}
private static <T extends Resource> Mono<Pair<List<Range<String>>, QueryInfo>> getTargetRangesFromEmptyQueryPlan(
CosmosQueryRequestOptions cosmosQueryRequestOptions,
DocumentCollection collection,
DefaultDocumentQueryExecutionContext<T> queryExecutionContext,
Instant planFetchStartTime,
Instant planFetchEndTime) {
if (cosmosQueryRequestOptions == null ||
cosmosQueryRequestOptions.getFeedRange() == null) {
throw new IllegalStateException(
"Query plan retrieval must not be suppressed when not using FeedRanges");
}
QueryInfo queryInfo = QueryInfo.EMPTY;
queryInfo.setQueryPlanDiagnosticsContext(
new QueryInfo.QueryPlanDiagnosticsContext(
planFetchStartTime,
planFetchEndTime));
FeedRange userProvidedFeedRange = cosmosQueryRequestOptions.getFeedRange();
return queryExecutionContext
.getTargetRange(
collection.getResourceId(),
FeedRangeInternal.convert(userProvidedFeedRange))
.map(range -> Pair.of(
Collections.singletonList(range),
queryInfo));
}
private static void tryCacheQueryPlan(
SqlQuerySpec query,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {
QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();
if (canCacheQuery(queryInfo) && !queryPlanCache.containsKey(query.getQueryText())) {
queryPlanCache.put(query.getQueryText(), partitionedQueryExecutionInfo);
}
}
private static boolean canCacheQuery(QueryInfo queryInfo) {
// Query plan will not be cached for the types below
return !queryInfo.hasAggregates()
&& !queryInfo.hasDistinct()
&& !queryInfo.hasGroupBy()
&& !queryInfo.hasLimit()
&& !queryInfo.hasTop()
&& !queryInfo.hasOffset()
&& !queryInfo.hasDCount()
&& !queryInfo.hasOrderBy();
}
private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) {
return cosmosQueryRequestOptions != null
&& cosmosQueryRequestOptions.getPartitionKey() != null
&& cosmosQueryRequestOptions.getPartitionKey() != PartitionKey.NONE;
}
public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createDocumentQueryExecutionContextAsync(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Class<T> resourceType,
SqlQuerySpec query,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
boolean isContinuationExpected,
UUID correlatedActivityId,
boolean queryPlanCachingEnabled,
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {
// return proxy
Flux<Utils.ValueHolder<DocumentCollection>> collectionObs = Flux.just(new Utils.ValueHolder<>(null));
if (resourceTypeEnum.isCollectionChild()) {
collectionObs = resolveCollection(diagnosticsClientContext, client, resourceTypeEnum, resourceLink).flux();
}
DefaultDocumentQueryExecutionContext<T> queryExecutionContext = new DefaultDocumentQueryExecutionContext<T>(
diagnosticsClientContext,
client,
resourceTypeEnum,
resourceType,
query,
cosmosQueryRequestOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected);
if ((ResourceType.Document != resourceTypeEnum && (ResourceType.Conflict != resourceTypeEnum))) {
return Flux.just(queryExecutionContext);
}
return collectionObs.single().flatMap(collectionValueHolder -> {
Mono<Pair<List<Range<String>>, QueryInfo>> queryPlanTask =
getPartitionKeyRangesAndQueryInfo(diagnosticsClientContext,
client,
query,
cosmosQueryRequestOptions,
resourceLink,
collectionValueHolder.v,
queryExecutionContext,
queryPlanCachingEnabled,
queryPlanCache);
return queryPlanTask
.flatMap(queryPlan -> createSpecializedDocumentQueryExecutionContextAsync(diagnosticsClientContext,
client,
resourceTypeEnum,
resourceType,
query,
cosmosQueryRequestOptions,
resourceLink,
isContinuationExpected,
queryPlan.getRight(),
queryPlan.getLeft(),
collectionValueHolder.v.getResourceId(),
correlatedActivityId)
.single());
}).flux();
}
public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createSpecializedDocumentQueryExecutionContextAsync(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Class<T> resourceType,
SqlQuerySpec query,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
boolean isContinuationExpected,
QueryInfo queryInfo,
List<Range<String>> targetRanges,
String collectionRid,
UUID correlatedActivityId) {
int initialPageSize = Utils.getValueOrDefault(
ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions),
ParallelQueryConfig.ClientInternalPageSize);
BadRequestException validationError = Utils.checkRequestOrReturnException
(initialPageSize > 0 || initialPageSize == -1, "MaxItemCount", "Invalid MaxItemCount %s",
initialPageSize);
if (validationError != null) {
return Flux.error(validationError);
}
boolean getLazyFeedResponse = queryInfo.hasTop();
// We need to compute the optimal initial page size for order-by queries
if (queryInfo.hasOrderBy()) {
int top;
if (queryInfo.hasTop() && (top = queryInfo.getTop()) > 0) {
int pageSizeWithTop = Math.min(
(int)Math.ceil(top / (double)targetRanges.size()) * PageSizeFactorForTop,
top);
if (initialPageSize > 0) {
initialPageSize = Math.min(pageSizeWithTop, initialPageSize);
}
else {
initialPageSize = pageSizeWithTop;
}
}
// TODO: do not support continuation in string format right now
// else if (isContinuationExpected)
// {
// if (initialPageSize < 0)
// {
// initialPageSize = (int)Math.Max(feedOptions.MaxBufferedItemCount,
// ParallelQueryConfig.GetConfig().DefaultMaximumBufferSize);
// }
//
// initialPageSize = Math.Min(
// (int)Math.Ceiling(initialPageSize / (double)targetRanges.Count) * PageSizeFactorForTop,
// initialPageSize);
// }
}
List<FeedRangeEpkImpl> feedRangeEpks = targetRanges.stream().map(FeedRangeEpkImpl::new)
.collect(Collectors.toList());
PipelinedDocumentQueryParams<T> documentQueryParams = new PipelinedDocumentQueryParams<T>(
resourceTypeEnum,
resourceType,
query,
resourceLink,
collectionRid,
getLazyFeedResponse,
isContinuationExpected,
initialPageSize,
queryInfo,
cosmosQueryRequestOptions,
correlatedActivityId,
feedRangeEpks);
return PipelinedDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams);
}
public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createReadManyQueryAsync(
DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient queryClient, String collectionResourceId, SqlQuerySpec sqlQuery,
Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap, CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceId, String collectionLink, UUID activityId, Class<T> klass,
ResourceType resourceTypeEnum) {
return PipelinedDocumentQueryExecutionContext.createReadManyAsync(diagnosticsClientContext, queryClient,
collectionResourceId, sqlQuery, rangeQueryMap,
cosmosQueryRequestOptions, resourceId,
collectionLink,
activityId, klass,
resourceTypeEnum);
}
}