QueryPlanRetriever.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.implementation.BackoffRetryUtility;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RuntimeConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
class QueryPlanRetriever {
private static final String TRUE = "True";
private static final String SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " +
QueryFeature.CompositeAggregate.name() + ", " +
QueryFeature.MultipleOrderBy.name() + ", " +
QueryFeature.MultipleAggregates.name() + ", " +
QueryFeature.OrderBy.name() + ", " +
QueryFeature.OffsetAndLimit.name() + ", " +
QueryFeature.Distinct.name() + ", " +
QueryFeature.GroupBy.name() + ", " +
QueryFeature.Top.name() + ", " +
QueryFeature.DCount.name() + ", " +
QueryFeature.NonValueAggregate.name();
static Mono<PartitionedQueryExecutionInfo> getQueryPlanThroughGatewayAsync(DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient queryClient,
SqlQuerySpec sqlQuerySpec,
String resourceLink,
PartitionKey partitionKey) {
final Map<String, String> requestHeaders = new HashMap<>();
requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON);
requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE);
requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, SUPPORTED_QUERY_FEATURES);
requestHeaders.put(HttpConstants.HttpHeaders.QUERY_VERSION, HttpConstants.Versions.QUERY_VERSION);
if (partitionKey != null && partitionKey != PartitionKey.NONE) {
PartitionKeyInternal partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(partitionKey);
requestHeaders.put(HttpConstants.HttpHeaders.PARTITION_KEY, partitionKeyInternal.toJson());
}
final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(diagnosticsClientContext,
OperationType.QueryPlan,
ResourceType.Document,
resourceLink,
requestHeaders);
request.UseGatewayMode = true;
request.setByteBuffer(ModelBridgeInternal.serializeJsonToByteBuffer(sqlQuerySpec));
final DocumentClientRetryPolicy retryPolicyInstance =
queryClient.getResetSessionTokenRetryPolicy().getRequestPolicy();
Function<RxDocumentServiceRequest, Mono<PartitionedQueryExecutionInfo>> executeFunc = req -> {
return BackoffRetryUtility.executeRetry(() -> {
retryPolicyInstance.onBeforeSendRequest(req);
return queryClient.executeQueryAsync(request).flatMap(rxDocumentServiceResponse -> {
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo =
new PartitionedQueryExecutionInfo(rxDocumentServiceResponse.getResponseBodyAsByteArray(), rxDocumentServiceResponse.getGatewayHttpRequestTimeline());
return Mono.just(partitionedQueryExecutionInfo);
});
}, retryPolicyInstance);
};
return executeFunc.apply(request);
}
}