PartitionScopeThresholds.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.batch;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
public class PartitionScopeThresholds {
private final static Logger logger = LoggerFactory.getLogger(PartitionScopeThresholds.class);
private final String pkRangeId;
private final CosmosBulkExecutionOptions options;
private final AtomicInteger targetMicroBatchSize;
private final AtomicLong totalOperationCount;
private final AtomicReference<CurrentIntervalThresholds> currentThresholds;
private final String identifier = UUID.randomUUID().toString();
private final double minRetryRate;
private final double maxRetryRate;
private final double avgRetryRate;
public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions options) {
checkNotNull(pkRangeId, "expected non-null pkRangeId");
checkNotNull(options, "expected non-null options");
this.pkRangeId = pkRangeId;
this.options = options;
this.targetMicroBatchSize = new AtomicInteger(
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchSize(options));
this.totalOperationCount = new AtomicLong(0);
this.currentThresholds = new AtomicReference<>(new CurrentIntervalThresholds());
this.minRetryRate = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMinTargetedMicroBatchRetryRate(options);
this.maxRetryRate = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxTargetedMicroBatchRetryRate(options);
this.avgRetryRate = ((this.maxRetryRate + this.minRetryRate)/2);
}
public String getPartitionKeyRangeId() {
return this.pkRangeId;
}
private Pair<Boolean, Boolean> shouldReevaluateThresholds(long totalSnapshot, long currentSnapshot) {
if (totalSnapshot < 1_000) {
return Pair.of(currentSnapshot == 100, false);
}
if (totalSnapshot < 10_000) {
return Pair.of(currentSnapshot == 1_000, false);
}
return Pair.of(currentSnapshot % 1_000 == 0, currentSnapshot % 10_000 == 0);
}
private void recordOperation(boolean isRetry) {
long totalSnapshot = this.totalOperationCount.incrementAndGet();
CurrentIntervalThresholds currentThresholdsSnapshot = this.currentThresholds.get();
long currentTotalCountSnapshot = currentThresholdsSnapshot.currentOperationCount.incrementAndGet();
long currentRetryCountSnapshot;
if (isRetry) {
currentRetryCountSnapshot = currentThresholdsSnapshot.currentRetriedOperationCount.incrementAndGet();
} else {
currentRetryCountSnapshot = currentThresholdsSnapshot.currentRetriedOperationCount.get();
}
Pair<Boolean, Boolean> shouldReevaluateResult =
this.shouldReevaluateThresholds(totalSnapshot, currentTotalCountSnapshot);
boolean shouldReevaluate = shouldReevaluateResult.getLeft();
if (shouldReevaluate) {
boolean onlyUpscale = shouldReevaluateResult.getRight();
if (onlyUpscale ||
this.currentThresholds.compareAndSet(currentThresholdsSnapshot, new CurrentIntervalThresholds())) {
this.reevaluateThresholds(
totalSnapshot,
currentTotalCountSnapshot,
currentRetryCountSnapshot,
shouldReevaluateResult.getRight());
}
}
}
private void reevaluateThresholds(
long totalCount,
long currentCount,
long retryCount,
boolean onlyUpscale) {
double retryRate = currentCount == 0 ? 0 : (double)retryCount / currentCount;
int microBatchSizeBefore = this.targetMicroBatchSize.get();
int microBatchSizeAfter = microBatchSizeBefore;
int maxMicroBatchSize = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchSize(options);
if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) {
int targetedNewBatchSize = Math.min(
Math.min(
microBatchSizeBefore * 2,
microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)),
maxMicroBatchSize);
if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) {
microBatchSizeAfter = targetedNewBatchSize;
}
} else if (!onlyUpscale && retryRate > this.maxRetryRate && microBatchSizeBefore > 1) {
double deltaRate = retryRate - this.avgRetryRate;
int targetedNewBatchSize = Math.max(1, (int) (microBatchSizeBefore * (1 - deltaRate)));
if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) {
microBatchSizeAfter = targetedNewBatchSize;
}
}
logger.debug(
"Reevaluated thresholds for PKRange '{}#{}' (TotalCount: {}, CurrentCount: {}, CurrentRetryCount: {}, " +
"CurrentRetryRate: {} - BatchSize {} -> {}, OnlyUpscale: {})",
this.pkRangeId,
this.identifier,
totalCount,
currentCount,
retryCount,
retryRate,
microBatchSizeBefore,
microBatchSizeAfter,
onlyUpscale);
}
public void recordSuccessfulOperation() {
this.recordOperation(false);
}
public void recordEnqueuedRetry() {
this.recordOperation(true);
}
public int getTargetMicroBatchSizeSnapshot() {
return this.targetMicroBatchSize.get();
}
private static class CurrentIntervalThresholds {
public final AtomicLong currentOperationCount = new AtomicLong(0);
public final AtomicLong currentRetriedOperationCount = new AtomicLong(0);
}
}