TaskGroup.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.resourcemanager.resources.fluentcore.dag;

import com.azure.core.util.logging.ClientLogger;
import com.azure.resourcemanager.resources.fluentcore.model.Indexable;
import com.azure.resourcemanager.resources.fluentcore.utils.ResourceManagerUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Type representing a group of task entries with dependencies between them. Initially a task
 * group will have only one task entry known as root task entry, then more entries can be
 * added by taking dependency on other task groups or adding "post-run" task group dependents.
 * <p>
 * The method {@link TaskGroup#invokeAsync(InvocationContext)} ()} kick-off invocation of tasks
 * in the group, task are invoked in topological sorted order.
 * <p>
 * {@link TaskGroup#addDependencyTaskGroup(TaskGroup)}: A task group "A" can take dependency on
 * another task group "B" through this method e.g. `A.addDependencyTaskGroup(B)` indicates that
 * completion of tasks in the dependency task group "B" is required before the invocation of root
 * task in group "A". A.invokeAsync(cxt) will ensure this order.
 * <p>
 * {@link TaskGroup#addPostRunDependentTaskGroup(TaskGroup)}: there are scenarios where a subset
 * of dependent task groups say "H", "I" may required to run after the invocation of a task group
 * "K" when K.invokeAsync(cxt) is called. Such special dependents can be added via
 * K.addPostRunDependentTaskGroup(H) and K.addPostRunDependentTaskGroup(I).
 * <p>
 * The result produced by the tasks in the group are of type {@link Indexable}.
 */
public class TaskGroup
        extends DAGraph<TaskItem, TaskGroupEntry<TaskItem>>
        implements Indexable {
    /**
     * The root task in this task group.
     */
    private final TaskGroupEntry<TaskItem> rootTaskEntry;
    /**
     * Task group termination strategy to be used once any task in the group error-ed.
     */
    private TaskGroupTerminateOnErrorStrategy taskGroupTerminateOnErrorStrategy;
    /**
     * Flag indicating whether this group is marked as cancelled or not. This flag will be used only
     * when group's terminate on error strategy is set as
     * {@link TaskGroupTerminateOnErrorStrategy#TERMINATE_ON_IN_PROGRESS_TASKS_COMPLETION}.
     * Effect of setting this flag can be think as broadcasting a cancellation signal to tasks those
     * are yet to invoke.
     */
    private final AtomicBoolean isGroupCancelled;
    /**
     * The shared exception object used to indicate that a task is not invoked since the group
     * is marked as cancelled i.e. {@link this#isGroupCancelled} is set.
     */
    private final TaskCancelledException taskCancelledException = new TaskCancelledException();
    /**
     * The helper to operate on proxy TaskGroup of this TaskGroup for supporting dependents marked
     * for post run.
     */
    protected ProxyTaskGroupWrapper proxyTaskGroupWrapper;

    private final ClientLogger logger = new ClientLogger(this.getClass());

    /**
     * Creates TaskGroup.
     *
     * @param rootTaskEntry the entry holding root task
     */
    private TaskGroup(TaskGroupEntry<TaskItem> rootTaskEntry) {
        super(rootTaskEntry);
        this.isGroupCancelled = new AtomicBoolean(false);
        this.rootTaskEntry = rootTaskEntry;
        this.proxyTaskGroupWrapper = new ProxyTaskGroupWrapper(this);
    }

    /**
     * Creates TaskGroup.
     *
     * @param rootTaskItemId the id of the root task in the group
     * @param rootTaskItem the root task
     */
    public TaskGroup(String rootTaskItemId,
                     TaskItem rootTaskItem) {
        this(new TaskGroupEntry<TaskItem>(rootTaskItemId, rootTaskItem));
    }

    /**
     * Creates TaskGroup.
     *
     * @param rootTaskItem the root task
     */
    public TaskGroup(IndexableTaskItem rootTaskItem) {
        this(new TaskGroupEntry<TaskItem>(rootTaskItem.key(), rootTaskItem));
    }

    /**
     * @return the key of this task group, which is same as key of the root entry in the group
     */
    @Override
    public String key() {
        return this.rootTaskEntry.key();
    }

    /**
     * Retrieve the result produced by a task with the given id in the group.
     * <p>
     * This method can be used to retrieve the result of invocation of both dependency
     * and "post-run" dependent tasks. If task with the given id does not exists then
     * IllegalArgumentException exception will be thrown.
     *
     * @param taskId the task item id
     * @return the task result, null will be returned if task has not yet been invoked
     */
    public Indexable taskResult(String taskId) {
        TaskGroupEntry<TaskItem> taskGroupEntry = super.getNode(taskId);
        if (taskGroupEntry != null) {
            return taskGroupEntry.taskResult();
        }
        if (!this.proxyTaskGroupWrapper.isActive()) {
            throw logger.logExceptionAsError(
                new IllegalArgumentException("A dependency task with id '" + taskId + "' is not found"));
        }
        taskGroupEntry = this.proxyTaskGroupWrapper.proxyTaskGroup.getNode(taskId);
        if (taskGroupEntry != null) {
            return taskGroupEntry.taskResult();
        }
        throw logger.logExceptionAsError(new IllegalArgumentException(
            "A dependency task or 'post-run' dependent task with with id '" + taskId + "' not found"));
    }

    /**
     * Checks this TaskGroup depends on the given TaskGroup.
     *
     * @param taskGroup the TaskGroup to check
     * @return true if TaskGroup is depends on the given TaskGroup
     */
    public boolean dependsOn(TaskGroup taskGroup) {
        return this.nodeTable.containsKey(taskGroup.root().key());
    }

    /**
     * @return the root task entry in the group.
     */
    protected TaskGroupEntry<TaskItem> root() {
        return this.rootTaskEntry;
    }

    /**
     * Mark root of this task task group depends on the given TaskItem.
     * This ensure this task group's root get picked for execution only after the completion
     * of invocation of provided TaskItem.
     *
     * @param dependencyTaskItem the task item that this task group depends on
     * @return the key of the dependency
     */
    public String addDependency(FunctionalTaskItem dependencyTaskItem) {
        IndexableTaskItem dependency = IndexableTaskItem.create(dependencyTaskItem);
        this.addDependency(dependency);
        return dependency.key();
    }

    /**
     * Mark root of this task task group depends on the given item's taskGroup.
     * This ensure this task group's root get picked for execution only after the completion
     * of invocation of provided TaskItem.
     *
     * @param hasTaskGroup an item with taskGroup that this task group depends on
     */
    public void addDependency(TaskGroup.HasTaskGroup hasTaskGroup) {
        this.addDependencyTaskGroup(hasTaskGroup.taskGroup());
    }

    /**
     * Mark root of this task task group depends on the given task group's root.
     * This ensure this task group's root get picked for execution only after the completion
     * of all tasks in the given group.
     *
     * @param dependencyTaskGroup the task group that this task group depends on
     */
    public void addDependencyTaskGroup(TaskGroup dependencyTaskGroup) {
        if (dependencyTaskGroup.proxyTaskGroupWrapper.isActive()) {
            dependencyTaskGroup.proxyTaskGroupWrapper.addDependentTaskGroup(this);
        } else {
            DAGraph<TaskItem, TaskGroupEntry<TaskItem>> dependencyGraph = dependencyTaskGroup;
            super.addDependencyGraph(dependencyGraph);
        }
    }

    /**
     * Mark the given TaskItem depends on this taskGroup.
     *
     * @param dependentTaskItem the task item that depends on this task group
     * @return key to be used as parameter to taskResult(string) method to retrieve result of
     * invocation of given task item.
     */
    public String addPostRunDependent(FunctionalTaskItem dependentTaskItem) {
        IndexableTaskItem taskItem = IndexableTaskItem.create(dependentTaskItem);
        this.addPostRunDependent(taskItem);
        return taskItem.key();
    }

    /**
     * Mark the given TaskItem depends on this taskGroup.
     *
     * @param dependentTaskItem the task item that depends on this task group
     * @param internalContext the internal runtime context
     * @return key to be used as parameter to taskResult(string) method to retrieve result of
     * invocation of given task item.
     */
    public String addPostRunDependent(
        FunctionalTaskItem dependentTaskItem, ResourceManagerUtils.InternalRuntimeContext internalContext) {
        IndexableTaskItem taskItem = IndexableTaskItem.create(dependentTaskItem, internalContext);
        this.addPostRunDependent(taskItem);
        return taskItem.key();
    }

    /**
     * Mark the given item with taskGroup depends on this taskGroup.
     *
     * @param hasTaskGroup an item with as task group that depends on this task group
     */
    public void addPostRunDependent(TaskGroup.HasTaskGroup hasTaskGroup) {
        this.addPostRunDependentTaskGroup(hasTaskGroup.taskGroup());
    }

    /**
     * Mark root of the given task group depends on this task group's root.
     * This ensure given task group's root get picked for invocation only after the completion
     * of all tasks in this group. Calling invokeAsync(cxt) will run the tasks in the given
     * dependent task group as well.
     *
     * @param dependentTaskGroup the task group depends on this task group
     */
    public void addPostRunDependentTaskGroup(TaskGroup dependentTaskGroup) {
        this.proxyTaskGroupWrapper.addPostRunTaskGroupForActualTaskGroup(dependentTaskGroup);
    }

    /**
     * Invokes tasks in the group.
     * It is not guaranteed to return indexable in topological order.
     *
     * @param context group level shared context that need be passed to invokeAsync(cxt)
     *                method of each task item in the group when it is selected for invocation.
     * @return an observable that emits the result of tasks in the order they finishes.
     */
    public Flux<Indexable> invokeAsync(final InvocationContext context) {
        return Flux.defer(() -> {
            if (proxyTaskGroupWrapper.isActive()) {
                return proxyTaskGroupWrapper.taskGroup().invokeInternAsync(context, true, null);
            } else {
                Set<String> processedKeys = runBeforeGroupInvoke(null);
                if (proxyTaskGroupWrapper.isActive()) {
                    // If proxy got activated after 'runBeforeGroupInvoke()' stage due to the addition of direct
                    // 'postRunDependent's then delegate group invocation to proxy group.
                    //
                    return proxyTaskGroupWrapper.taskGroup().invokeInternAsync(context, true, processedKeys);
                } else {
                    return invokeInternAsync(context, false, null);
                }
            }
        });
    }

    /**
     * Invokes tasks in the group.
     *
     * @return the root result of task group.
     */
    public Mono<Indexable> invokeAsync() {
        return invokeAsync(this.newInvocationContext())
            .then(Mono.defer(() -> {
                if (proxyTaskGroupWrapper.isActive()) {
                    return Mono.just(proxyTaskGroupWrapper.taskGroup().root().taskResult());
                }
                return Mono.just(root().taskResult());
            }));
    }

    /**
     * Invokes dependency tasks in the group, but not.
     *
     * @param context group level shared context that need be passed to invokeAsync(cxt)
     *                method of each task item in the group when it is selected for invocation.
     * @return an observable that emits the result of tasks in the order they finishes.
     */
    public Flux<Indexable> invokeDependencyAsync(final InvocationContext context) {
        context.put(TaskGroup.InvocationContext.KEY_SKIP_TASKS, Collections.singleton(this.key()));
        return Flux.defer(() -> {
            if (proxyTaskGroupWrapper.isActive()) {
                return Flux.error(new IllegalStateException("postRunDependent is not supported"));
            } else {
                Set<String> processedKeys = runBeforeGroupInvoke(null);
                if (proxyTaskGroupWrapper.isActive()) {
                    return Flux.error(new IllegalStateException("postRunDependent is not supported"));
                } else {
                    return invokeInternAsync(context, false, null);
                }
            }
        });
    }

    /**
     * Invokes tasks in the group.
     *
     * @param context group level shared context that need be passed to invokeAsync(cxt)
     *                                   method of each task item in the group when it is selected for invocation.
     * @param shouldRunBeforeGroupInvoke indicate whether to run the 'beforeGroupInvoke' method
     *                                   of each tasks before invoking them
     * @param skipBeforeGroupInvoke the tasks keys for which 'beforeGroupInvoke' should not be called
     *                                   before invoking them
     * @return an observable that emits the result of tasks in the order they finishes.
     */
    private Flux<Indexable> invokeInternAsync(final InvocationContext context,
                                              final boolean shouldRunBeforeGroupInvoke,
                                              final Set<String> skipBeforeGroupInvoke) {
        if (!isPreparer()) {
            return Flux.error(new IllegalStateException(
                "invokeInternAsync(cxt) can be called only from root TaskGroup"));
        }
        this.taskGroupTerminateOnErrorStrategy = context.terminateOnErrorStrategy();
        if (shouldRunBeforeGroupInvoke) {
            // Prepare tasks and queue the ready tasks (terminal tasks with no dependencies)
            //
            this.runBeforeGroupInvoke(skipBeforeGroupInvoke);
        }
        // Runs the ready tasks concurrently
        //
        return this.invokeReadyTasksAsync(context);
    }

    /**
     * Run 'beforeGroupInvoke' method of the tasks in this group. The tasks can use beforeGroupInvoke()
     * method to add additional dependencies or dependents.
     *
     * @param skip the keys of the tasks that are previously processed hence they must be skipped
     * @return the keys of all the tasks those are processed (including previously processed items in skip param)
     */
    private Set<String> runBeforeGroupInvoke(final Set<String> skip) {
        HashSet<String> processedEntryKeys = new HashSet<>();
        if (skip != null) {
            processedEntryKeys.addAll(skip);
        }
        List<TaskGroupEntry<TaskItem>> entries = this.entriesSnapshot();
        boolean hasMoreToProcess;
        // Invokes 'beforeGroupInvoke' on a subset of non-processed tasks in the group.
        // Initially processing is pending on all task items.
        do {
            hasMoreToProcess = false;
            for (TaskGroupEntry<TaskItem> entry : entries) {
                if (!processedEntryKeys.contains(entry.key())) {
                    entry.data().beforeGroupInvoke();
                    processedEntryKeys.add(entry.key());
                }
            }
            int prevSize = entries.size();
            entries = this.entriesSnapshot();
            if (entries.size() > prevSize) {
                // If new task dependencies/dependents added in 'beforeGroupInvoke' then
                // set the flag which indicates another pass is required to 'prepare' new
                // task items
                hasMoreToProcess = true;
            }
        } while (hasMoreToProcess);  // Run another pass if new dependencies/dependents were added in this pass
        super.prepareForEnumeration();
        return processedEntryKeys;
    }

    /**
     * @return list with current task entries in this task group
     */
    private List<TaskGroupEntry<TaskItem>> entriesSnapshot() {
        List<TaskGroupEntry<TaskItem>> entries = new ArrayList<>();
        super.prepareForEnumeration();
        for (TaskGroupEntry<TaskItem> current = super.getNext(); current != null; current = super.getNext()) {
            entries.add(current);
            super.reportCompletion(current);
        }
        return entries;
    }

    /**
     * Invokes the ready tasks.
     *
     * @param context group level shared context that need be passed to
     *                {@link TaskGroupEntry#invokeTaskAsync(boolean, InvocationContext)}
     *                method of each entry in the group when it is selected for execution
     * @return a {@link Flux} that emits the result of tasks in the order they finishes.
     */
    // Due to it takes approximate 3ms in flux for returning, it cannot be guaranteed to return in topological order.
    // One simply fix for guaranteeing the last element could be https://github.com/Azure/azure-sdk-for-java/pull/15074
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Flux<Indexable> invokeReadyTasksAsync(final InvocationContext context) {
        TaskGroupEntry<TaskItem> readyTaskEntry = super.getNext();
        final List<Flux<Indexable>> observables = new ArrayList<>();
        // Enumerate the ready tasks (those with dependencies resolved) and kickoff them concurrently
        //
        while (readyTaskEntry != null) {
            final TaskGroupEntry<TaskItem> currentEntry = readyTaskEntry;
            final TaskItem currentTaskItem = currentEntry.data();
            if (currentTaskItem instanceof ProxyTaskItem) {
                observables.add(invokeAfterPostRunAsync(currentEntry, context));
            } else {
                observables.add(invokeTaskAsync(currentEntry, context));
            }
            readyTaskEntry = super.getNext();
        }
        return Flux.mergeDelayError(32, observables.toArray(new Flux[0]));
    }

    /**
     * Invokes the task stored in the given entry.
     * <p>
     * if the task cannot be invoked because the group marked as cancelled then an observable
     * that emit {@link TaskCancelledException} will be returned.
     *
     * @param entry the entry holding task
     * @param context a group level shared context that is passed to {@link TaskItem#invokeAsync(InvocationContext)}
     *                method of the task item this entry wraps.
     * @return an observable that emits result of task in the given entry and result of subset of tasks which gets
     * scheduled after this task.
     */
    private Flux<Indexable> invokeTaskAsync(final TaskGroupEntry<TaskItem> entry, final InvocationContext context) {
        return Flux.defer(() -> {
            if (isGroupCancelled.get()) {
                // One or more tasks are in faulted state, though this task MAYBE invoked if it does not
                // have faulted tasks as transitive dependencies, we won't do it since group is cancelled
                // due to termination strategy TERMINATE_ON_IN_PROGRESS_TASKS_COMPLETION.
                //
                return processFaultedTaskAsync(entry, taskCancelledException, context);
            } else {
                // Any cached result will be ignored for root resource
                //
                boolean ignoreCachedResult = isRootEntry(entry)
                    || (entry.proxy() != null && isRootEntry(entry.proxy()));

                Mono<Indexable> taskObservable;
                Object skipTasks = context.get(InvocationContext.KEY_SKIP_TASKS);
                if (skipTasks instanceof Set && ((Set) skipTasks).contains(entry.key())) {
                    taskObservable = Mono.just(new VoidIndexable(entry.key()));
                } else {
                    taskObservable = entry.invokeTaskAsync(ignoreCachedResult, context);
                }
                return taskObservable.flatMapMany((indexable) -> Flux.just(indexable),
                    (throwable) -> processFaultedTaskAsync(entry, throwable, context),
                    () -> processCompletedTaskAsync(entry, context));
            }
        });
    }

    /**
     * Invokes the {@link TaskItem#invokeAfterPostRunAsync(boolean)} method of an actual TaskItem
     * if the given entry holds a ProxyTaskItem.
     *
     * @param entry the entry holding a ProxyTaskItem
     * @param context a group level shared context
     * @return An Observable that represents asynchronous work started by
     * {@link TaskItem#invokeAfterPostRunAsync(boolean)} method of actual TaskItem and result of subset
     * of tasks which gets scheduled after proxy task. If group was not in faulted state and
     * {@link TaskItem#invokeAfterPostRunAsync(boolean)} emits no error then stream also includes
     * result produced by actual TaskItem.
     */
    private Flux<Indexable> invokeAfterPostRunAsync(final TaskGroupEntry<TaskItem> entry,
                                                    final InvocationContext context) {
        return Flux.defer(() -> {
            final ProxyTaskItem proxyTaskItem = (ProxyTaskItem) entry.data();
            if (proxyTaskItem == null) {
                return Flux.empty();
            }
            final boolean isFaulted = entry.hasFaultedDescentDependencyTasks() || isGroupCancelled.get();

            return proxyTaskItem.invokeAfterPostRunAsync(isFaulted)
                    .flatMapMany(indexable -> Flux.error(
                        new IllegalStateException("This onNext should never be called")),
                        (error) -> processFaultedTaskAsync(entry, error, context),
                        () -> {
                            if (isFaulted) {
                                if (entry.hasFaultedDescentDependencyTasks()) {
                                    return processFaultedTaskAsync(entry,
                                        new ErroredDependencyTaskException(), context);
                                } else {
                                    return processFaultedTaskAsync(entry, taskCancelledException, context);
                                }
                            } else {
                                return Flux.concat(Flux.just(proxyTaskItem.result()),
                                        processCompletedTaskAsync(entry, context));
                            }
                        });

        });
    }

    /**
     * Handles successful completion of a task.
     * <p>
     * If the task is not root (terminal) task then this kickoff execution of next set of ready tasks
     *
     * @param completedEntry the entry holding completed task
     * @param context the context object shared across all the task entries in this group during execution
     * @return an observable represents asynchronous operation in the next stage
     */
    private Flux<Indexable> processCompletedTaskAsync(final TaskGroupEntry<TaskItem> completedEntry,
                                                      final InvocationContext context) {
        reportCompletion(completedEntry);
        if (isRootEntry(completedEntry)) {
            return Flux.empty();
        } else {
            return invokeReadyTasksAsync(context);
        }
    }

    /**
     * Handles a faulted task.
     *
     * @param faultedEntry the entry holding faulted task
     * @param throwable the reason for fault
     * @param context the context object shared across all the task entries in this group during execution
     * @return an observable represents asynchronous operation in the next stage
     */
    private Flux<Indexable> processFaultedTaskAsync(final TaskGroupEntry<TaskItem> faultedEntry,
                                                    final Throwable throwable,
                                                    final InvocationContext context) {
        markGroupAsCancelledIfTerminationStrategyIsIPTC();
        reportError(faultedEntry, throwable);
        if (isRootEntry(faultedEntry)) {
            if (shouldPropagateException(throwable)) {
                return toErrorObservable(throwable);
            }
            return Flux.empty();
        } else if (shouldPropagateException(throwable)) {
            return Flux.concatDelayError(invokeReadyTasksAsync(context), toErrorObservable(throwable));
        } else {
            return invokeReadyTasksAsync(context);
        }
    }

    /**
     * Mark this TaskGroup as cancelled if the termination strategy associated with the group
     * is {@link TaskGroupTerminateOnErrorStrategy#TERMINATE_ON_IN_PROGRESS_TASKS_COMPLETION}.
     */
    private void markGroupAsCancelledIfTerminationStrategyIsIPTC() {
        this.isGroupCancelled.set(this.taskGroupTerminateOnErrorStrategy
            == TaskGroupTerminateOnErrorStrategy.TERMINATE_ON_IN_PROGRESS_TASKS_COMPLETION);
    }

    /**
     * Check that given entry is the root entry in this group.
     *
     * @param taskGroupEntry the entry
     * @return true if the entry is root entry in the group, false otherwise.
     */
    private boolean isRootEntry(TaskGroupEntry<TaskItem> taskGroupEntry) {
        return isRootNode(taskGroupEntry);
    }

    /**
     * Checks the given throwable needs to be propagated to final stream returned by
     * {@link this#invokeAsync(InvocationContext)} ()} method.
     *
     * @param throwable the exception to check
     * @return true if the throwable needs to be included in the {@link RuntimeException}
     * emitted by the final stream.
     */
    private static boolean shouldPropagateException(Throwable throwable) {
        return (!(throwable instanceof ErroredDependencyTaskException)
                && !(throwable instanceof TaskCancelledException));
    }

    /**
     * Gets the given throwable as observable.
     *
     * @param throwable the throwable to wrap
     * @return observable with throwable wrapped
     */
    private Flux<Indexable> toErrorObservable(Throwable throwable) {
        return Flux.error(throwable);
    }

    /**
     * @return a new clean context instance.
     */
    public InvocationContext newInvocationContext() {
        return new InvocationContext(this);
    }

    /**
     * An interface representing a type composes a TaskGroup.
     */
    public interface HasTaskGroup {
        /**
         * @return Gets the task group.
         */
        TaskGroup taskGroup();
    }

    /**
     * A mutable type that can be used to pass data around task items during the invocation
     * of the TaskGroup.
     */
    public static final class InvocationContext {
        public static final String KEY_SKIP_TASKS = "SKIP_TASKS";

        private final Map<String, Object> properties;
        private final TaskGroup taskGroup;
        private TaskGroupTerminateOnErrorStrategy terminateOnErrorStrategy;
        private final ClientLogger logger = new ClientLogger(this.getClass());

        /**
         * Creates InvocationContext instance.
         *
         * @param taskGroup the task group that uses this context instance.
         */
        private InvocationContext(final TaskGroup taskGroup) {
            this.properties = new ConcurrentHashMap<>();
            this.taskGroup = taskGroup;
        }

        /**
         * @return the TaskGroup this invocation context associated with.
         */
        public TaskGroup taskGroup() {
            return this.taskGroup;
        }

        /**
         * Sets the group termination strategy to use on error.
         *
         * @param strategy the strategy
         * @return the context
         */
        public InvocationContext withTerminateOnErrorStrategy(TaskGroupTerminateOnErrorStrategy strategy) {
            if (this.terminateOnErrorStrategy != null) {
                throw logger.logExceptionAsError(new IllegalStateException(
                    "Termination strategy is already set, it is immutable for a specific context"));
            }
            this.terminateOnErrorStrategy = strategy;
            return this;
        }

        /**
         * @return the termination strategy to use upon error during the current invocation of the TaskGroup.
         */
        public TaskGroupTerminateOnErrorStrategy terminateOnErrorStrategy() {
            if (this.terminateOnErrorStrategy == null) {
                return TaskGroupTerminateOnErrorStrategy.TERMINATE_ON_HITTING_LCA_TASK;
            }
            return this.terminateOnErrorStrategy;
        }

        /**
         * Put a key-value in the context.
         *
         * @param key the key
         * @param value the value
         */
        public void put(String key, Object value) {
            this.properties.put(key, value);
        }

        /**
         * Get a value in the context with the given key.
         *
         * @param key the key
         * @return value with the given key if exists, null otherwise.
         */
        public Object get(String key) {
            return this.properties.get(key);
        }

        /**
         * Check existence of a key in the context.
         *
         * @param key the key
         * @return true if the key exists, false otherwise.
         */
        public boolean hasKey(String key) {
            return this.get(key) != null;
        }
    }

    /**
     * Wrapper type to simplify operations on proxy TaskGroup.
     * <p>
     * A proxy TaskGroup will be activated for a TaskGroup as soon as a "post-run" dependent
     * added to the actual TaskGroup via {@link TaskGroup#addPostRunDependentTaskGroup(TaskGroup)}.
     * "post run" dependents are those TaskGroup which need to be invoked as part of invocation
     * of actual TaskGroup.
     */
    protected static final class ProxyTaskGroupWrapper {
        // The "proxy TaskGroup"
        private TaskGroup proxyTaskGroup;
        // The "actual TaskGroup" for which above TaskGroup act as proxy
        private final TaskGroup actualTaskGroup;

        private final ClientLogger logger = new ClientLogger(this.getClass());

        /**
         * Creates ProxyTaskGroupWrapper.
         *
         * @param actualTaskGroup the actual TaskGroup for which proxy TaskGroup will be enabled
         */
        ProxyTaskGroupWrapper(TaskGroup actualTaskGroup) {
            this.actualTaskGroup = actualTaskGroup;
        }

        /**
         * @return true if the proxy TaskGroup is enabled for original TaskGroup.
         */
        boolean isActive() {
            return this.proxyTaskGroup != null;
        }

        /**
         * @return the wrapped proxy task group.
         */
        TaskGroup taskGroup() {
            return this.proxyTaskGroup;
        }

        /**
         * Add "post-run TaskGroup" for the "actual TaskGroup".
         *
         * @param postRunTaskGroup the dependency TaskGroup.
         */
        void addPostRunTaskGroupForActualTaskGroup(TaskGroup postRunTaskGroup) {
            if (this.proxyTaskGroup == null) {
                this.initProxyTaskGroup();
            }
            postRunTaskGroup.addDependencyGraph(this.actualTaskGroup);
            if (postRunTaskGroup.proxyTaskGroupWrapper.isActive()) {
                this.proxyTaskGroup.addDependencyGraph(postRunTaskGroup.proxyTaskGroupWrapper.proxyTaskGroup);
            } else {
                this.proxyTaskGroup.addDependencyGraph(postRunTaskGroup);
            }
        }

        /**
         * Add a dependent for the proxy TaskGroup.
         *
         * @param dependentTaskGroup the dependent TaskGroup
         */
        void addDependentTaskGroup(TaskGroup dependentTaskGroup) {
            if (this.proxyTaskGroup == null) {
                throw logger.logExceptionAsError(new IllegalStateException(
                    "addDependentTaskGroup() cannot be called in a non-active ProxyTaskGroup"));
            }
            dependentTaskGroup.addDependencyGraph(this.proxyTaskGroup);
        }

        /**
         * Initialize the proxy TaskGroup if not initialized yet.
         */
        private void initProxyTaskGroup() {
            if (this.proxyTaskGroup == null) {
                // Creates proxy TaskGroup with an instance of ProxyTaskItem as root TaskItem which delegates actions on
                // it to "actual TaskGroup"'s root.
                //
                ProxyTaskItem proxyTaskItem = new ProxyTaskItem(this.actualTaskGroup.root().data());
                this.proxyTaskGroup = new TaskGroup("proxy-" + this.actualTaskGroup.root().key(),
                        proxyTaskItem);

                if (this.actualTaskGroup.hasParents()) {
                    // Once "proxy TaskGroup" is enabled, all existing TaskGroups depends on "actual TaskGroup" should
                    // take dependency on "proxy TaskGroup".
                    //
                    String atgRootKey = this.actualTaskGroup.root().key();
                    for (DAGraph<TaskItem, TaskGroupEntry<TaskItem>> parentDAG : this.actualTaskGroup.parentDAGs) {
                        parentDAG.root().removeDependency(atgRootKey);
                        parentDAG.addDependencyGraph(this.proxyTaskGroup);
                    }
                    // re-assigned actual's parents as proxy's parents, so clear actual's parent collection.
                    //
                    this.actualTaskGroup.parentDAGs.clear();
                }
                // "Proxy TaskGroup" takes dependency on "actual TaskGroup"
                //
                this.proxyTaskGroup.addDependencyGraph(this.actualTaskGroup);
                // Add a back reference to "proxy" in actual
                //
                this.actualTaskGroup.rootTaskEntry.setProxy(this.proxyTaskGroup.rootTaskEntry);
            }
        }
    }

    /**
     * A {@link TaskItem} type that act as proxy for another {@link TaskItem}.
     */
    private static final class ProxyTaskItem implements TaskItem {
        private final TaskItem actualTaskItem;

        private ProxyTaskItem(final TaskItem actualTaskItem) {
            this.actualTaskItem = actualTaskItem;
        }

        @Override
        public Indexable result() {
            return actualTaskItem.result();
        }


        @Override
        public void beforeGroupInvoke() {
            // NOP
        }

        @Override
        public boolean isHot() {
            return actualTaskItem.isHot();
        }

        @Override
        public Mono<Indexable> invokeAsync(InvocationContext context) {
            return Mono.just(actualTaskItem.result());
        }

        @Override
        public Mono<Void> invokeAfterPostRunAsync(final boolean isGroupFaulted) {
            if (actualTaskItem.isHot()) {
                return Mono.defer(() ->
                    actualTaskItem.invokeAfterPostRunAsync(isGroupFaulted).subscribeOn(Schedulers.immediate()));
            } else {
                return this.actualTaskItem.invokeAfterPostRunAsync(isGroupFaulted)
                        .subscribeOn(Schedulers.immediate());
            }
        }
    }
}