AsyncPollResponse.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util.polling;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * An event emitted by {@link PollerFlux} while it asynchronously polls a long-running operation (LRO).
 *
 * <p>Each instance wraps the latest {@link PollResponse} held by the {@link PollingContext} it was created
 * with, and exposes the {@link LongRunningOperationStatus status} and {@link #getValue() value} from that
 * response. It also offers {@link #cancelOperation()} to cancel the long-running operation from within a
 * reactor operator chain, and {@link #getFinalResult()} to obtain the final result of the operation.</p>
 *
 * @param <T> The type of poll response value.
 * @param <U> The type of the final result of long-running operation.
 */
public final class AsyncPollResponse<T, U> {
    private final ClientLogger logger = new ClientLogger(AsyncPollResponse.class);
    private final PollingContext<T> pollingContext;
    private final BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancellationOperation;
    private final Function<PollingContext<T>, Mono<U>> fetchResultOperation;
    // Snapshot of the context's latest response, captured once at construction time.
    private final PollResponse<T> pollResponse;

    /**
     * Creates AsyncPollResponse.
     *
     * <p>The latest response currently stored in {@code pollingContext} is captured at construction and
     * backs {@link #getStatus()}, {@link #getValue()} and {@link #getRetryAfter()}.</p>
     *
     * @param pollingContext the polling context
     * @param cancellationOperation the cancellation operation if supported by the service
     * @param fetchResultOperation the operation to fetch final result of long-running operation, if supported
     *                             by the service
     * @throws NullPointerException if any of the arguments is null
     */
    AsyncPollResponse(PollingContext<T> pollingContext,
                      BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancellationOperation,
                      Function<PollingContext<T>, Mono<U>> fetchResultOperation) {
        Objects.requireNonNull(pollingContext, "'pollingContext' cannot be null.");
        Objects.requireNonNull(cancellationOperation, "'cancellationOperation' cannot be null.");
        Objects.requireNonNull(fetchResultOperation, "'fetchResultOperation' cannot be null.");
        this.pollingContext = pollingContext;
        this.cancellationOperation = cancellationOperation;
        this.fetchResultOperation = fetchResultOperation;
        this.pollResponse = pollingContext.getLatestResponse();
    }

    /**
     * Gets the status of the long-running operation as reported by the poll response captured by this event.
     *
     * @return A {@link LongRunningOperationStatus} representing the result of the poll operation.
     */
    public LongRunningOperationStatus getStatus() {
        return this.pollResponse.getStatus();
    }

    /**
     * Gets the value carried by the poll response captured by this event. This can be any custom user defined
     * object, or null if no value was returned from the service.
     *
     * @return T result of poll operation.
     */
    public T getValue() {
        return this.pollResponse.getValue();
    }

    /**
     * Gets a Mono that cancels the remote long-running operation, if cancellation is supported by the service.
     * Nothing is invoked until the returned Mono is subscribed to.
     *
     * @return a Mono, upon subscription it cancels the remote long-running operation if cancellation
     * is supported by the service.
     */
    public Mono<T> cancelOperation() {
        return Mono.defer(this::invokeCancellation);
    }

    /**
     * Gets a Mono that fetches the final result of the long-running operation, if that is supported by the
     * service. Nothing is invoked until the returned Mono is subscribed to.
     *
     * @return a Mono, upon subscription it fetches the final result of long-running operation if it
     * is supported by the service. If the long-running operation is not completed, then an empty
     * Mono will be returned.
     */
    public Mono<U> getFinalResult() {
        return Mono.defer(this::fetchResultIfComplete);
    }

    /**
     * Returns the delay the service has requested until the next polling operation is performed. A null or negative
     * value will be taken to mean that the Poller should determine on its own when the next poll operation is
     * to occur.
     *
     * @return Duration How long to wait before next retry.
     */
    Duration getRetryAfter() {
        return this.pollResponse.getRetryAfter();
    }

    /**
     * Applies the cancellation operation to the polling context and its activation response.
     * A RuntimeException thrown synchronously is handed to {@link FluxUtil#monoError} together with this
     * class's logger, and the Mono it produces is returned instead.
     */
    private Mono<T> invokeCancellation() {
        try {
            final PollResponse<T> activationResponse = this.pollingContext.getActivationResponse();
            return this.cancellationOperation.apply(this.pollingContext, activationResponse);
        } catch (RuntimeException ex) {
            return FluxUtil.monoError(logger, ex);
        }
    }

    /**
     * Applies the fetch-result operation to the polling context when the captured status is complete;
     * otherwise yields an empty Mono. A RuntimeException thrown synchronously by the fetch operation is
     * handed to {@link FluxUtil#monoError} together with this class's logger.
     */
    private Mono<U> fetchResultIfComplete() {
        // The status check stays outside the try block so only the fetch operation's failures are converted.
        if (this.pollResponse.getStatus().isComplete()) {
            try {
                return this.fetchResultOperation.apply(this.pollingContext);
            } catch (RuntimeException ex) {
                return FluxUtil.monoError(logger, ex);
            }
        }
        return Mono.empty();
    }
}