RetryUtils.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.apachecommons.lang.time.StopWatch;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.function.Function;

/**
 * Helpers that adapt an {@link IRetryPolicy} into Reactor error-handling functions.
 * <p>
 * Although this class is public, it is not part of our published public APIs.
 * It is meant to be used internally by our SDK only.
 */
public class RetryUtils {
    private static final Logger logger = LoggerFactory.getLogger(RetryUtils.class);

    /**
     * Builds a function that maps every error of an error stream to either a delayed tick
     * (meaning "retry after the policy's back-off") or to a terminal error (meaning "stop").
     * <p>
     * For each error the policy is consulted via {@link IRetryPolicy#shouldRetry(Exception)}:
     * <ul>
     *   <li>a non-null back-off time yields a value after that delay;</li>
     *   <li>otherwise, a non-null exception on the result is propagated;</li>
     *   <li>otherwise, the original error is propagated.</li>
     * </ul>
     * Throwables that are not {@link Exception}s are propagated without consulting the policy.
     *
     * @param policy the retry policy to consult for every error
     * @return the function translating an error stream into a stream of retry signals
     */
    static Function<Flux<Throwable>, Flux<Long>> toRetryWhenFunc(IRetryPolicy policy) {
        return throwableFlux -> throwableFlux.flatMap(t -> {
            Exception e = Utils.as(t, Exception.class);
            if (e == null) {
                return Flux.error(t);
            }
            RetryContext retryContext = policy.getRetryContext();
            if (retryContext != null) {
                retryContext.captureStartTimeIfNotSet();
            }

            Flux<ShouldRetryResult> shouldRetryResultFlux = policy.shouldRetry(e).flux();
            return shouldRetryResultFlux.flatMap(s -> {
                CosmosException clientException = Utils.as(e, CosmosException.class);
                // The helper tolerates a null retry context and a null client exception.
                addStatusSubStatusCodeOnRetryContext(retryContext, clientException, s.nonRelatedException);

                if (s.backOffTime != null) {
                    return Mono.delay(Duration.ofMillis(s.backOffTime.toMillis()), CosmosSchedulers.COSMOS_PARALLEL).flux();
                } else if (s.exception != null) {
                    return Flux.error(s.exception);
                } else {
                    // NoRetry return original failure
                    return Flux.error(t);
                }
            });
        });
    }

    /**
     * Builds the error handler that is invoked after {@code callbackMethod} failed. The handler keeps
     * alternating between {@code callbackMethod} and {@code inBackoffAlternateCallbackMethod} until one
     * of them succeeds or the retry policy decides to stop.
     * <p>
     * When the policy asks for a retry, one of three paths is taken:
     * <ul>
     *   <li>if an alternate callback was supplied and the back-off is longer than
     *       {@code minBackoffForInBackoffCallback}, the alternate callback is tried immediately and the
     *       primary callback is retried only if the alternate fails, after the remaining back-off;</li>
     *   <li>if the back-off is zero, the primary callback is retried immediately;</li>
     *   <li>otherwise the primary callback is retried after the back-off.</li>
     * </ul>
     * When the policy asks to stop, the policy's exception (or the original error if none was provided)
     * is returned, and a background address refresh is started if the policy argument requests it.
     *
     * @param callbackMethod The primary callback; it receives the policy argument of the retry decision
     * @param retryPolicy Retry policy
     * @param inBackoffAlternateCallbackMethod The alternate callback tried during long back-offs; may be {@code null}
     * @param minBackoffForInBackoffCallback Minimum backoff for InBackoffCallbackMethod
     * @param rxDocumentServiceRequest The request handed to {@code addressSelector} for a background address refresh
     * @param addressSelector The address selector used for a background address refresh
     * @param <T> the result type of the callbacks
     * @return a function mapping a failure to the {@link Mono} that continues (or ends) the retry sequence
     */
    public static <T> Function<Throwable, Mono<T>> toRetryWithAlternateFunc(Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
                                                                            IRetryPolicy retryPolicy,
                                                                            Function<Quadruple<Boolean, Boolean, Duration, Integer>,
                                                                            Mono<T>> inBackoffAlternateCallbackMethod, Duration minBackoffForInBackoffCallback,
                                                                            RxDocumentServiceRequest rxDocumentServiceRequest,
                                                                            AddressSelector addressSelector) {
        return throwable -> {
            RetryContext retryContext = retryPolicy.getRetryContext();
            if (retryContext != null) {
                retryContext.captureStartTimeIfNotSet();
                if (retryContext.getRetryCount() > 0) {
                    retryContext.updateEndTime();
                }
            }

            Exception e = Utils.as(throwable, Exception.class);
            if (e == null) {
                return Mono.error(throwable);
            }

            Mono<ShouldRetryResult> shouldRetryResultMono = retryPolicy.shouldRetry(e);
            return shouldRetryResultMono.flatMap(shouldRetryResult -> {
                if (retryContext != null) {
                    CosmosException clientException = Utils.as(e, CosmosException.class);
                    addStatusSubStatusCodeOnRetryContext(retryContext, clientException, shouldRetryResult.nonRelatedException);
                    retryContext.updateEndTime();
                }

                if (!shouldRetryResult.shouldRetry) {
                    if (retryContext != null) {
                        // NOTE(review): the end time was already updated just above; this second call
                        // looks redundant - confirm against RetryContext before removing.
                        retryContext.updateEndTime();
                    }

                    final Throwable errorToReturn = shouldRetryResult.exception != null ? shouldRetryResult.exception : e;

                    if (shouldRetryResult.policyArg != null) {
                        Boolean forceAddressRefresh = shouldRetryResult.policyArg.getValue0();

                        if (forceAddressRefresh != null && forceAddressRefresh) {
                            startBackgroundAddressRefresh(rxDocumentServiceRequest, addressSelector);
                        }
                    }

                    return Mono.error(errorToReturn);
                }

                final Duration backOffTime = shouldRetryResult.backOffTime;

                if (inBackoffAlternateCallbackMethod != null
                        && backOffTime.compareTo(minBackoffForInBackoffCallback) > 0) {
                    // Long back-off: use the waiting time to try the alternate callback. The stopwatch
                    // measures how much of the back-off the alternate attempt has consumed.
                    StopWatch stopwatch = new StopWatch();
                    startStopWatch(stopwatch);
                    return inBackoffAlternateCallbackMethod.apply(shouldRetryResult.policyArg)
                            .onErrorResume(recursiveWithAlternateFunc(callbackMethod, retryPolicy,
                                    inBackoffAlternateCallbackMethod, shouldRetryResult, stopwatch,
                                    minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector));
                } else if (backOffTime.isZero()) {
                    // Value comparison: Duration instances must not be compared with '=='.
                    return recursiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod,
                            shouldRetryResult, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector);
                } else {
                    // it is important to use defer here, because we do not want the TimeoutHelper to be initialized before the backoff.
                    // TimeoutHelper is used in consistency layer for request timeout check.
                    return Mono.defer(() -> {
                        return recursiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod,
                            shouldRetryResult, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector);
                    })
                        .delaySubscription(
                            Duration.ofMillis(backOffTime.toMillis()),
                            CosmosSchedulers.COSMOS_PARALLEL);
                }
            });
        };
    }

    /**
     * Starts a forced address resolution for the given request without waiting for its result.
     * A failure of the refresh is only logged as a warning.
     *
     * @param request the request whose addresses should be re-resolved
     * @param addressSelector the selector performing the resolution
     */
    private static void startBackgroundAddressRefresh(
        RxDocumentServiceRequest request,
        AddressSelector addressSelector) {

        addressSelector.resolveAddressesAsync(request, true)
                       .publishOn(Schedulers.boundedElastic())
                       .subscribe(
                           r -> {
                               // the resolved addresses are intentionally not used here
                           },
                           e -> logger.warn(
                               "Background refresh of addresses failed with {}", e.getMessage(), e)
                       );
    }

    /**
     * Invokes the primary callback with the policy argument of the given retry decision and re-attaches
     * the retry handler built by
     * {@link #toRetryWithAlternateFunc(Function, IRetryPolicy, Function, Duration, RxDocumentServiceRequest, AddressSelector)},
     * so that a further failure goes through the retry policy again.
     * <p>
     * Note that {@code callbackMethod.apply} is called as soon as this method is called; callers that
     * need the invocation to happen after a delay must wrap this call in {@link Mono#defer}.
     */
    private static <T> Mono<T> recursiveFunc(
        Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
        IRetryPolicy retryPolicy,
        Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
        ShouldRetryResult shouldRetryResult,
        Duration minBackoffForInBackoffCallback,
        RxDocumentServiceRequest rxDocumentServiceRequest,
        AddressSelector addressSelector) {

        return callbackMethod.apply(shouldRetryResult.policyArg).onErrorResume(toRetryWithAlternateFunc(
                callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector));
    }

    /**
     * Builds the error handler used when the alternate callback (tried during a long back-off) failed.
     * The handler stops the stopwatch, waits for whatever is left of the original back-off
     * (nothing, if the alternate attempt took at least that long) and then retries the primary callback.
     * Throwables that are not {@link Exception}s are propagated unchanged.
     */
    private static <T> Function<Throwable, Mono<T>> recursiveWithAlternateFunc(
        Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
        IRetryPolicy retryPolicy,
        Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
        ShouldRetryResult shouldRetryResult,
        StopWatch stopwatch,
        Duration minBackoffForInBackoffCallback,
        RxDocumentServiceRequest rxDocumentServiceRequest,
        AddressSelector addressSelector) {

        return throwable -> {
            Exception e = Utils.as(throwable, Exception.class);
            if (e == null) {
                return Mono.error(throwable);
            }

            stopStopWatch(stopwatch);
            // Read the elapsed time once so the log line and the remaining back-off agree.
            final long elapsedMillis = stopwatch.getTime();
            logger.info("Failed inBackoffAlternateCallback with {}, proceeding with retry. Time taken: {}ms",
                    e.toString(), elapsedMillis);

            final long backOffMillis = shouldRetryResult.backOffTime.toMillis();
            final Duration remainingBackoff = backOffMillis > elapsedMillis
                    ? Duration.ofMillis(backOffMillis - elapsedMillis)
                    : Duration.ZERO;

            // Defer the invocation of the primary callback until the remaining back-off has elapsed,
            // so that the callback (and anything it initializes, such as the TimeoutHelper used for
            // request timeout checks) is not created before the wait.
            return Mono.defer(() -> recursiveFunc(callbackMethod, retryPolicy, inBackoffAlternateCallbackMethod,
                    shouldRetryResult, minBackoffForInBackoffCallback, rxDocumentServiceRequest, addressSelector))
                    .delaySubscription(remainingBackoff, CosmosSchedulers.COSMOS_PARALLEL);
        };
    }

    /** Stops the given stopwatch while holding its monitor. */
    private static void stopStopWatch(StopWatch stopwatch) {
        synchronized (stopwatch) {
            stopwatch.stop();
        }
    }

    /** Starts the given stopwatch while holding its monitor. */
    private static void startStopWatch(StopWatch stopwatch) {
        synchronized (stopwatch) {
            stopwatch.start();
        }
    }

    /**
     * Records the status and sub-status code of {@code clientException} on the retry context.
     * Nothing is recorded when the exception is flagged as non-related, or when either the
     * retry context or the client exception is {@code null}.
     */
    private static void addStatusSubStatusCodeOnRetryContext(RetryContext retryContext,
                                                             CosmosException clientException,
                                                             boolean isNonRelatedException) {
        if (isNonRelatedException || retryContext == null || clientException == null) {
            return;
        }

        retryContext.addStatusAndSubStatusCode(clientException.getStatusCode(),
            clientException.getSubStatusCode());
    }
}