GoneAndRetryWithRetryPolicy.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.Quadruple;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RetryWithException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * Retry policy that combines two nested policies sharing a single time budget.
 *
 * <p>Every failure is first offered to {@link RetryWithRetryPolicy}. Only when that policy reports the
 * exception as unrelated is it passed on to {@link GoneRetryPolicy}. Both nested policies measure the
 * elapsed time from the construction of this instance and compare it against the same
 * {@code waitTimeInSeconds} value.
 */
public class GoneAndRetryWithRetryPolicy implements IRetryPolicy {

    private final static Logger logger = LoggerFactory.getLogger(GoneAndRetryWithRetryPolicy.class);
    private final GoneRetryPolicy goneRetryPolicy;
    private final RetryWithRetryPolicy retryWithRetryPolicy;
    // Creation time of this policy; the retry time budget is measured from here.
    private final Instant start;
    // Set when the gone policy decides not to retry; from then on the elapsed time is frozen.
    private volatile Instant end;

    // Most recent RetryWithException seen; returned in place of a later Gone-family failure
    // when the gone policy runs out of time.
    private volatile RetryWithException lastRetryWithException;
    private final RetryContext retryContext;

    /**
     * Creates the combined policy for a single request.
     *
     * @param request the request being retried; must not be null.
     * @param waitTimeInSeconds overall retry budget in seconds; when null each nested policy falls back
     *                          to its own default of 30 seconds.
     * @throws NullPointerException if {@code request} is null.
     */
    public GoneAndRetryWithRetryPolicy(RxDocumentServiceRequest request, Integer waitTimeInSeconds) {
        // Validate before the first dereference so a null request fails with a descriptive message.
        checkNotNull(request, "request must not be null.");
        this.retryContext = BridgeInternal.getRetryContext(request.requestContext.cosmosDiagnostics);
        this.goneRetryPolicy = new GoneRetryPolicy(
            request,
            waitTimeInSeconds,
            this.retryContext
        );
        this.retryWithRetryPolicy = new RetryWithRetryPolicy(
            waitTimeInSeconds, this.retryContext);
        this.start = Instant.now();
    }

    /**
     * Decides whether the failed operation should be retried.
     *
     * @param exception the failure of the last attempt.
     * @return the decision of the RetryWith policy when it recognizes the exception, otherwise the
     *         decision of the gone policy.
     */
    @Override
    public Mono<ShouldRetryResult> shouldRetry(Exception exception) {

        return this.retryWithRetryPolicy.shouldRetry(exception)
                                        .flatMap((retryWithResult) -> {
            if (!retryWithResult.nonRelatedException) {
                return Mono.just(retryWithResult);
            }

            // only pass request to gone retry policy if retryWithRetryPolicy can not handle the exception.
            return this.goneRetryPolicy.shouldRetry(exception)
                .flatMap((goneRetryResult) -> {
                    if (!goneRetryResult.shouldRetry) {
                        logger.debug("Operation will NOT be retried. Exception:",
                            exception);
                        // Stop the clock: later elapsed-time reads use this instant.
                        this.end = Instant.now();
                    }

                    return Mono.just(goneRetryResult);
                });
        });
    }

    @Override
    public RetryContext getRetryContext() {
        return this.retryContext;
    }

    /**
     * Returns the time between creation of this policy and either the recorded end instant or, when no
     * end has been recorded yet, the current time.
     */
    private Duration getElapsedTime() {
        // Read the volatile field once so the null check and the use see the same value.
        Instant endSnapshot = this.end;
        if (endSnapshot == null) {
            endSnapshot = Instant.now();
        }

        return Duration.between(this.start, endSnapshot);
    }

    /**
     * Handles {@link GoneException}, {@link PartitionIsMigratingException},
     * {@link PartitionKeyRangeIsSplittingException} and (conditionally) {@link InvalidPartitionException}
     * with an exponential backoff expressed in whole seconds.
     */
    class GoneRetryPolicy implements IRetryPolicy {
        private final static int DEFAULT_WAIT_TIME_IN_SECONDS = 30;
        private final static int MAXIMUM_BACKOFF_TIME_IN_SECONDS = 15;
        private final static int INITIAL_BACKOFF_TIME = 1;
        private final static int BACK_OFF_MULTIPLIER = 2;

        private final RxDocumentServiceRequest request;
        private volatile int attemptCount = 1;
        private volatile int attemptCountInvalidPartition = 1;
        private volatile int currentBackoffSeconds = GoneRetryPolicy.INITIAL_BACKOFF_TIME;
        private final int waitTimeInSeconds;
        private final RetryContext retryContext;

        /**
         * @param request the request being retried; must not be null.
         * @param waitTimeInSeconds retry budget in seconds, or null for the 30 second default.
         * @param retryContext retry context returned from {@link #getRetryContext()}.
         */
        public GoneRetryPolicy(
            RxDocumentServiceRequest request,
            Integer waitTimeInSeconds,
            RetryContext retryContext) {

            checkNotNull(request, "request must not be null.");
            this.request = request;
            this.waitTimeInSeconds = waitTimeInSeconds != null ? waitTimeInSeconds : DEFAULT_WAIT_TIME_IN_SECONDS;
            this.retryContext = retryContext;
        }

        /**
         * Returns true when this policy does not handle the exception at all.
         *
         * <p>An {@link InvalidPartitionException} is treated as not handled when the request carries a
         * partition key range identity with a non-null collection rid.
         */
        private boolean isNonRetryableException(Exception exception) {
            if (exception instanceof GoneException ||
                exception instanceof PartitionIsMigratingException ||
                exception instanceof PartitionKeyRangeIsSplittingException) {

                return false;
            }

            if (exception instanceof InvalidPartitionException) {
                return this.request.getPartitionKeyRangeIdentity() != null &&
                    this.request.getPartitionKeyRangeIdentity().getCollectionRid() != null;
            }

            return true;
        }

        /**
         * Picks the exception the request should fail with once the time budget is exhausted, and logs it.
         *
         * <p>For the exception types named below, the last seen {@link RetryWithException} is preferred
         * when there is one; otherwise the exception is wrapped into a service-unavailable exception.
         * Any other {@link CosmosException} is returned unchanged.
         * NOTE(review): {@link PartitionIsMigratingException} has no branch of its own here; presumably it
         * is a CosmosException and ends up in the pass-through branch - confirm this is intended.
         *
         * @throws IllegalStateException if the exception is not a CosmosException.
         */
        private CosmosException logAndWrapExceptionWithLastRetryWithException(Exception exception) {
            String exceptionType;
            if (exception instanceof GoneException) {
                exceptionType = "GoneException";
            } else if (exception instanceof PartitionKeyRangeGoneException) {
                exceptionType = "PartitionKeyRangeGoneException";
            } else if (exception instanceof  InvalidPartitionException) {
                exceptionType = "InvalidPartitionException";
            } else if (exception instanceof  PartitionKeyRangeIsSplittingException) {
                exceptionType = "PartitionKeyRangeIsSplittingException";
            } else if (exception instanceof CosmosException) {
                logger.warn("Received CosmosException after backoff/retry. Will fail the request.",
                    exception);

                return (CosmosException)exception;
            } else {
                throw new IllegalStateException("Invalid exception type", exception);
            }

            // Snapshot the volatile field so the null check and the return use the same instance.
            RetryWithException lastRetryWithExceptionSnapshot =
                GoneAndRetryWithRetryPolicy.this.lastRetryWithException;
            if (lastRetryWithExceptionSnapshot != null) {
                logger.warn(
                    "Received {} after backoff/retry including at least one RetryWithException. "
                        + "Will fail the request with RetryWithException. {}: {}. RetryWithException: {}",
                    exceptionType,
                    exceptionType,
                    exception,
                    lastRetryWithExceptionSnapshot);

                return lastRetryWithExceptionSnapshot;
            }

            logger.warn(
                "Received {} after backoff/retry. Will fail the request. {}",
                exceptionType,
                exception);
            return BridgeInternal.createServiceUnavailableException(exception);
        }

        /**
         * Decides whether to retry after a Gone-family failure.
         *
         * <p>The first handled failure is retried without backoff. Later ones back off 1, 2, 4, 8 and
         * then at most 15 seconds, never longer than the remaining budget, and fail the request once
         * the budget is used up.
         *
         * @param exception the failure of the last attempt.
         * @return a no-retry result for exceptions this policy does not handle or for writes that may
         *         already have been sent, an error result when the budget is exhausted, otherwise a
         *         retry-after result.
         */
        @Override
        public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
            if (isNonRetryableException(exception)) {
                logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount,
                    exception);
                return Mono.just(ShouldRetryResult.noRetryOnNonRelatedException());
            } else if (exception instanceof GoneException &&
                !request.isReadOnly() &&
                BridgeInternal.hasSendingRequestStarted((CosmosException)exception) &&
                !((GoneException)exception).isBasedOn410ResponseFromService()) {

                logger.warn(
                    "Operation will NOT be retried. Write operations which failed due to transient transport errors " +
                        "can not be retried safely when sending the request " +
                        "to the service because they aren't idempotent. Current attempt {}, Exception: ",
                    this.attemptCount,
                    exception);

                return Mono.just(ShouldRetryResult.noRetry(
                    Quadruple.with(true, true, Duration.ofMillis(0), this.attemptCount)));
            }

            long remainingSeconds = this.waitTimeInSeconds -
                GoneAndRetryWithRetryPolicy.this.getElapsedTime().toMillis() / 1_000L;
            int currentRetryAttemptCount = this.attemptCount;
            Duration backoffTime = Duration.ofSeconds(0);
            // The first attempt is retried immediately and is not subject to the budget check.
            if (this.attemptCount++ > 1) {
                if (remainingSeconds <= 0) {
                    CosmosException exceptionToThrow = logAndWrapExceptionWithLastRetryWithException(exception);
                    return Mono.just(ShouldRetryResult.error(exceptionToThrow));
                }

                backoffTime = Duration.ofSeconds(Math.min(Math.min(this.currentBackoffSeconds, remainingSeconds),
                    GoneRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_SECONDS));
                // Saturate at the maximum instead of doubling without bound: the effective backoff is
                // capped at the maximum anyway, and unbounded doubling eventually overflows int and
                // yields negative or zero backoff values.
                this.currentBackoffSeconds = Math.min(
                    this.currentBackoffSeconds * GoneRetryPolicy.BACK_OFF_MULTIPLIER,
                    GoneRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_SECONDS);
                logger.debug("BackoffTime: {} seconds.", backoffTime.getSeconds());
            }

            // Calculate the remaining time based after accounting for the backoff that we
            // will perform
            long timeoutInMillSec = remainingSeconds*1000 - backoffTime.toMillis();
            Duration timeout = timeoutInMillSec > 0 ? Duration.ofMillis(timeoutInMillSec)
                : Duration.ofSeconds(GoneRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_SECONDS);
            logger.debug("Timeout. {} - BackoffTime {} - currentBackoffSeconds {} - CurrentRetryAttemptCount {}",
                timeout.toMillis(),
                backoffTime,
                this.currentBackoffSeconds,
                currentRetryAttemptCount);

            // The left element, when present, is a final decision that overrides the retry below.
            Pair<Mono<ShouldRetryResult>, Boolean> exceptionHandlingResult = handleException(exception);
            Mono<ShouldRetryResult> result = exceptionHandlingResult.getLeft();
            if (result != null) {
                return result;
            }

            boolean forceRefreshAddressCache = exceptionHandlingResult.getRight();

            return Mono.just(ShouldRetryResult.retryAfter(backoffTime,
                Quadruple.with(forceRefreshAddressCache, true, timeout, currentRetryAttemptCount)));
        }

        @Override
        public RetryContext getRetryContext() {
            return this.retryContext;
        }

        /**
         * Dispatches to the handler matching the exception type.
         *
         * @return a pair whose left element is either null (continue with a retry) or a final decision,
         *         and whose right element tells whether the address cache should be force-refreshed.
         * @throws IllegalStateException if the exception is none of the handled types.
         */
        private Pair<Mono<ShouldRetryResult>, Boolean> handleException(Exception exception) {
            if (exception instanceof GoneException) {
                return handleGoneException((GoneException)exception);
            } else if (exception instanceof PartitionIsMigratingException) {
                return handlePartitionIsMigratingException((PartitionIsMigratingException)exception);
            } else if (exception instanceof InvalidPartitionException) {
                return handleInvalidPartitionException((InvalidPartitionException)exception);
            } else if (exception instanceof PartitionKeyRangeIsSplittingException) {
                return handlePartitionKeyIsSplittingException((PartitionKeyRangeIsSplittingException) exception);
            }

            throw new IllegalStateException("Invalid exception type", exception);
        }

        /** Retries with a forced address cache refresh. */
        private Pair<Mono<ShouldRetryResult>, Boolean> handleGoneException(GoneException exception) {
            logger.debug("Received gone exception, will retry, {}", exception.toString());
            return Pair.of(null, true); // indicate we are in retry.
        }

        /** Requests a collection routing map refresh and retries with a forced address cache refresh. */
        private Pair<Mono<ShouldRetryResult>, Boolean> handlePartitionIsMigratingException(PartitionIsMigratingException exception) {
            logger.debug("Received PartitionIsMigratingException, will retry, {}", exception.toString());
            this.request.forceCollectionRoutingMapRefresh = true;
            return Pair.of(null, true);
        }

        /**
         * Clears the resolved partition key range and quorum selection on the request, requests a
         * partition key range refresh and retries without forcing an address cache refresh.
         */
        private Pair<Mono<ShouldRetryResult>, Boolean> handlePartitionKeyIsSplittingException(PartitionKeyRangeIsSplittingException exception) {
            this.request.requestContext.resolvedPartitionKeyRange = null;
            this.request.requestContext.quorumSelectedLSN = -1;
            this.request.requestContext.quorumSelectedStoreResponse = null;
            logger.debug("Received partition key range splitting exception, will retry, {}", exception.toString());
            this.request.forcePartitionKeyRangeRefresh = true;
            return Pair.of(null, false);
        }

        /**
         * Clears the resolved partition key range and quorum/LSN selection on the request and either
         * requests a name cache refresh and retries, or fails the request with a service-unavailable
         * exception once too many InvalidPartitionExceptions have been seen.
         */
        private Pair<Mono<ShouldRetryResult>, Boolean> handleInvalidPartitionException(InvalidPartitionException exception) {
            this.request.requestContext.quorumSelectedLSN = -1;
            this.request.requestContext.resolvedPartitionKeyRange = null;
            this.request.requestContext.quorumSelectedStoreResponse = null;
            this.request.requestContext.globalCommittedSelectedLSN = -1;
            if (this.attemptCountInvalidPartition++ > 2) {
                // The counter starts at 1 and is post-incremented, so this branch is first taken on the
                // third InvalidPartitionException handled by this policy.
                // NOTE(review): the log text below says "second" - confirm which threshold is intended.
                logger.warn("Received second InvalidPartitionException after backoff/retry. Will fail the request. {}",
                    exception.toString());
                return Pair.of(
                    Mono.just(ShouldRetryResult.error(BridgeInternal.createServiceUnavailableException(exception))),
                    false);
            }

            logger.debug("Received invalid collection exception, will retry, {}", exception.toString());
            this.request.forceNameCacheRefresh = true;

            return Pair.of(null, false);
        }
    }

    /**
     * Handles {@link RetryWithException} with an exponential backoff expressed in milliseconds plus a
     * small random salt.
     */
    class RetryWithRetryPolicy implements IRetryPolicy {
        private final static int DEFAULT_WAIT_TIME_IN_SECONDS = 30;
        private final static int MAXIMUM_BACKOFF_TIME_IN_MS = 1000;
        private final static int INITIAL_BACKOFF_TIME_MS = 10;
        private final static int BACK_OFF_MULTIPLIER = 2;
        private final static int RANDOM_SALT_IN_MS = 5;

        private volatile int attemptCount = 1;
        private volatile int currentBackoffMilliseconds = RetryWithRetryPolicy.INITIAL_BACKOFF_TIME_MS;

        private final int waitTimeInSeconds;
        private final RetryContext retryContext;


        /**
         * @param waitTimeInSeconds retry budget in seconds, or null for the 30 second default.
         * @param retryContext retry context returned from {@link #getRetryContext()}.
         */
        public RetryWithRetryPolicy(Integer waitTimeInSeconds, RetryContext retryContext) {
            this.waitTimeInSeconds = waitTimeInSeconds != null ? waitTimeInSeconds : DEFAULT_WAIT_TIME_IN_SECONDS;
            this.retryContext = retryContext;
        }

        /**
         * Decides whether to retry after a {@link RetryWithException}.
         *
         * <p>Backoff starts at 10 ms, doubles on every retry and is limited to 1000 ms and to the
         * remaining budget. The exception is also remembered on the enclosing policy.
         *
         * @param exception the failure of the last attempt.
         * @return a no-retry result flagged as non-related for any other exception type, an error result
         *         carrying the exception when the budget is exhausted, otherwise a retry-after result.
         */
        @Override
        public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
            Duration backoffTime;
            Duration timeout;

            if (!(exception instanceof RetryWithException)) {
                logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount,
                    exception);
                return Mono.just(ShouldRetryResult.noRetryOnNonRelatedException());
            }

            RetryWithException lastRetryWithException = (RetryWithException)exception;
            GoneAndRetryWithRetryPolicy.this.lastRetryWithException = lastRetryWithException;

            long remainingMilliseconds =
                (this.waitTimeInSeconds * 1_000L) -
                    GoneAndRetryWithRetryPolicy.this.getElapsedTime().toMillis();
            int currentRetryAttemptCount = this.attemptCount++;

            if (remainingMilliseconds <= 0) {
                logger.warn("Received RetryWithException after backoff/retry. Will fail the request.",
                    lastRetryWithException);
                return Mono.just(ShouldRetryResult.error(lastRetryWithException));
            }

            // Obtain the generator on the calling thread; a ThreadLocalRandom instance must not be
            // cached and shared between threads.
            int saltInMs = ThreadLocalRandom.current().nextInt(RANDOM_SALT_IN_MS);
            backoffTime = Duration.ofMillis(
                Math.min(
                    Math.min(this.currentBackoffMilliseconds + saltInMs, remainingMilliseconds),
                    RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS));
            // Saturate at the maximum instead of doubling without bound: the effective backoff is capped
            // at the maximum anyway, and unbounded doubling overflows int after 28 doublings, producing
            // negative and then zero backoff values.
            this.currentBackoffMilliseconds = Math.min(
                this.currentBackoffMilliseconds * RetryWithRetryPolicy.BACK_OFF_MULTIPLIER,
                RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS);
            logger.debug("BackoffTime: {} ms.", backoffTime.toMillis());

            // Calculate the remaining time based after accounting for the backoff that we
            // will perform
            long timeoutInMillSec = remainingMilliseconds - backoffTime.toMillis();
            timeout = timeoutInMillSec > 0 ? Duration.ofMillis(timeoutInMillSec)
                : Duration.ofMillis(RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS);

            logger.debug("Received RetryWithException, will retry, ", exception);

            // For RetryWithException, prevent the caller
            // from refreshing any caches.
            return Mono.just(ShouldRetryResult.retryAfter(backoffTime,
                Quadruple.with(false, true, timeout, currentRetryAttemptCount)));
        }

        @Override
        public RetryContext getRetryContext() {
            return this.retryContext;
        }
    }
}