ResourceThrottleRetryPolicy.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
import com.azure.cosmos.CosmosException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
 * Retry policy for throttled requests (HTTP 429, "too many requests").
 * <p>
 * A throttled request is retried after the delay reported by the failed request, optionally
 * multiplied by a back-off factor, as long as the number of granted retries stays below
 * {@code maxAttemptCount} and neither the single delay nor the accumulated delay exceeds
 * {@code maxWaitTime}.
 * <p>
 * Although this class is public, it is not part of our published public APIs.
 * It is meant to be used internally by our SDK only.
 */
public class ResourceThrottleRetryPolicy extends DocumentClientRetryPolicy {

    private static final Logger logger = LoggerFactory.getLogger(ResourceThrottleRetryPolicy.class);

    /** Maximum wait time used by the constructor that does not take an explicit {@code maxWaitTime}. */
    private static final Duration DEFAULT_MAX_WAIT_TIME_IN_SECONDS = Duration.ofSeconds(60);

    /** Delay used when a throttled response reports a retry delay of zero. */
    private static final Duration DEFAULT_RETRY_IN_SECONDS = Duration.ofSeconds(5);

    private final int backoffDelayFactor;
    private final int maxAttemptCount;
    private final Duration maxWaitTime;
    private final boolean retryOnClientSideThrottledBatchRequests;

    // TODO: is this thread safe?
    // should we make this atomic int?
    private int currentAttemptCount;
    private Duration cumulativeRetryDelay;
    private RetryContext retryContext;

    /**
     * Creates a policy with a back-off factor of 1 and an associated retry context.
     *
     * @param maxAttemptCount maximum number of retries this policy grants
     * @param maxWaitTime upper bound for a single retry delay and for the accumulated delay
     * @param retryContext context handed back by {@link #getRetryContext()}
     * @param retryOnClientSideThrottledBatchRequests whether 429 responses carrying the
     *        {@code THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE} sub-status are retried
     */
    public ResourceThrottleRetryPolicy(
        int maxAttemptCount,
        Duration maxWaitTime,
        RetryContext retryContext,
        boolean retryOnClientSideThrottledBatchRequests) {

        this(maxAttemptCount, maxWaitTime, retryOnClientSideThrottledBatchRequests);
        this.retryContext = retryContext;
    }

    /**
     * Creates a policy with a back-off factor of 1 and no retry context.
     *
     * @param maxAttemptCount maximum number of retries this policy grants
     * @param maxWaitTime upper bound for a single retry delay and for the accumulated delay
     * @param retryOnClientSideThrottledBatchRequests whether 429 responses carrying the
     *        {@code THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE} sub-status are retried
     */
    public ResourceThrottleRetryPolicy(
        int maxAttemptCount,
        Duration maxWaitTime,
        boolean retryOnClientSideThrottledBatchRequests) {

        this(maxAttemptCount, maxWaitTime, 1, retryOnClientSideThrottledBatchRequests);
    }

    /**
     * Creates a policy with the default maximum wait time (60 seconds) and a back-off factor of 1.
     *
     * @param maxAttemptCount maximum number of retries this policy grants
     * @param retryOnClientSideThrottledBatchRequests whether 429 responses carrying the
     *        {@code THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE} sub-status are retried
     */
    public ResourceThrottleRetryPolicy(int maxAttemptCount, boolean retryOnClientSideThrottledBatchRequests) {
        this(
            maxAttemptCount,
            DEFAULT_MAX_WAIT_TIME_IN_SECONDS,
            1,
            retryOnClientSideThrottledBatchRequests);
    }

    /**
     * Creates a fully parameterized policy. The attempt counter starts at zero and the
     * accumulated delay at {@link Duration#ZERO}.
     *
     * @param maxAttemptCount maximum number of retries this policy grants
     * @param maxWaitTime upper bound for a single retry delay and for the accumulated delay;
     *        must not be larger than {@code Integer.MAX_VALUE / 1000} seconds (validated through
     *        {@code Utils.checkStateOrThrow})
     * @param backoffDelayFactor multiplier applied to the reported retry delay; values of 1 or
     *        less leave the delay unchanged
     * @param retryOnClientSideThrottledBatchRequests whether 429 responses carrying the
     *        {@code THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE} sub-status are retried
     */
    public ResourceThrottleRetryPolicy(
        int maxAttemptCount,
        Duration maxWaitTime,
        int backoffDelayFactor,
        boolean retryOnClientSideThrottledBatchRequests) {

        Utils.checkStateOrThrow(
            maxWaitTime.getSeconds() <= Integer.MAX_VALUE / 1000,
            "maxWaitTime",
            "maxWaitTime must not be larger than " + Integer.MAX_VALUE / 1000);

        this.maxAttemptCount = maxAttemptCount;
        this.backoffDelayFactor = backoffDelayFactor;
        this.maxWaitTime = maxWaitTime;
        this.currentAttemptCount = 0;
        this.cumulativeRetryDelay = Duration.ZERO;
        this.retryOnClientSideThrottledBatchRequests = retryOnClientSideThrottledBatchRequests;
    }

    /**
     * Decides whether the operation that failed with {@code exception} should be retried.
     *
     * @param exception the failure to examine
     * @return a {@link Mono} emitting {@code noRetryOnNonRelatedException} when the failure is not
     *         a 429 {@link CosmosException}; {@code noRetry} when client-side throttled batch
     *         requests are excluded, the attempt budget is used up, or the delay limits are
     *         exceeded; otherwise {@code retryAfter} with the delay to wait
     */
    @Override
    public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
        CosmosException dce = Utils.as(exception, CosmosException.class);

        if (dce == null || !Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) {
            logger.debug(
                "Operation will NOT be retried - not a throttled request. Current attempt {}",
                this.currentAttemptCount,
                exception);
            return Mono.just(ShouldRetryResult.noRetryOnNonRelatedException());
        }

        if (!this.retryOnClientSideThrottledBatchRequests
            && dce.getSubStatusCode() == HttpConstants.SubStatusCodes.THROUGHPUT_CONTROL_BULK_REQUEST_RATE_TOO_LARGE) {
            return Mono.just(ShouldRetryResult.noRetry());
        }

        // Attempt budget exhausted: bail out before touching the accumulated delay.
        if (this.currentAttemptCount >= this.maxAttemptCount) {
            logger.warn(
                "Operation will NOT be retried. Current attempt {}",
                this.currentAttemptCount,
                exception);
            return Mono.just(ShouldRetryResult.noRetry());
        }

        Duration retryDelay = checkIfRetryNeeded(dce);
        if (retryDelay == null) {
            // The request WAS throttled (checked above); a null delay means the wait limits were hit.
            logger.debug(
                "Operation will NOT be retried - retry delay exceeds max wait time. "
                    + "Current attempt {}, Cumulative delay {}",
                this.currentAttemptCount,
                this.cumulativeRetryDelay,
                exception);
            return Mono.just(ShouldRetryResult.noRetry());
        }

        this.currentAttemptCount++;
        logger.debug(
            "Operation will be retried after {} milliseconds. Current attempt {}, Cumulative delay {}",
            retryDelay.toMillis(),
            this.currentAttemptCount,
            this.cumulativeRetryDelay,
            exception);
        return Mono.just(ShouldRetryResult.retryAfter(retryDelay));
    }

    /**
     * No-op: this policy does not modify outgoing requests.
     *
     * @param request the request about to be sent
     */
    @Override
    public void onBeforeSendRequest(RxDocumentServiceRequest request) {
        // no op
    }

    /**
     * @return the retry context supplied at construction time, or {@code null} when the policy
     *         was created through a constructor that does not take one
     */
    @Override
    public RetryContext getRetryContext() {
        return this.retryContext;
    }

    /**
     * Computes the delay to wait before retrying a throttled request and adds it to the
     * accumulated delay.
     *
     * @param dce the exception to examine
     * @return the delay to wait before the next attempt, or {@code null} when no retry should
     *         happen: the exception is {@code null} or not a 429, the (scaled) delay is not
     *         strictly below {@code maxWaitTime}, or the accumulated delay would exceed
     *         {@code maxWaitTime}
     */
    private Duration checkIfRetryNeeded(CosmosException dce) {
        if (dce == null || !Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) {
            return null;
        }

        Duration retryDelay = dce.getRetryAfterDuration();
        if (this.backoffDelayFactor > 1) {
            // multipliedBy is exact and throws ArithmeticException instead of silently
            // overflowing the way a raw long multiplication of nanoseconds would.
            retryDelay = retryDelay.multipliedBy(this.backoffDelayFactor);
        }

        // A single delay must be strictly shorter than the overall wait budget.
        if (retryDelay.toMillis() >= this.maxWaitTime.toMillis()) {
            return null;
        }

        // The accumulated delay is updated even if the check below then rejects the retry.
        this.cumulativeRetryDelay = retryDelay.plus(this.cumulativeRetryDelay);
        if (this.cumulativeRetryDelay.toMillis() > this.maxWaitTime.toMillis()) {
            return null;
        }

        if (retryDelay.isZero()) {
            // we should never reach here as BE should return a non-zero retryDelay
            // NOTE(review): the default delay is neither added to cumulativeRetryDelay nor
            // checked against maxWaitTime - confirm this is intended.
            logger.trace("Received retryDelay of 0 with Http 429", dce);
            return DEFAULT_RETRY_IN_SECONDS;
        }

        return retryDelay;
    }
}