BulkExecutorUtil.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.batch;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ResourceThrottleRetryPolicy;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemOperationType;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper.getEffectivePartitionKeyString;
final class BulkExecutorUtil {
static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId) {
return PartitionKeyRangeServerBatchRequest.createBatchRequest(
partitionKeyRangeId,
operations,
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
}
static void setRetryPolicyForBulk(
AsyncDocumentClient docClientWrapper,
CosmosAsyncContainer container,
CosmosItemOperation cosmosItemOperation,
ThrottlingRetryOptions throttlingRetryOptions) {
if (cosmosItemOperation instanceof ItemBulkOperation<?, ?>) {
final ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) cosmosItemOperation;
ResourceThrottleRetryPolicy resourceThrottleRetryPolicy = new ResourceThrottleRetryPolicy(
throttlingRetryOptions.getMaxRetryAttemptsOnThrottledRequests(),
throttlingRetryOptions.getMaxRetryWaitTime(),
true);
BulkOperationRetryPolicy bulkRetryPolicy = new BulkOperationRetryPolicy(
docClientWrapper.getCollectionCache(),
docClientWrapper.getPartitionKeyRangeCache(),
BridgeInternal.getLink(container),
resourceThrottleRetryPolicy);
itemBulkOperation.setRetryPolicy(bulkRetryPolicy);
} else {
throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
}
}
static Map<String, String> getResponseHeadersFromBatchOperationResult(CosmosBatchOperationResult result) {
final Map<String, String> headers = new HashMap<>();
headers.put(HttpConstants.HttpHeaders.SUB_STATUS, String.valueOf(result.getSubStatusCode()));
headers.put(HttpConstants.HttpHeaders.E_TAG, result.getETag());
headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, String.valueOf(result.getRequestCharge()));
if (result.getRetryAfterDuration() != null) {
headers.put(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS, String.valueOf(result.getRetryAfterDuration().toMillis()));
}
return headers;
}
/**
* Resolve partition key range id of a operation and set the partition key json value in operation.
*
* TODO(rakkuma): metaDataDiagnosticContext is passed null in tryLookupAsync function. Fix it while adding
* support for an operation wise Diagnostic. The value here should be merged in the individual diagnostic.
* Issue: https://github.com/Azure/azure-sdk-for-java/issues/17647
*/
static Mono<String> resolvePartitionKeyRangeId(
AsyncDocumentClient docClientWrapper,
CosmosAsyncContainer container,
CosmosItemOperation operation) {
checkNotNull(operation, "expected non-null operation");
AtomicReference<DocumentCollection> collectionBeforeRecreation = new AtomicReference<>(null);
if (operation instanceof ItemBulkOperation<?, ?>) {
final ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) operation;
final Mono<String> pkRangeIdMono = Mono.defer(() ->
BulkExecutorUtil.getCollectionInfoAsync(docClientWrapper, container, collectionBeforeRecreation.get())
.flatMap(collection -> {
final PartitionKeyDefinition definition = collection.getPartitionKey();
final PartitionKeyInternal partitionKeyInternal = getPartitionKeyInternal(operation, definition);
itemBulkOperation.setPartitionKeyJson(partitionKeyInternal.toJson());
return docClientWrapper.getPartitionKeyRangeCache()
.tryLookupAsync(null, collection.getResourceId(), null, null)
.map((Utils.ValueHolder<CollectionRoutingMap> routingMap) -> {
if (routingMap.v == null) {
collectionBeforeRecreation.set(collection);
throw new CollectionRoutingMapNotFoundException(
String.format(
"No collection routing map found for container %s(%s) in database %s.",
container.getId(),
collection.getResourceId(),
container.getDatabase().getId())
);
}
return routingMap.v.getRangeByEffectivePartitionKey(
getEffectivePartitionKeyString(
partitionKeyInternal,
definition)).getId();
});
}))
.retryWhen(Retry
.fixedDelay(
BatchRequestResponseConstants.MAX_COLLECTION_RECREATION_RETRY_COUNT,
Duration.ofSeconds(
BatchRequestResponseConstants.MAX_COLLECTION_RECREATION_REFRESH_INTERVAL_IN_SECONDS))
.filter(t -> t instanceof CollectionRoutingMapNotFoundException)
.doBeforeRetry((retrySignal) -> docClientWrapper
.getCollectionCache()
.refresh(
null,
Utils.getCollectionName(BridgeInternal.getLink(container)),
null)
)
);
return pkRangeIdMono;
} else {
throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
}
}
private static PartitionKeyInternal getPartitionKeyInternal(
final CosmosItemOperation operation,
final PartitionKeyDefinition partitionKeyDefinition) {
checkNotNull(operation, "expected non-null operation");
final PartitionKey partitionKey = operation.getPartitionKeyValue();
if (partitionKey == null) {
return ModelBridgeInternal.getNonePartitionKey(partitionKeyDefinition);
} else {
return BridgeInternal.getPartitionKeyInternal(partitionKey);
}
}
/**
* TODO(rakkuma): metaDataDiagnosticContext is passed null in resolveByNameAsync function. Fix it while adding
* support for an operation wise Diagnostic. The value here should be merged in the individual diagnostic.
* Issue: https://github.com/Azure/azure-sdk-for-java/issues/17647
*/
private static Mono<DocumentCollection> getCollectionInfoAsync(
AsyncDocumentClient documentClient,
CosmosAsyncContainer container,
DocumentCollection obsoleteValue) {
// Utils.joinPath sanitizes the path and make sure it ends with a single '/'.
final String resourceAddress = Utils.joinPath(BridgeInternal.getLink(container), null);
final RxClientCollectionCache clientCollectionCache = documentClient.getCollectionCache();
return clientCollectionCache
.resolveByNameAsync(
null,
resourceAddress,
null,
obsoleteValue);
}
static boolean isWriteOperation(CosmosItemOperationType cosmosItemOperationType) {
return cosmosItemOperationType == CosmosItemOperationType.CREATE ||
cosmosItemOperationType == CosmosItemOperationType.REPLACE ||
cosmosItemOperationType == CosmosItemOperationType.UPSERT ||
cosmosItemOperationType == CosmosItemOperationType.DELETE ||
cosmosItemOperationType == CosmosItemOperationType.PATCH;
}
static class CollectionRoutingMapNotFoundException extends CosmosException {
private static final long serialVersionUID = 1L;
/**
* Instantiates a new Invalid partition exception.
*
* @param msg the msg
*/
public CollectionRoutingMapNotFoundException(String msg) {
super(HttpConstants.StatusCodes.NOTFOUND, msg);
setSubStatus();
}
private void setSubStatus() {
this.getResponseHeaders().put(
WFConstants.BackendHeaders.SUB_STATUS,
Integer.toString(HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS));
}
}
}