ExecuteTask.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.resourcemanager.resources.fluentcore.model.implementation;

import com.azure.resourcemanager.resources.fluentcore.model.Indexable;
import com.azure.resourcemanager.resources.fluentcore.dag.TaskGroup;
import com.azure.resourcemanager.resources.fluentcore.dag.TaskItem;
import com.azure.resourcemanager.resources.fluentcore.utils.ResourceManagerUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link TaskItem} type, when invoked it execute a work using the {@link Executor}
 * it composes.
 *
 * @param <ResultT> the type of the result that this task produces upon execution
 */
public class ExecuteTask<ResultT extends Indexable> implements TaskItem {
    /**
     * the underlying instance that can execute the task.
     */
    private Executor<ResultT> executor;
    /**
     * result of execution.
     */
    private ResultT result;

    /**
     * Creates ExecuteTask.
     *
     * @param executor executor used by this TaskItem to execute the work when invoked.
     */
    public ExecuteTask(Executor<ResultT> executor) {
        this.executor = executor;
    }

    @Override
    public ResultT result() {
        return this.result;
    }

    @Override
    public void beforeGroupInvoke() {
        executor.beforeGroupExecute();
    }

    @Override
    public boolean isHot() {
        return executor.isHot();
    }

    @Override
    public Mono<Indexable> invokeAsync(TaskGroup.InvocationContext context) {
        return this.executor.executeWorkAsync()
                .subscribeOn(ResourceManagerUtils.InternalRuntimeContext.getReactorScheduler())
                .doOnNext(resultT -> result = resultT)
                .map(resourceT -> resourceT);
    }

    @Override
    public Mono<Void> invokeAfterPostRunAsync(boolean isGroupFaulted) {
        return this.executor.afterPostRunAsync(isGroupFaulted);
    }

    /**
     * Represents a type that know how to execute a work that produces result of type {@link T}.
     * <p>
     * An instance of {@link ExecuteTask} wraps this type and invokes appropriate methods when
     * ExecuteTask methods get called during TaskGroup invocation.
     *
     * @param <T> the type of the produced value.
     */
    public interface Executor<T extends Indexable> {
        /**
         * The method that gets called before invoking all the tasks in the {@link TaskGroup}
         * that the parent {@link ExecuteTask} belongs to.
         */
        void beforeGroupExecute();

        /**
         * @return true if the observable returned by {@link this#executeWorkAsync()} is hot, false if it is
         * cold observable.
         */
        boolean isHot();

        /**
         * Execute the work asynchronously.
         *
         * @return the {@link Mono} reference
         */
        Mono<T> executeWorkAsync();

        /**
         * Perform any action followed by the processing of work scheduled to be invoked
         * (i.e. "post run") after {@link this#executeWorkAsync()}.
         *
         * @param isGroupFaulted true if one or more tasks in the group this work belongs
         *                       to are in faulted state.
         * @return a {@link Flux} represents the asynchronous action
         */
        Mono<Void> afterPostRunAsync(boolean isGroupFaulted);
    }
}