< Summary

Class: Microsoft.Azure.Batch.AddTasksWorkflowManager
Assembly: Microsoft.Azure.Batch
File(s): C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch\src\AddTasksWorkflowManager.cs
Covered lines: 115
Uncovered lines: 34
Coverable lines: 149
Total lines: 474
Line coverage: 77.1% (115 of 149)
Covered branches: 47
Total branches: 72
Branch coverage: 65.2% (47 of 72)

Metrics

| Method | Cyclomatic complexity | Line coverage | Branch coverage |
| --- | --- | --- | --- |
| .ctor(...) | - | 100% | 100% |
| AddTasksAsync() | - | 85.71% | 81.82% |
| CheckForCancellationOrTimeoutAndThrow() | - | 80% | 50% |
| IsWorkflowDone() | - | 100% | 100% |
| StageFilesAndAddTasks() | - | 55.32% | 32.14% |
| ProcessProtocolAddTaskResults(...) | - | 81.82% | 83.33% |
| ProcessPendingOperationResults() | - | 100% | 100% |
| WaitForTasksAndThrowParallelOperationsExceptionAsync() | - | 66.67% | 100% |
| get_JobId() | - | 0% | 100% |
| get_Task() | - | 100% | 100% |
| get_RetryCount() | - | 100% | 100% |
| get_JobOperations() | - | 0% | 100% |
| get_ProtocolTask() | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| IncrementRetryCount() | - | 0% | 100% |
| GetProtocolTask() | - | 100% | 100% |

File(s)

C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch\src\AddTasksWorkflowManager.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License. See License.txt in the project root for license information.
 3
 4namespace Microsoft.Azure.Batch
 5{
 6    using System;
 7    using System.Collections.Concurrent;
 8    using System.Collections.Generic;
 9    using System.Globalization;
 10    using System.Linq;
 11    using System.Threading;
 12    using System.Threading.Tasks;
 13    using Microsoft.Azure.Batch.FileStaging;
 14    using Models = Microsoft.Azure.Batch.Protocol.Models;
 15
 16    /// <summary>
 17    /// Manages the AddTasks workflow, performs multiple CloudTaskAdd requests, and handles retries/exception handling.
 18    /// </summary>
 19    internal class AddTasksWorkflowManager
 20    {
 21        private readonly JobOperations _jobOperations;
 22        private readonly string _jobId;
 23        private readonly ConcurrentQueue<TrackedCloudTask> _remainingTasksToAdd;
 24        private readonly List<Func<AddTaskResult, CancellationToken, AddTaskResultStatus>> _addTaskResultHandlerCollecti
 25        private readonly BatchClientParallelOptions _parallelOptions;
 26        private readonly List<Task> _pendingAsyncOperations;
 27        private readonly ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>> _customerVisibleFileStagingArti
 28        private readonly BehaviorManager _behaviorManager;
 29        private DateTime _timeToTimeoutAt;
 30        private int _hasRun; //Have to use an int because CompareExchange doesn't support bool
 31        private int _maxTasks;
 32
 33        private const int HasNotRun = 0;
 34        private const int HasRun = 1;
 35
 36        /// <summary>
 37        /// Creates the AddTasks workflow manager with the specified arguments.
 38        /// </summary>
 39        /// <param name="jobOperations"></param>
 40        /// <param name="jobId"></param>
 41        /// <param name="parallelOptions">The parallel options associated with this operation.  If this is null, the def
 42        /// <param name="fileStagingArtifacts">File staging artifacts associated with this operation.  If the customer d
 43        /// <param name="bhMgr">The behavior manager.</param>
 744        internal AddTasksWorkflowManager(
 745            JobOperations jobOperations,
 746            string jobId,
 747            BatchClientParallelOptions parallelOptions,
 748            ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>> fileStagingArtifacts,
 749            BehaviorManager bhMgr)
 50        {
 51            //
 52            // Setup some defaults for the parameters if they were null
 53            //
 754            if (parallelOptions == null)
 55            {
 356                parallelOptions = new BatchClientParallelOptions();
 57            }
 58
 59            //
 60            // Set up the data structures associated with this workflow
 61            //
 762            this._jobOperations = jobOperations;
 763            this._jobId = jobId;
 764            this._remainingTasksToAdd = new ConcurrentQueue<TrackedCloudTask>();
 765            this._addTaskResultHandlerCollection = new List<Func<AddTaskResult, CancellationToken, AddTaskResultStatus>>
 766            this._parallelOptions = parallelOptions;
 767            this._pendingAsyncOperations = new List<Task>();
 768            this._customerVisibleFileStagingArtifacts = fileStagingArtifacts ?? new ConcurrentBag<ConcurrentDictionary<T
 769            this._behaviorManager = bhMgr;
 770            this._maxTasks = Constants.MaxTasksInSingleAddTaskCollectionRequest;
 771            this._hasRun = HasNotRun; //Has not run by default
 72
 73            //Read the behavior manager and populate the collection
 774            List<AddTaskCollectionResultHandler> behaviorList = this._behaviorManager.GetBehaviors<AddTaskCollectionResu
 2675            foreach (AddTaskCollectionResultHandler handler in behaviorList)
 76            {
 677                this._addTaskResultHandlerCollection.Add(handler.ResultHandler);
 78            }
 79
 80            //Validation that there is a handler for AddTaskResult
 781            if (this._addTaskResultHandlerCollection.Count == 0)
 82            {
 183                throw new BatchClientException(
 184                    string.Format(CultureInfo.InvariantCulture, BatchErrorMessages.GeneralBehaviorMissing, typeof(AddTas
 85            }
 686        }
 87
 88#region Public methods
 89
 90        public async System.Threading.Tasks.Task AddTasksAsync(IEnumerable<CloudTask> tasksToAdd, TimeSpan? timeout = nu
 91        {
 92            //Ensure that this object has not already been used
 693            int original = Interlocked.CompareExchange(ref this._hasRun, HasRun, HasNotRun);
 94
 695            if (original != HasNotRun)
 96            {
 097                throw new RunOnceException(string.Format(CultureInfo.InvariantCulture, BatchErrorMessages.CanOnlyBeRunOn
 98            }
 99
 100            //Determine what time to timeout at
 6101            if (timeout != null)
 102            {
 0103                this._timeToTimeoutAt = DateTime.UtcNow + timeout.Value;
 104            }
 105            else
 106            {
 107                //TODO: We set this to max value -- maybe in the future we can be a bit more friendly to customers and n
 6108                this._timeToTimeoutAt = DateTime.MaxValue;
 109            }
 110
 111            //
 112            // Collect the tasks and add them to the pending queue
 113            //
 114            //TODO: There is a tension between allowing lazy enumeration of tasksToAdd and performing any validation on 
 115            //TODO: For now (since most customer implementations are not going to write their own lazy collection) we go
 116            //TODO: some input validation...
 117
 118            //
 119            // Perform some basic input validation
 120            //
 121            //TODO: Check no duplicate task names?
 122            //TODO: Check the tasks are not children of some other object already
 123
 124            //Enumerate the supplied collection asynchronously if required
 6125            tasksToAdd = await UtilitiesInternal.EnumerateIfNeededAsync(tasksToAdd, this._parallelOptions.CancellationTo
 126
 127            //TODO: For now perform a copy into a queue -- in the future consider honoring lazy loading and do this late
 1817128            foreach (CloudTask cloudTask in tasksToAdd)
 129            {
 903130                if (cloudTask == null)
 131                {
 1132                    throw new ArgumentNullException(nameof(tasksToAdd), BatchErrorMessages.CollectionMustNotContainNull)
 133                }
 134
 902135                if (cloudTask.BindingState == BindingState.Bound)
 136                {
 0137                    throw UtilitiesInternal.OperationForbiddenOnBoundObjects;
 138                }
 139
 902140                this._remainingTasksToAdd.Enqueue(new TrackedCloudTask(this._jobId, this._jobOperations, cloudTask));
 141            }
 142
 143            //
 144            // Fire the requests
 145            //
 16146            while (!this.IsWorkflowDone())
 147            {
 148                //Wait for an open request "slot" (an existing request to complete) if:
 149                //1. We have no free request slots
 150                //2. We have no more tasks to add and there are ongoing pending operations which could result in task ad
 16151                bool noFreeSlots = this._pendingAsyncOperations.Count >= this._parallelOptions.MaxDegreeOfParallelism;
 16152                bool noTasksToAddButSomePendingLegsRemain = this._remainingTasksToAdd.Count == 0 && this._pendingAsyncOp
 16153                if (noFreeSlots || noTasksToAddButSomePendingLegsRemain)
 154                {
 6155                    await this.ProcessPendingOperationResults().ConfigureAwait(continueOnCapturedContext: false);
 156                }
 157
 158                //If we get here, we are starting a single new leg.  Another iteration of this loop will get to any task
 159
 160                //Add any tasks (up to max count in 1 request) which are remaining since we have an open parallel slot
 11161                Dictionary<string, TrackedCloudTask> nameToTaskMapping = new Dictionary<string, TrackedCloudTask>();
 162
 163                //Attempt to take some items from the set of tasks remaining to be added and prepare it to be added
 164                TrackedCloudTask taskToAdd;
 11165                int tmpMaxTasks = this._maxTasks;
 913166                while (nameToTaskMapping.Count < tmpMaxTasks && this._remainingTasksToAdd.TryDequeue(out taskToAdd))
 167                {
 902168                    nameToTaskMapping.Add(taskToAdd.Task.Id, taskToAdd);
 169                }
 170
 11171                if (nameToTaskMapping.Count > 0)
 172                {
 173                    //Start the async operation to stage the files (if required) and issue the protocol add task request
 10174                    Task asyncTask = this.StageFilesAndAddTasks(
 10175                        nameToTaskMapping,
 10176                        null);
 177
 178                    //Add the request to the operation tracker
 10179                    this._pendingAsyncOperations.Add(asyncTask);
 180                }
 181            }
 182
 183            //If we reach here, we have succeeded - yay!
 0184        }
 185
 186#endregion
 187
 188#region Private methods
 189
 190        /// <summary>
 191        /// Checks for operation cancelation or timeout, and throws the corresponding exception.
 192        /// </summary>
 193        private void CheckForCancellationOrTimeoutAndThrow()
 194        {
 195            //We always throw when cancelation is requested
 20196            this._parallelOptions.CancellationToken.ThrowIfCancellationRequested();
 197
 198
 20199            DateTime currentTime = DateTime.UtcNow;
 200
 20201            if (currentTime > this._timeToTimeoutAt)
 202            {
 0203                throw new TimeoutException();
 204            }
 20205        }
 206
 207        /// <summary>
 208        /// Determines if the workflow has finished or not.
 209        /// </summary>
 210        /// <returns>True if the workflow has successfully completed, false if it has not.</returns>
 211        private bool IsWorkflowDone()
 212        {
 16213            return !(this._remainingTasksToAdd.Count > 0 || this._pendingAsyncOperations.Count > 0);
 214        }
 215
 216        /// <summary>
 217        /// Performs file staging and also issues the AddTaskCollection request for the set of tasks to add.
 218        /// </summary>
 219        /// <param name="tasksToAdd">The set of tasks to add.</param>
 220        /// <param name="namingFragment"></param>
 221        /// <returns></returns>
 222        private async Task StageFilesAndAddTasks(
 223            Dictionary<string, TrackedCloudTask> tasksToAdd,
 224            string namingFragment)
 225        {
 10226            List<Models.TaskAddParameter> protoTasksToAdd = new List<Models.TaskAddParameter>();
 227
 10228            this.CheckForCancellationOrTimeoutAndThrow();
 229
 230            //
 231            // Perform file staging
 232            //
 233
 234            // list of all files to be staged across all Tasks
 10235            List<IFileStagingProvider> allFiles = new List<IFileStagingProvider>();
 236
 237            // collect all files to be staged
 1824238            foreach (TrackedCloudTask trackedCloudTask in tasksToAdd.Values)
 239            {
 902240                if (trackedCloudTask.Task.FilesToStage != null)
 241                {
 242                    // add in the files for the current task
 0243                    allFiles.AddRange(trackedCloudTask.Task.FilesToStage);
 244                }
 245            }
 246
 247            //This dictonary is only for the purpose of this batch add
 10248            ConcurrentDictionary<Type, IFileStagingArtifact> legStagingArtifacts = new ConcurrentDictionary<Type, IFileS
 249
 250            //Add the file staging artifacts for this let to the overall bag so as to allow customers to track the file 
 10251            this._customerVisibleFileStagingArtifacts.Add(legStagingArtifacts);
 252
 253            // now we have all files, send off to file staging machine
 10254            System.Threading.Tasks.Task fileStagingTask = FileStagingUtils.StageFilesAsync(allFiles, legStagingArtifacts
 255
 256            // wait for file staging async task
 10257            await fileStagingTask.ConfigureAwait(continueOnCapturedContext: false);
 258
 259            // now update each non-finalized Task with its new ResourceFiles
 1824260            foreach (TrackedCloudTask taskToAdd in tasksToAdd.Values)
 261            {
 262                //Update the resource files if the task hasn't already been finalized
 902263                if (taskToAdd.Task.FilesToStage != null)
 264                {
 0265                    foreach (IFileStagingProvider curFile in taskToAdd.Task.FilesToStage)
 266                    {
 0267                        IEnumerable<ResourceFile> curStagedFiles = curFile.StagedFiles;
 268
 0269                        if (null != curStagedFiles && !((IReadOnly)taskToAdd.Task).IsReadOnly)
 270                        {
 271                            //TODO: There is a threading issue here -- lock this property down somehow?
 0272                            if (taskToAdd.Task.ResourceFiles == null)
 273                            {
 0274                                taskToAdd.Task.ResourceFiles = new List<ResourceFile>();
 275                            }
 276
 0277                            foreach (ResourceFile curStagedFile in curStagedFiles)
 278                            {
 0279                                taskToAdd.Task.ResourceFiles.Add(curStagedFile);
 280                            }
 281                        }
 282                    }
 283
 284                    //Mark the file staging collection as read only just incase there's another reference to it
 0285                    ConcurrentChangeTrackedList<IFileStagingProvider> filesToStageListImpl =
 0286                        taskToAdd.Task.FilesToStage as ConcurrentChangeTrackedList<IFileStagingProvider>;
 287
 0288                    filesToStageListImpl.IsReadOnly = true; //Set read only
 289                }
 290
 902291                Models.TaskAddParameter protoTask = taskToAdd.GetProtocolTask();
 902292                protoTasksToAdd.Add(protoTask);
 293            }
 294
 10295            this.CheckForCancellationOrTimeoutAndThrow();
 296
 297            //
 298            // Fire the protocol add collection request
 299            //
 300            try
 301            {
 10302                var asyncTask = this._jobOperations.ParentBatchClient.ProtocolLayer.AddTaskCollection(
 10303                    this._jobId,
 10304                    protoTasksToAdd,
 10305                    this._behaviorManager,
 10306                    this._parallelOptions.CancellationToken);
 307
 10308                var response = await asyncTask.ConfigureAwait(continueOnCapturedContext: false);
 309                //
 310                // Process the results of the add task collection request
 311                //
 3312                this.ProcessProtocolAddTaskResults(response.Body.Value, tasksToAdd);
 2313            }
 1314            catch(Common.BatchException e)
 315            {
 1316                if (e.InnerException is Models.BatchErrorException)
 317                {
 0318                    Models.BatchError error = ((Models.BatchErrorException)e.InnerException).Body;
 0319                    int currLength = tasksToAdd.Count;
 0320                    if (error.Code == Common.BatchErrorCodeStrings.RequestBodyTooLarge && currLength != 1)
 321                    {
 322                        // Our chunk sizes were too large to fit in a request, so universally reduce size
 323                        // This is an internal error due to us using greedy initial maximum chunk size,
 324                        //   so do not increment retry counter.
 325                        {
 0326                            int newLength = currLength / 2;
 0327                            int tmpMaxTasks = this._maxTasks;
 0328                            while (newLength < tmpMaxTasks)
 329                            {
 0330                                tmpMaxTasks = Interlocked.CompareExchange(ref this._maxTasks, newLength, tmpMaxTasks);
 331                            }
 0332                            foreach (TrackedCloudTask trackedTask in tasksToAdd.Values)
 333                            {
 0334                                this._remainingTasksToAdd.Enqueue(trackedTask);
 335                            }
 0336                            return;
 337                        }
 338                    }
 339                }
 1340                throw;
 341            }
 2342        }
 343
 344        /// <summary>
 345        /// Processes a set AddTaskResults from the protocol and groups them according to the results of the AddTaskResu
 346        /// </summary>
 347        /// <param name="addTaskResults"></param>
 348        /// <param name="taskMap">Dictionary of task name to task object instance for the specific protocol response.</p
 349        private void ProcessProtocolAddTaskResults(
 350            IEnumerable<Protocol.Models.TaskAddResult> addTaskResults,
 351            IReadOnlyDictionary<string, TrackedCloudTask> taskMap)
 352        {
 9353            foreach (Protocol.Models.TaskAddResult protoAddTaskResult in addTaskResults)
 354            {
 2355                string taskId = protoAddTaskResult.TaskId;
 2356                TrackedCloudTask trackedTask = taskMap[taskId];
 357
 2358                AddTaskResult omResult = new AddTaskResult(trackedTask.Task, trackedTask.RetryCount, protoAddTaskResult)
 359
 360                //We know that there must be at least one AddTaskResultHandler so the below ForEach will always be calle
 361                //at least once.
 2362                AddTaskResultStatus status = AddTaskResultStatus.Success; //The default is success to avoid infinite ret
 363
 364                //Call the customer defined result handler
 7365                foreach (var resultHandlerFunction in this._addTaskResultHandlerCollection)
 366                {
 2367                    status = resultHandlerFunction(omResult, this._parallelOptions.CancellationToken);
 368                }
 369
 1370                if (status == AddTaskResultStatus.Retry)
 371                {
 372                    //Increment retry count
 0373                    trackedTask.IncrementRetryCount();
 374
 375                    //TODO: There is nothing stopping the user from marking all tasks as Retry and never exiting this wo
 376                    //TODO: In that case maybe we should forcibly abort them after some # of attempts?
 0377                    this._remainingTasksToAdd.Enqueue(trackedTask);
 378                }
 379            }
 2380        }
 381
 382        /// <summary>
 383        /// Waits for a pending operation to complete and throws if the operation failed.
 384        /// </summary>
 385        /// <returns></returns>
 386        private async Task ProcessPendingOperationResults()
 387        {
 388            //Wait for any task to complete
 6389            Task completedTask = await Task.WhenAny(this._pendingAsyncOperations).ConfigureAwait(continueOnCapturedConte
 390
 391            //Check for a task failure -- if there is one, we a-wait for all remaining tasks to complete (this will thro
 6392            if (completedTask.IsFaulted)
 393            {
 4394                await WaitForTasksAndThrowParallelOperationsExceptionAsync(this._pendingAsyncOperations).ConfigureAwait(
 395            }
 396            else
 397            {
 2398                await completedTask.ConfigureAwait(continueOnCapturedContext: false); //This await should finish immedia
 399
 400                //Remove the task which completed
 1401                this._pendingAsyncOperations.Remove(completedTask);
 402            }
 1403        }
 404
 405        private static async Task WaitForTasksAndThrowParallelOperationsExceptionAsync(List<Task> tasks)
 406        {
 407            //We know that this will throw, but we want to catch the exception so that we can provide a better aggregate
 408            try
 409            {
 410                //NOTE: This try block should only do a WhenAll and nothing else, since the exception thrown in this try
 4411                await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext: false); //This will throw and termin
 0412            }
 4413            catch (Exception)
 414            {
 415                //Swallow the exception and throw a new one
 416
 17417                IEnumerable<Exception> exceptions = tasks.Where(t => t.IsFaulted).SelectMany(t => t.Exception.InnerExcep
 4418                throw new ParallelOperationsException(BatchErrorMessages.MultipleParallelRequestsHitUnexpectedErrors, ex
 419            }
 0420        }
 421
 422#endregion
 423
 424#region Private classes
 425
 426        /// <summary>
 427        /// Internal task wrapper which tracks a tasks retry count and holds a reference to the protocol object and Clou
 428        /// </summary>
 429        private class TrackedCloudTask
 430        {
 0431            public string JobId { get; private set; }
 5414432            public CloudTask Task { get; private set; }
 904433            public int RetryCount { get; private set; }
 0434            public JobOperations JobOperations { get; private set; }
 435
 2706436            private Models.TaskAddParameter ProtocolTask { get; set; }
 437
 902438            internal TrackedCloudTask(
 902439                string jobId,
 902440                JobOperations jobOperations,
 902441                CloudTask cloudTask)
 442            {
 902443                this.Task = cloudTask;
 902444                this.JobId = jobId;
 902445                this.JobOperations = jobOperations;  // matt-c: do we really need one of these in every instance?  Even 
 902446                this.RetryCount = 0;
 902447            }
 448
 449            public void IncrementRetryCount()
 450            {
 0451                this.RetryCount++;
 0452            }
 453
 454            /// <summary>
 455            /// Gets the finalized version of this task.  Also internally updates this.Task to refer to a bound read-onl
 456            /// CloudTask.  Subsequent calls to this method will return a reference to the same object
 457            /// </summary>
 458            /// <returns></returns>
 459            public Models.TaskAddParameter GetProtocolTask()
 460            {
 902461                if (this.ProtocolTask == null)
 462                {
 902463                    this.ProtocolTask = this.Task.GetTransportObject();
 902464                    this.Task.Freeze(); //Mark the underlying task readonly
 465                }
 466
 902467                return this.ProtocolTask;
 468            }
 469        }
 470
 471#endregion
 472
 473    }
 474}