RetryUtil.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.amqp.implementation;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ExponentialAmqpRetryPolicy;
import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.TimeoutException;
/**
* Helper class to help with retry policies.
*/
public class RetryUtil {
private static final ClientLogger LOGGER = new ClientLogger(RetryUtil.class);
// So this class can't be instantiated.
private RetryUtil() {
}
/**
* Given a set of {@link AmqpRetryOptions options}, creates the appropriate retry policy.
*
* @param options A set of options used to configure the retry policy.
* @return A new retry policy configured with the given {@code options}.
* @throws IllegalArgumentException If {@link AmqpRetryOptions#getMode()} is not a supported mode.
*/
public static AmqpRetryPolicy getRetryPolicy(AmqpRetryOptions options) {
switch (options.getMode()) {
case FIXED:
return new FixedAmqpRetryPolicy(options);
case EXPONENTIAL:
return new ExponentialAmqpRetryPolicy(options);
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Mode is not supported: %s", options.getMode()));
}
}
/**
* Given a {@link Flux} will apply the retry policy to it when the operation times out.
*
* @param source The publisher to apply the retry policy to.
* @return A publisher that returns the results of the {@link Flux} if any of the retry attempts are successful.
* Otherwise, propagates a {@link TimeoutException}.
*/
public static <T> Flux<T> withRetry(Flux<T> source, Duration operationTimeout, AmqpRetryPolicy retryPolicy) {
return Flux.defer(() -> source.timeout(operationTimeout))
.retryWhen(Retry.withThrowable(errors -> retry(errors, retryPolicy)));
}
/**
* Given a {@link Mono} will apply the retry policy to it when the operation times out.
*
* @param source The publisher to apply the retry policy to.
* @return A publisher that returns the results of the {@link Flux} if any of the retry attempts are successful.
* Otherwise, propagates a {@link TimeoutException}.
*/
public static <T> Mono<T> withRetry(Mono<T> source, Duration operationTimeout, AmqpRetryPolicy retryPolicy) {
return Mono.defer(() -> source.timeout(operationTimeout))
.retryWhen(Retry.withThrowable(errors -> retry(errors, retryPolicy)));
}
private static Flux<Long> retry(Flux<Throwable> source, AmqpRetryPolicy retryPolicy) {
return source.zipWith(Flux.range(1, retryPolicy.getMaxRetries() + 1),
(error, attempt) -> {
if (attempt > retryPolicy.getMaxRetries()) {
LOGGER.warning("Retry attempts are exhausted. Current: {}. Max: {}.", attempt,
retryPolicy.getMaxRetries());
throw Exceptions.propagate(error);
}
if (error instanceof TimeoutException) {
LOGGER.info("TimeoutException error occurred. Retrying operation. Attempt: {}.",
attempt, error);
return retryPolicy.calculateRetryDelay(error, attempt);
} else if (error instanceof AmqpException && (((AmqpException) error).isTransient())) {
LOGGER.info("Retryable error occurred. Retrying operation. Attempt: {}. Error condition: {}",
attempt, ((AmqpException) error).getErrorCondition(), error);
return retryPolicy.calculateRetryDelay(error, attempt);
} else {
LOGGER.warning("Error is not a TimeoutException nor is it a retryable AMQP exception.", error);
throw Exceptions.propagate(error);
}
})
.flatMap(Mono::delay);
}
}