PollerFlux.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.util.polling;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* A Flux that simplifies the task of executing long running operations against an Azure service.
* A subscription to {@link PollerFlux} initiates a long running operation and polls the status
* until it completes.
*
* <p><strong>Code samples</strong></p>
*
* <p><strong>Instantiating and subscribing to PollerFlux</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.instantiationAndSubscribe}
*
* <p><strong>Asynchronously wait for polling to complete and then retrieve the final result</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.getResult}
*
* <p><strong>Block for polling to complete and then retrieve the final result</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.blockAndGetResult}
*
* <p><strong>Asynchronously poll until poller receives matching status</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.pollUntil}
*
* <p><strong>Asynchronously cancel the long running operation</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.cancelOperation}
*
* @param <T> The type of poll response value.
* @param <U> The type of the final result of long running operation.
*/
public final class PollerFlux<T, U> extends Flux<AsyncPollResponse<T, U>> {
private final ClientLogger logger = new ClientLogger(PollerFlux.class);
private final PollingContext<T> rootContext = new PollingContext<>();
private final Duration defaultPollInterval;
private final Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation;
private final BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation;
private final Function<PollingContext<T>, Mono<U>> fetchResultOperation;
private final Mono<Boolean> oneTimeActivationMono;
private final Function<PollingContext<T>, PollResponse<T>> syncActivationOperation;
/**
* Creates PollerFlux.
*
* @param pollInterval the polling interval
* @param activationOperation the activation operation to activate (start) the long running operation.
* This operation will be invoked at most once across all subscriptions. This parameter is required.
* If there is no specific activation work to be done then invocation should return Mono.empty(),
* this operation will be called with a new {@link PollingContext}.
* @param pollOperation the operation to poll the current state of long running operation. This parameter
* is required and the operation will be called with current {@link PollingContext}.
* @param cancelOperation a {@link Function} that represents the operation to cancel the long running operation
* if service supports cancellation. This parameter is required. If service does not support cancellation
* then the implementer should return Mono.error with an error message indicating absence of cancellation
* support. The operation will be called with current {@link PollingContext}.
* @param fetchResultOperation a {@link Function} that represents the operation to retrieve final result of
* the long running operation if service support it. This parameter is required and operation will be called
* current {@link PollingContext}. If service does not have an api to fetch final result and if final result
* is same as final poll response value then implementer can choose to simply return value from provided
* final poll response.
*/
public PollerFlux(Duration pollInterval,
Function<PollingContext<T>, Mono<T>> activationOperation,
Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation,
BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation,
Function<PollingContext<T>, Mono<U>> fetchResultOperation) {
Objects.requireNonNull(pollInterval, "'pollInterval' cannot be null.");
if (pollInterval.compareTo(Duration.ZERO) <= 0) {
throw logger.logExceptionAsWarning(new IllegalArgumentException(
"Negative or zero value for 'defaultPollInterval' is not allowed."));
}
this.defaultPollInterval = pollInterval;
Objects.requireNonNull(activationOperation, "'activationOperation' cannot be null.");
this.pollOperation = Objects.requireNonNull(pollOperation, "'pollOperation' cannot be null.");
this.cancelOperation = Objects.requireNonNull(cancelOperation, "'cancelOperation' cannot be null.");
this.fetchResultOperation = Objects.requireNonNull(fetchResultOperation,
"'fetchResultOperation' cannot be null.");
this.oneTimeActivationMono = new OneTimeActivation<>(this.rootContext,
activationOperation,
// mapper
activationResult -> new PollResponse<>(LongRunningOperationStatus.NOT_STARTED, activationResult)).getMono();
this.syncActivationOperation =
cxt -> new PollResponse<>(LongRunningOperationStatus.NOT_STARTED, activationOperation.apply(cxt).block());
}
/**
* Creates PollerFlux.
*
* This create method differs from the PollerFlux constructor in that the constructor uses an
* activationOperation which returns a Mono that emits result, the create method uses an activationOperation
* which returns a Mono that emits {@link PollResponse}. The {@link PollResponse} holds the result.
* If the {@link PollResponse} from the activationOperation indicate that long running operation is
* completed then the pollOperation will not be called.
*
* @param pollInterval the polling interval
* @param activationOperation the activation operation to activate (start) the long running operation.
* This operation will be invoked at most once across all subscriptions. This parameter is required.
* If there is no specific activation work to be done then invocation should return Mono.empty(),
* this operation will be called with a new {@link PollingContext}.
* @param pollOperation the operation to poll the current state of long running operation. This parameter
* is required and the operation will be called with current {@link PollingContext}.
* @param cancelOperation a {@link Function} that represents the operation to cancel the long running operation
* if service supports cancellation. This parameter is required. If service does not support cancellation
* then the implementer should return Mono.error with an error message indicating absence of cancellation
* support. The operation will be called with current {@link PollingContext}.
* @param fetchResultOperation a {@link Function} that represents the operation to retrieve final result of
* the long running operation if service support it. This parameter is required and operation will be called
* current {@link PollingContext}. If service does not have an api to fetch final result and if final result
* is same as final poll response value then implementer can choose to simply return value from provided
* final poll response.
*
* @param <T> The type of poll response value.
* @param <U> The type of the final result of long running operation.
* @return PollerFlux
*/
public static <T, U> PollerFlux<T, U>
create(Duration pollInterval,
Function<PollingContext<T>, Mono<PollResponse<T>>> activationOperation,
Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation,
BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation,
Function<PollingContext<T>, Mono<U>> fetchResultOperation) {
return new PollerFlux<>(pollInterval,
activationOperation,
pollOperation,
cancelOperation,
fetchResultOperation,
true);
}
private PollerFlux(Duration pollInterval,
Function<PollingContext<T>, Mono<PollResponse<T>>> activationOperation,
Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation,
BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation,
Function<PollingContext<T>, Mono<U>> fetchResultOperation,
boolean ignored) {
Objects.requireNonNull(pollInterval, "'pollInterval' cannot be null.");
if (pollInterval.compareTo(Duration.ZERO) <= 0) {
throw logger.logExceptionAsWarning(new IllegalArgumentException(
"Negative or zero value for 'pollInterval' is not allowed."));
}
this.defaultPollInterval = pollInterval;
Objects.requireNonNull(activationOperation, "'activationOperation' cannot be null.");
this.pollOperation = Objects.requireNonNull(pollOperation, "'pollOperation' cannot be null.");
this.cancelOperation = Objects.requireNonNull(cancelOperation, "'cancelOperation' cannot be null.");
this.fetchResultOperation = Objects.requireNonNull(fetchResultOperation,
"'fetchResultOperation' cannot be null.");
this.oneTimeActivationMono = new OneTimeActivation<>(this.rootContext,
activationOperation,
// mapper
Function.identity()).getMono();
this.syncActivationOperation = cxt -> activationOperation.apply(cxt).block();
}
/**
* Creates a PollerFlux instance that returns an error on subscription.
*
* @param ex The exception to be returned on subscription of this {@link PollerFlux}.
* @param <T> The type of poll response value.
* @param <U> The type of the final result of long running operation.
* @return A poller flux instance that returns an error without emitting any data.
*
* @see Mono#error(Throwable)
* @see Flux#error(Throwable)
*/
public static <T, U> PollerFlux<T, U> error(Exception ex) {
return new PollerFlux<>(Duration.ofMillis(1L), context -> Mono.error(ex), context -> Mono.error(ex),
(context, response) -> Mono.error(ex), context -> Mono.error(ex));
}
@Override
public void subscribe(CoreSubscriber<? super AsyncPollResponse<T, U>> actual) {
this.oneTimeActivationMono
.flatMapMany(ignored -> {
final PollResponse<T> activationResponse = this.rootContext.getActivationResponse();
if (activationResponse.getStatus().isComplete()) {
return Flux.just(new AsyncPollResponse<>(this.rootContext,
this.cancelOperation,
this.fetchResultOperation));
} else {
return this.pollingLoop();
}
})
.subscribe(actual);
}
/**
* @return a synchronous blocking poller.
*/
public SyncPoller<T, U> getSyncPoller() {
return new DefaultSyncPoller<>(this.defaultPollInterval,
this.syncActivationOperation,
this.pollOperation,
this.cancelOperation,
this.fetchResultOperation);
}
/**
* Do the polling until it reaches a terminal state.
*
* @return a Flux that emits polling event.
*/
private Flux<AsyncPollResponse<T, U>> pollingLoop() {
return Flux.using(
// Create a Polling Context per subscription
() -> this.rootContext.copy(),
// Do polling
// set|read to|from context as needed, reactor guarantee thread-safety of cxt object.
cxt -> Mono.defer(() -> {
final Mono<PollResponse<T>> pollOnceMono = this.pollOperation.apply(cxt);
// Execute (subscribe to) the pollOnceMono after the default poll-interval
// or duration specified in the last retry-after response header elapses.
return pollOnceMono.delaySubscription(getDelay(cxt.getLatestResponse()));
})
.switchIfEmpty(Mono.error(new IllegalStateException("PollOperation returned Mono.empty().")))
.repeat()
.takeUntil(currentPollResponse -> currentPollResponse.getStatus().isComplete())
.concatMap(currentPollResponse -> {
cxt.setLatestResponse(currentPollResponse);
return Mono.just(new AsyncPollResponse<>(cxt,
this.cancelOperation,
this.fetchResultOperation));
}),
//
// No cleaning needed, Polling Context will be GC-ed
cxt -> { });
}
/**
* Get the duration to wait before making next poll attempt.
*
* @param pollResponse the poll response to retrieve delay duration from
* @return the delay
*/
private Duration getDelay(PollResponse<T> pollResponse) {
Duration retryAfter = pollResponse.getRetryAfter();
if (retryAfter == null) {
return this.defaultPollInterval;
} else {
return retryAfter.compareTo(Duration.ZERO) > 0
? retryAfter
: this.defaultPollInterval;
}
}
/**
* A utility to get One-Time-Executable-Mono that execute an activation function at most once.
*
* When subscribed to such a Mono it internally subscribes to a Mono that perform an activation
* function. The One-Time-Executable-Mono caches the result of activation function as a PollResponse
* in {@code rootContext}, this cached response will be used by any future subscriptions.
*
* Note: The standard cache() operator can't be used to achieve one time execution, because it caches
* error terminal signal and forward it to any future subscriptions. If there is an error while executing
* activation function then error should not be cached but it should be forward it to subscription that
* initiated the failed activation. For any future subscriptions such past error should not be delivered
* instead activation function should again invoked. Once a subscription result in successful execution
* of activation function then it will be cached in {@code rootContext} and will be used by any future
* subscriptions.
*
* The One-Time-Executable-Mono handles concurrent calls to activation. Only one of them will be able
* to execute the activation function and other subscriptions will keep resubscribing until it sees
* a activation happened or get a chance to call activation as the one previously entered the critical
* section got an error on activation.
*
* @param <V> The type of value in poll response.
* @param <R> The type of the activation operation result.
*/
private class OneTimeActivation<V, R> {
private final PollingContext<V> rootContext;
private final Function<PollingContext<V>, Mono<R>> activationFunction;
private final Function<R, PollResponse<V>> activationPollResponseMapper;
// indicates whether activation executed and completed 'successfully'.
private volatile boolean activated = false;
// to guard one-time-activation area
private final AtomicBoolean guardActivation = new AtomicBoolean(false);
/**
* Creates OneTimeActivation.
*
* @param rootContext the root context to store PollResponse holding activation result
* @param activationFunction function upon call return a Mono representing activation work
* @param activationPollResponseMapper mapper to map result of activation work execution to PollResponse
*/
OneTimeActivation(PollingContext<V> rootContext,
Function<PollingContext<V>, Mono<R>> activationFunction,
Function<R, PollResponse<V>> activationPollResponseMapper) {
this.rootContext = rootContext;
this.activationFunction = activationFunction;
this.activationPollResponseMapper = activationPollResponseMapper;
}
/**
* Get the mono containing activation work which on subscription executed only once.
*
* @return the one time executable mono
*/
Mono<Boolean> getMono() {
return Mono.defer(() -> {
if (this.activated) {
// already activated let subscriber get activation result from root context.
return Mono.just(true);
}
if (this.guardActivation.compareAndSet(false, true)) {
// one-time-activation-area
//
final Mono<R> activationMono;
try {
activationMono = this.activationFunction.apply(this.rootContext);
} catch (RuntimeException e) {
// onError: sync apply() failed
// 1. remove guard so that future subscriber can retry activation.
// 2. forward error to current subscriber.
this.guardActivation.set(false);
return FluxUtil.monoError(logger, e);
}
return activationMono
.map(this.activationPollResponseMapper)
.switchIfEmpty(Mono.defer(() ->
Mono.just(new PollResponse<>(LongRunningOperationStatus.NOT_STARTED, null))))
.map(activationResponse -> {
this.rootContext.setOnetimeActivationResponse(activationResponse);
this.activated = true;
return true;
})
// onError: async activation failed
// 1. remove guard so that future subscription can retry activation.
// 2. forward error to current subscriber.
.doOnError(throwable -> this.guardActivation.set(false));
} else {
// Couldn't enter one-time-activation-area (there was already someone in the area
// trying to activate). Return empty() to outer "repeatWhenEmpty" that will result
// in another attempt to enter one-time-activation-area.
return Mono.empty();
}
})
// Keep resubscribing as long as Mono.defer [holding activation work] emits empty().
.repeatWhenEmpty((Flux<Long> longFlux) -> longFlux.concatMap(ignored -> Flux.just(true)));
}
}
}