BulkExecutor.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.batch;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosDaemonThreadFactory;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemOperationType;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.function.Tuple2;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
/**
 * Core logic of bulk execution.
 *
 * The flux of operations is executed in the following steps:
 * 1. Resolve the partition key range ID for each operation and group operations by that ID.
 * 2. For the flux of operations in each group, buffer based on batch size and a maximum duration.
 * 3. Process each buffered set of operations as a single batch request and return a wrapper holding the
 *    request, the response (if any) and the exception (if any). Exactly one of response or exception is set.
 * 4. Internal retries are performed by emitting into an intermediate sink attached to each grouped flux.
 * 5. Operations that failed because the partition key range is gone are retried by emitting them into the
 *    main sink, which leads to re-resolution of the partition key range ID.
 * 6. At the end - and this is essential - all sinks are completed explicitly: a sink keeps waiting for more
 *    elements, so the execution would never finish, even after all operations have been executed (detected
 *    via the completion signal of the source), unless the sinks are closed.
 *
 * Note: Sinks will move to a new interface from Reactor 3.5; documentation:
 * - https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/processors.adoc
 *
 * For our use case, Sinks.many().unicast() will work.
*/
public final class BulkExecutor<TContext> {
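    // Illustrative sketch, not part of the original code: the Sinks.many().unicast() pattern referenced in
    // the class documentation above, which replaces the deprecated FluxProcessor/UnicastProcessor APIs.
    // The variable names below are hypothetical; the real sinks used by this class are mainSink and the
    // per-group FluxSink instances in groupSinks.
    //
    //   Sinks.Many<CosmosItemOperation> sink = Sinks.many().unicast().onBackpressureBuffer();
    //   Flux<CosmosItemOperation> flux = sink.asFlux();          // unicast: only a single subscriber is allowed
    //   sink.emitNext(operation, (signalType, emitResult) ->
    //       emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED); // returning true retries concurrent emits
    //   sink.tryEmitComplete();                                  // the sink must be completed explicitly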
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);
    private static final AtomicLong instanceCount = new AtomicLong(0);
private final CosmosAsyncContainer container;
private final AsyncDocumentClient docClientWrapper;
private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
private final ThrottlingRetryOptions throttlingRetryOptions;
private final Flux<com.azure.cosmos.models.CosmosItemOperation> inputOperations;
// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;
private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
    // State used for handling 'gone' (partition split) errors:
private final AtomicBoolean mainSourceCompleted;
private final AtomicInteger totalCount;
private final Sinks.EmitFailureHandler serializedEmitFailureHandler;
private final Sinks.Many<CosmosItemOperation> mainSink;
private final List<FluxSink<CosmosItemOperation>> groupSinks;
private final ScheduledExecutorService executorService;
private ScheduledFuture<?> scheduledFutureForFlush;
public BulkExecutor(CosmosAsyncContainer container,
Flux<CosmosItemOperation> inputOperations,
CosmosBulkExecutionOptions cosmosBulkOptions) {
checkNotNull(container, "expected non-null container");
checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.inputOperations = inputOperations;
this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase());
this.throttlingRetryOptions = docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();
        // Read the option values up front so they stay effectively immutable for this executor; if the
        // CosmosBulkExecutionOptions were accessed directly later, a newly created group might observe
        // different values.
maxMicroBatchIntervalInMs = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchInterval(cosmosBulkExecutionOptions)
.toMillis();
batchContext = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getLegacyBatchScopedContext(cosmosBulkExecutionOptions);
this.partitionScopeThresholds = ImplementationBridgeHelpers.CosmosBulkExecutionThresholdsStateHelper
.getBulkExecutionThresholdsAccessor()
.getPartitionScopeThresholds(cosmosBulkExecutionOptions.getThresholdsState());
operationListener = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getOperationContext(cosmosBulkExecutionOptions);
if (operationListener != null &&
operationListener.getOperationContext() != null) {
operationContextText = operationListener.getOperationContext().toString();
} else {
operationContextText = "n/a";
}
        // Initialize the sinks and counters used for handling 'gone' errors.
mainSourceCompleted = new AtomicBoolean(false);
totalCount = new AtomicInteger(0);
serializedEmitFailureHandler = new SerializedEmitFailureHandler();
mainSink = Sinks.many().unicast().onBackpressureBuffer();
groupSinks = new CopyOnWriteArrayList<>();
        // The evaluation whether a micro batch should be flushed to the backend happens whenever a new
        // ItemOperation arrives. The micro batch gets flushed when the batch size is exceeded, when the
        // oldest buffered ItemOperation is older than the MicroBatchInterval, or when the total serialized
        // length exceeds the maximum request body size.
        // To make sure we flush the buffers at least every maxMicroBatchIntervalInMs we start a timer that
        // triggers artificial ItemOperations which are only used to flush the buffers (and are filtered out
        // before requests are sent to the backend).
this.executorService = Executors.newSingleThreadScheduledExecutor(
new CosmosDaemonThreadFactory("BulkExecutor-" + instanceCount.incrementAndGet()));
this.scheduledFutureForFlush = this.executorService.scheduleWithFixedDelay(
this::onFlush,
this.maxMicroBatchIntervalInMs,
this.maxMicroBatchIntervalInMs,
TimeUnit.MILLISECONDS);
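        // The FlushBuffersItemOperation sentinel emitted by this timer is filtered out again in the
        // bufferUntil/flatMap stages below before any request is sent; it only forces buffer evaluation.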
}
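    // Illustrative usage sketch, not part of the original code. BulkExecutor is an internal implementation
    // class and is normally driven through the public bulk APIs on CosmosAsyncContainer; the variable names
    // below are hypothetical.
    //
    //   Flux<CosmosItemOperation> operations = ...;                 // e.g. created via CosmosBulkOperations
    //   CosmosBulkExecutionOptions options = new CosmosBulkExecutionOptions();
    //   new BulkExecutor<Object>(container, operations, options)
    //       .execute()
    //       .doOnNext(r -> { /* inspect r.getResponse() or r.getException() */ })
    //       .blockLast();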
public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        // The groupBy below can hang if the flatMap that drains its groups does not allow a concurrency of
        // at least the number of unique keys being grouped on.
        // The groupBy is used to isolate Cosmos physical partitions, so when there is no config override we
        // enforce that the flatMap uses a concurrency of
        // Math.max(default concurrency (256), # of partitions * 2 (to accommodate some splits)).
        // The config override can be used by the Spark connector when customers follow best practices and
        // repartition the data frame so that no single Spark partition contains data spread across all
        // physical partitions. When the incoming data is repartitioned that way, each Spark partition only
        // targets a subset of Cosmos partitions, which improves efficiency and means that for large
        // containers (with hundreds of physical partitions) a concurrency lower than the number of
        // partitions is sufficient.
Integer nullableMaxConcurrentCosmosPartitions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxConcurrentCosmosPartitions(cosmosBulkExecutionOptions);
Mono<Integer> maxConcurrentCosmosPartitionsMono = nullableMaxConcurrentCosmosPartitions != null ?
Mono.just(Math.max(256, nullableMaxConcurrentCosmosPartitions)) :
this.container.getFeedRanges().map(ranges -> Math.max(256, ranges.size() * 2));
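        // Illustrative example, not part of the original code: a container with 20 physical partitions and no
        // override yields Math.max(256, 20 * 2) = 256 concurrent groups; with 400 partitions it yields
        // Math.max(256, 400 * 2) = 800.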
return
maxConcurrentCosmosPartitionsMono
.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMapMany(maxConcurrentCosmosPartitions -> {
logger.debug("BulkExecutor.execute with MaxConcurrentPartitions: {}, Context: {}",
maxConcurrentCosmosPartitions,
this.operationContextText);
return this.inputOperations
.publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.onErrorContinue((throwable, o) ->
logger.error("Skipping an error operation while processing {}. Cause: {}, Context: {}",
o,
throwable.getMessage(),
this.operationContextText))
.doOnNext((CosmosItemOperation cosmosItemOperation) -> {
                        // Set the retry policy before starting execution. Should only happen once.
BulkExecutorUtil.setRetryPolicyForBulk(
docClientWrapper,
this.container,
cosmosItemOperation,
this.throttlingRetryOptions);
if (cosmosItemOperation != FlushBuffersItemOperation.singleton()) {
totalCount.incrementAndGet();
}
logger.trace(
"SetupRetryPolicy, {}, TotalCount: {}, Context: {}, {}",
getItemOperationDiagnostics(cosmosItemOperation),
totalCount.get(),
this.operationContextText,
getThreadInfo()
);
})
.doOnComplete(() -> {
mainSourceCompleted.set(true);
long totalCountSnapshot = totalCount.get();
logger.debug("Main source completed - # left items {}, Context: {}",
totalCountSnapshot,
this.operationContextText);
if (totalCountSnapshot == 0) {
                            // This is needed because onComplete can be invoked after the last element has already
                            // been processed. So also complete the sinks here if the count is 0; if the source has
                            // completed and the count isn't zero, the doOnNext handling the last element will close
                            // them. A double close is harmless for the sinks.
completeAllSinks();
} else {
ScheduledFuture<?> scheduledFutureSnapshot = this.scheduledFutureForFlush;
if (scheduledFutureSnapshot != null) {
try {
scheduledFutureSnapshot.cancel(true);
logger.debug("Cancelled all future scheduled tasks {}", getThreadInfo());
} catch (Exception e) {
logger.warn("Failed to cancel scheduled tasks{}", getThreadInfo(), e);
}
}
this.onFlush();
long flushIntervalAfterDrainingIncomingFlux = Math.min(
this.maxMicroBatchIntervalInMs,
BatchRequestResponseConstants
.DEFAULT_MAX_MICRO_BATCH_INTERVAL_AFTER_DRAINING_INCOMING_FLUX_IN_MILLISECONDS);
this.scheduledFutureForFlush = this.executorService.scheduleWithFixedDelay(
this::onFlush,
flushIntervalAfterDrainingIncomingFlux,
flushIntervalAfterDrainingIncomingFlux,
TimeUnit.MILLISECONDS);
}
})
.mergeWith(mainSink.asFlux())
.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMap(
operation -> {
logger.trace("Before Resolve PkRangeId, {}, Context: {} {}",
getItemOperationDiagnostics(operation),
this.operationContextText,
getThreadInfo());
                        // Resolve the partition key range ID again for operations that arrive in the main sink
                        // due to a 'gone' retry.
return BulkExecutorUtil.resolvePartitionKeyRangeId(this.docClientWrapper, this.container, operation)
.map((String pkRangeId) -> {
PartitionScopeThresholds partitionScopeThresholds =
this.partitionScopeThresholds.computeIfAbsent(
pkRangeId,
(newPkRangeId) -> new PartitionScopeThresholds(newPkRangeId, this.cosmosBulkExecutionOptions));
logger.trace("Resolved PkRangeId, {}, PKRangeId: {} Context: {} {}",
getItemOperationDiagnostics(operation),
pkRangeId,
this.operationContextText,
getThreadInfo());
return Pair.of(partitionScopeThresholds, operation);
});
})
.groupBy(Pair::getKey, Pair::getValue)
.flatMap(
this::executePartitionedGroup,
maxConcurrentCosmosPartitions)
.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.doOnNext(requestAndResponse -> {
int totalCountAfterDecrement = totalCount.decrementAndGet();
boolean mainSourceCompletedSnapshot = mainSourceCompleted.get();
if (totalCountAfterDecrement == 0 && mainSourceCompletedSnapshot) {
                        // The count can reach zero while the source still has more elements, which is why the
                        // main source must have completed as well. A count of 0 then also means there are no
                        // pending elements left in any sink.
logger.debug("All work completed, {}, TotalCount: {}, Context: {} {}",
getItemOperationDiagnostics(requestAndResponse.getOperation()),
totalCountAfterDecrement,
this.operationContextText,
getThreadInfo());
completeAllSinks();
} else {
logger.debug(
"Work left - TotalCount after decrement: {}, main sink completed {}, {}, Context: {} {}",
totalCountAfterDecrement,
mainSourceCompletedSnapshot,
getItemOperationDiagnostics(requestAndResponse.getOperation()),
this.operationContextText,
getThreadInfo());
}
})
.doOnComplete(() -> {
int totalCountSnapshot = totalCount.get();
boolean mainSourceCompletedSnapshot = mainSourceCompleted.get();
if (totalCountSnapshot == 0 && mainSourceCompletedSnapshot) {
                        // The count can reach zero while the source still has more elements, which is why the
                        // main source must have completed as well. A count of 0 then also means there are no
                        // pending elements left in any sink.
logger.debug("DoOnComplete: All work completed, Context: {}", this.operationContextText);
completeAllSinks();
} else {
logger.debug(
"DoOnComplete: Work left - TotalCount after decrement: {}, main sink completed {}, Context: {} {}",
totalCountSnapshot,
mainSourceCompletedSnapshot,
this.operationContextText,
getThreadInfo());
}
});
});
}
private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(
GroupedFlux<PartitionScopeThresholds, CosmosItemOperation> partitionedGroupFluxOfInputOperations) {
final PartitionScopeThresholds thresholds = partitionedGroupFluxOfInputOperations.key();
final FluxProcessor<CosmosItemOperation, CosmosItemOperation> groupFluxProcessor =
UnicastProcessor.<CosmosItemOperation>create().serialize();
final FluxSink<CosmosItemOperation> groupSink = groupFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
groupSinks.add(groupSink);
AtomicLong firstRecordTimeStamp = new AtomicLong(-1);
AtomicLong currentMicroBatchSize = new AtomicLong(0);
AtomicInteger currentTotalSerializedLength = new AtomicInteger(0);
return partitionedGroupFluxOfInputOperations
.mergeWith(groupFluxProcessor)
.onBackpressureBuffer()
.timestamp()
.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.bufferUntil(timeStampItemOperationTuple -> {
long timestamp = timeStampItemOperationTuple.getT1();
CosmosItemOperation itemOperation = timeStampItemOperationTuple.getT2();
logger.trace(
"BufferUntil - enqueued {}, {}, Context: {} {}",
timestamp,
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
if (itemOperation == FlushBuffersItemOperation.singleton()) {
long currentMicroBatchSizeSnapshot = currentMicroBatchSize.get();
if (currentMicroBatchSizeSnapshot > 0) {
logger.trace(
"Flushing PKRange {} (batch size: {}) due to FlushItemOperation, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
currentMicroBatchSizeSnapshot,
this.operationContextText,
getThreadInfo());
firstRecordTimeStamp.set(-1);
currentMicroBatchSize.set(0);
currentTotalSerializedLength.set(0);
return true;
}
// avoid counting flush operations for the micro batch size calculation
return false;
}
firstRecordTimeStamp.compareAndSet(-1, timestamp);
long age = timestamp - firstRecordTimeStamp.get();
long batchSize = currentMicroBatchSize.incrementAndGet();
int totalSerializedLength = this.calculateTotalSerializedLength(currentTotalSerializedLength, itemOperation);
if (batchSize >= thresholds.getTargetMicroBatchSizeSnapshot() ||
age >= this.maxMicroBatchIntervalInMs ||
totalSerializedLength >= BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES) {
logger.debug(
"BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), " +
"Triggering {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
batchSize,
totalSerializedLength,
age,
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
firstRecordTimeStamp.set(-1);
currentMicroBatchSize.set(0);
currentTotalSerializedLength.set(0);
return true;
}
return false;
})
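            // Each list emitted by bufferUntil above is one micro batch; the flush sentinels it may contain
            // are stripped out below before the server batch request is built.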
.flatMap(
(List<Tuple2<Long, CosmosItemOperation>> timeStampAndItemOperationTuples) -> {
List<CosmosItemOperation> operations = new ArrayList<>(timeStampAndItemOperationTuples.size());
for (Tuple2<Long, CosmosItemOperation> timeStampAndItemOperationTuple :
timeStampAndItemOperationTuples) {
CosmosItemOperation itemOperation = timeStampAndItemOperationTuple.getT2();
if (itemOperation == FlushBuffersItemOperation.singleton()) {
continue;
}
operations.add(itemOperation);
}
logger.debug(
"Flushing PKRange {} micro batch with {} operations, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
operations.size(),
this.operationContextText,
getThreadInfo());
return executeOperations(operations, thresholds, groupSink);
},
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchConcurrency(this.cosmosBulkExecutionOptions));
}
private int calculateTotalSerializedLength(AtomicInteger currentTotalSerializedLength, CosmosItemOperation item) {
if (item instanceof CosmosItemOperationBase) {
return currentTotalSerializedLength.accumulateAndGet(
((CosmosItemOperationBase) item).getSerializedLength(),
(currentValue, incremental) -> currentValue + incremental);
}
return currentTotalSerializedLength.get();
}
private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(
List<CosmosItemOperation> operations,
PartitionScopeThresholds thresholds,
FluxSink<CosmosItemOperation> groupSink) {
if (operations.size() == 0) {
logger.trace("Empty operations list, Context: {}", this.operationContextText);
return Flux.empty();
}
String pkRange = thresholds.getPartitionKeyRangeId();
ServerOperationBatchRequest serverOperationBatchRequest =
BulkExecutorUtil.createBatchRequest(operations, pkRange);
if (serverOperationBatchRequest.getBatchPendingOperations().size() > 0) {
serverOperationBatchRequest.getBatchPendingOperations().forEach(groupSink::next);
}
return Flux.just(serverOperationBatchRequest.getBatchRequest())
.publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMap((PartitionKeyRangeServerBatchRequest serverRequest) ->
this.executePartitionKeyRangeServerBatchRequest(serverRequest, groupSink, thresholds));
}
private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(
PartitionKeyRangeServerBatchRequest serverRequest,
FluxSink<CosmosItemOperation> groupSink,
PartitionScopeThresholds thresholds) {
return this.executeBatchRequest(serverRequest)
.subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMapMany(response ->
Flux
.fromIterable(response.getResults())
.publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMap((CosmosBatchOperationResult result) ->
handleTransactionalBatchOperationResult(response, result, groupSink, thresholds)))
.onErrorResume((Throwable throwable) -> {
if (!(throwable instanceof Exception)) {
throw Exceptions.propagate(throwable);
}
Exception exception = (Exception) throwable;
return Flux
.fromIterable(serverRequest.getOperations())
.publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
.flatMap((CosmosItemOperation itemOperation) ->
handleTransactionalBatchExecutionException(itemOperation, exception, groupSink, thresholds));
});
}
// Helper functions
private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(
CosmosBatchResponse response,
CosmosBatchOperationResult operationResult,
FluxSink<CosmosItemOperation> groupSink,
PartitionScopeThresholds thresholds) {
CosmosBulkItemResponse cosmosBulkItemResponse = ModelBridgeInternal
.createCosmosBulkItemResponse(operationResult, response);
CosmosItemOperation itemOperation = operationResult.getOperation();
TContext actualContext = this.getActualContext(itemOperation);
logger.debug(
"HandleTransactionalBatchOperationResult - PKRange {}, Response Status Code {}, " +
"Operation Status Code, {}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
response.getStatusCode(),
operationResult.getStatusCode(),
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
if (!operationResult.isSuccessStatusCode()) {
if (itemOperation instanceof ItemBulkOperation<?, ?>) {
ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
return itemBulkOperation.getRetryPolicy().shouldRetry(operationResult).flatMap(
result -> {
if (result.shouldRetry) {
logger.debug(
"HandleTransactionalBatchOperationResult - enqueue retry, PKRange {}, Response " +
"Status Code {}, Operation Status Code, {}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
response.getStatusCode(),
operationResult.getStatusCode(),
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation, thresholds);
} else {
logger.error(
"HandleTransactionalBatchOperationResult - Fail, PKRange {}, Response Status " +
"Code {}, Operation Status Code {}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
response.getStatusCode(),
operationResult.getStatusCode(),
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(
itemOperation, cosmosBulkItemResponse, actualContext));
}
});
} else {
throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
}
}
thresholds.recordSuccessfulOperation();
return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(
itemOperation,
cosmosBulkItemResponse,
actualContext));
}
private TContext getActualContext(CosmosItemOperation itemOperation) {
ItemBulkOperation<?, ?> itemBulkOperation = null;
if (itemOperation instanceof ItemBulkOperation<?, ?>) {
itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
}
if (itemBulkOperation == null) {
return this.batchContext;
}
TContext operationContext = itemBulkOperation.getContext();
if (operationContext != null) {
return operationContext;
}
return this.batchContext;
}
private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(
CosmosItemOperation itemOperation,
Exception exception,
FluxSink<CosmosItemOperation> groupSink,
PartitionScopeThresholds thresholds) {
logger.debug(
"HandleTransactionalBatchExecutionException, PKRange {}, Error: {}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
exception,
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
if (exception instanceof CosmosException && itemOperation instanceof ItemBulkOperation<?, ?>) {
CosmosException cosmosException = (CosmosException) exception;
ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
            // First check whether the failure was caused by a partition split; in that case the operation has
            // to move to a different partition key range group, so re-emit it into the mainSink.
return itemBulkOperation.getRetryPolicy()
.shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode())
.flatMap(shouldRetryGone -> {
if (shouldRetryGone) {
logger.debug(
"HandleTransactionalBatchExecutionException - Retry due to split, PKRange {}, Error: " +
"{}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
exception,
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
// retry - but don't mark as enqueued for retry in thresholds
mainSink.emitNext(itemOperation, serializedEmitFailureHandler);
return Mono.empty();
} else {
logger.debug(
"HandleTransactionalBatchExecutionException - Retry other, PKRange {}, Error: " +
"{}, {}, Context: {} {}",
thresholds.getPartitionKeyRangeId(),
exception,
getItemOperationDiagnostics(itemOperation),
this.operationContextText,
getThreadInfo());
return retryOtherExceptions(
itemOperation,
exception,
groupSink,
cosmosException,
itemBulkOperation,
thresholds);
}
});
}
TContext actualContext = this.getActualContext(itemOperation);
return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, actualContext));
}
private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(
Duration backOffTime,
FluxSink<CosmosItemOperation> groupSink,
CosmosItemOperation itemOperation,
PartitionScopeThresholds thresholds) {
thresholds.recordEnqueuedRetry();
if (backOffTime == null || backOffTime.isZero()) {
groupSink.next(itemOperation);
return Mono.empty();
} else {
return Mono
.delay(backOffTime)
.flatMap((dummy) -> {
groupSink.next(itemOperation);
return Mono.empty();
});
}
}
private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(
CosmosItemOperation itemOperation,
Exception exception,
FluxSink<CosmosItemOperation> groupSink,
CosmosException cosmosException,
ItemBulkOperation<?, ?> itemBulkOperation,
PartitionScopeThresholds thresholds) {
TContext actualContext = this.getActualContext(itemOperation);
return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> {
if (result.shouldRetry) {
return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation, thresholds);
} else {
return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(
itemOperation, exception, actualContext));
}
});
}
private Mono<CosmosBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {
RequestOptions options = new RequestOptions();
options.setOperationContextAndListenerTuple(operationListener);
        // The request options here are used for the bulk request exchanged with the service.
        // If contentResponseOnWrite is not enabled here (or at the client level), the service will not even
        // send a bulk response payload - so all the CosmosBulkItemRequestOptions would be irrelevant and all
        // payloads would be null. Instead, we automatically enforce contentResponseOnWrite for a bulk request
        // whenever at least one of its item operations requires a content response (either because it is a
        // read operation or because contentResponseOnWrite was enabled explicitly).
if (!this.docClientWrapper.isContentResponseOnWriteEnabled() &&
serverRequest.getOperations().size() > 0) {
for (CosmosItemOperation itemOperation : serverRequest.getOperations()) {
if (itemOperation instanceof ItemBulkOperation<?, ?>) {
ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
if (itemBulkOperation.getOperationType() == CosmosItemOperationType.READ ||
(itemBulkOperation.getRequestOptions() != null &&
itemBulkOperation.getRequestOptions().isContentResponseOnWriteEnabled() != null &&
itemBulkOperation.getRequestOptions().isContentResponseOnWriteEnabled().booleanValue())) {
options.setContentResponseOnWriteEnabled(true);
break;
}
}
}
}
return this.docClientWrapper.executeBatchRequest(
BridgeInternal.getLink(this.container), serverRequest, options, false);
}
    private void completeAllSinks() {
        logger.info("Closing all sinks, Context: {}", this.operationContextText);
        try {
            // Shutting down the executor service also stops the periodic flush timer.
            this.executorService.shutdown();
            logger.debug("Executor service shut down, Context: {}", this.operationContextText);
        } catch (Exception e) {
            logger.warn("Failed to shut down the executor service", e);
        }
        mainSink.tryEmitComplete();
        logger.debug("Main sink completed, Context: {}", this.operationContextText);
        groupSinks.forEach(FluxSink::complete);
        logger.debug("All group sinks completed, Context: {}", this.operationContextText);
}
private void onFlush() {
try {
this.groupSinks.forEach(sink -> sink.next(FlushBuffersItemOperation.singleton()));
} catch(Throwable t) {
logger.error("Callback invocation 'onFlush' failed.", t);
}
}
private static String getItemOperationDiagnostics(CosmosItemOperation operation) {
if (operation == FlushBuffersItemOperation.singleton()) {
return "ItemOperation[Type: Flush]";
}
StringBuilder sb = new StringBuilder();
sb
.append("ItemOperation[Type: ")
.append(operation.getOperationType().toString())
.append(", PK: ")
.append(operation.getPartitionKeyValue() != null ? operation.getPartitionKeyValue().toString() : "n/a")
.append(", id: ")
.append(operation.getId())
.append("]");
return sb.toString();
}
private static String getThreadInfo() {
StringBuilder sb = new StringBuilder();
Thread t = Thread.currentThread();
sb
.append("Thread[")
.append("Name: ")
.append(t.getName())
.append(",Group: ")
.append(t.getThreadGroup() != null ? t.getThreadGroup().getName() : "n/a")
.append(", isDaemon: ")
.append(t.isDaemon())
.append(", Id: ")
.append(t.getId())
.append("]");
return sb.toString();
}
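    // Returning true from onEmitFailure tells Reactor to retry the emission; this is the desired behavior
    // when concurrent emitNext calls collide on the serialized main sink (FAIL_NON_SERIALIZED). Any other
    // emit failure is logged as an error and not retried.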
private class SerializedEmitFailureHandler implements Sinks.EmitFailureHandler {
@Override
public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
if (emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED)) {
logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
return true;
}
logger.error("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
return false;
}
}
}