| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. See License.txt in the project root for license information. |
| | 3 | |
|
| | 4 | | namespace Microsoft.Azure.Batch |
| | 5 | | { |
| | 6 | | using System; |
| | 7 | | using System.Collections.Concurrent; |
| | 8 | | using System.Collections.Generic; |
| | 9 | | using System.Globalization; |
| | 10 | | using System.Linq; |
| | 11 | | using System.Threading; |
| | 12 | | using System.Threading.Tasks; |
| | 13 | | using Microsoft.Azure.Batch.FileStaging; |
| | 14 | | using Models = Microsoft.Azure.Batch.Protocol.Models; |
| | 15 | |
|
| | 16 | | /// <summary> |
| | 17 | | /// Manages the AddTasks workflow, performs multiple CloudTaskAdd requests, and handles retries/exception handling. |
| | 18 | | /// </summary> |
| | 19 | | internal class AddTasksWorkflowManager |
| | 20 | | { |
| | 21 | | private readonly JobOperations _jobOperations; |
| | 22 | | private readonly string _jobId; |
| | 23 | | private readonly ConcurrentQueue<TrackedCloudTask> _remainingTasksToAdd; |
| | 24 | | private readonly List<Func<AddTaskResult, CancellationToken, AddTaskResultStatus>> _addTaskResultHandlerCollecti |
| | 25 | | private readonly BatchClientParallelOptions _parallelOptions; |
| | 26 | | private readonly List<Task> _pendingAsyncOperations; |
| | 27 | | private readonly ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>> _customerVisibleFileStagingArti |
| | 28 | | private readonly BehaviorManager _behaviorManager; |
| | 29 | | private DateTime _timeToTimeoutAt; |
| | 30 | | private int _hasRun; //Have to use an int because CompareExchange doesn't support bool |
| | 31 | | private int _maxTasks; |
| | 32 | |
|
| | 33 | | private const int HasNotRun = 0; |
| | 34 | | private const int HasRun = 1; |
| | 35 | |
|
| | 36 | | /// <summary> |
| | 37 | | /// Creates the AddTasks workflow manager with the specified arguments. |
| | 38 | | /// </summary> |
| | 39 | | /// <param name="jobOperations"></param> |
| | 40 | | /// <param name="jobId"></param> |
| | 41 | | /// <param name="parallelOptions">The parallel options associated with this operation. If this is null, the def |
| | 42 | | /// <param name="fileStagingArtifacts">File staging artifacts associated with this operation. If the customer d |
| | 43 | | /// <param name="bhMgr">The behavior manager.</param> |
| 7 | 44 | | internal AddTasksWorkflowManager( |
| 7 | 45 | | JobOperations jobOperations, |
| 7 | 46 | | string jobId, |
| 7 | 47 | | BatchClientParallelOptions parallelOptions, |
| 7 | 48 | | ConcurrentBag<ConcurrentDictionary<Type, IFileStagingArtifact>> fileStagingArtifacts, |
| 7 | 49 | | BehaviorManager bhMgr) |
| | 50 | | { |
| | 51 | | // |
| | 52 | | // Setup some defaults for the parameters if they were null |
| | 53 | | // |
| 7 | 54 | | if (parallelOptions == null) |
| | 55 | | { |
| 3 | 56 | | parallelOptions = new BatchClientParallelOptions(); |
| | 57 | | } |
| | 58 | |
|
| | 59 | | // |
| | 60 | | // Set up the data structures associated with this workflow |
| | 61 | | // |
| 7 | 62 | | this._jobOperations = jobOperations; |
| 7 | 63 | | this._jobId = jobId; |
| 7 | 64 | | this._remainingTasksToAdd = new ConcurrentQueue<TrackedCloudTask>(); |
| 7 | 65 | | this._addTaskResultHandlerCollection = new List<Func<AddTaskResult, CancellationToken, AddTaskResultStatus>> |
| 7 | 66 | | this._parallelOptions = parallelOptions; |
| 7 | 67 | | this._pendingAsyncOperations = new List<Task>(); |
| 7 | 68 | | this._customerVisibleFileStagingArtifacts = fileStagingArtifacts ?? new ConcurrentBag<ConcurrentDictionary<T |
| 7 | 69 | | this._behaviorManager = bhMgr; |
| 7 | 70 | | this._maxTasks = Constants.MaxTasksInSingleAddTaskCollectionRequest; |
| 7 | 71 | | this._hasRun = HasNotRun; //Has not run by default |
| | 72 | |
|
| | 73 | | //Read the behavior manager and populate the collection |
| 7 | 74 | | List<AddTaskCollectionResultHandler> behaviorList = this._behaviorManager.GetBehaviors<AddTaskCollectionResu |
| 26 | 75 | | foreach (AddTaskCollectionResultHandler handler in behaviorList) |
| | 76 | | { |
| 6 | 77 | | this._addTaskResultHandlerCollection.Add(handler.ResultHandler); |
| | 78 | | } |
| | 79 | |
|
| | 80 | | //Validation that there is a handler for AddTaskResult |
| 7 | 81 | | if (this._addTaskResultHandlerCollection.Count == 0) |
| | 82 | | { |
| 1 | 83 | | throw new BatchClientException( |
| 1 | 84 | | string.Format(CultureInfo.InvariantCulture, BatchErrorMessages.GeneralBehaviorMissing, typeof(AddTas |
| | 85 | | } |
| 6 | 86 | | } |
| | 87 | |
|
| | 88 | | #region Public methods |
| | 89 | |
|
| | 90 | | public async System.Threading.Tasks.Task AddTasksAsync(IEnumerable<CloudTask> tasksToAdd, TimeSpan? timeout = nu |
| | 91 | | { |
| | 92 | | //Ensure that this object has not already been used |
| 6 | 93 | | int original = Interlocked.CompareExchange(ref this._hasRun, HasRun, HasNotRun); |
| | 94 | |
|
| 6 | 95 | | if (original != HasNotRun) |
| | 96 | | { |
| 0 | 97 | | throw new RunOnceException(string.Format(CultureInfo.InvariantCulture, BatchErrorMessages.CanOnlyBeRunOn |
| | 98 | | } |
| | 99 | |
|
| | 100 | | //Determine what time to timeout at |
| 6 | 101 | | if (timeout != null) |
| | 102 | | { |
| 0 | 103 | | this._timeToTimeoutAt = DateTime.UtcNow + timeout.Value; |
| | 104 | | } |
| | 105 | | else |
| | 106 | | { |
| | 107 | | //TODO: We set this to max value -- maybe in the future we can be a bit more friendly to customers and n |
| 6 | 108 | | this._timeToTimeoutAt = DateTime.MaxValue; |
| | 109 | | } |
| | 110 | |
|
| | 111 | | // |
| | 112 | | // Collect the tasks and add them to the pending queue |
| | 113 | | // |
| | 114 | | //TODO: There is a tension between allowing lazy enumeration of tasksToAdd and performing any validation on |
| | 115 | | //TODO: For now (since most customer implementations are not going to write their own lazy collection) we go |
| | 116 | | //TODO: some input validation... |
| | 117 | |
|
| | 118 | | // |
| | 119 | | // Perform some basic input validation |
| | 120 | | // |
| | 121 | | //TODO: Check no duplicate task names? |
| | 122 | | //TODO: Check the tasks are not children of some other object already |
| | 123 | |
|
| | 124 | | //Enumerate the supplied collection asynchronously if required |
| 6 | 125 | | tasksToAdd = await UtilitiesInternal.EnumerateIfNeededAsync(tasksToAdd, this._parallelOptions.CancellationTo |
| | 126 | |
|
| | 127 | | //TODO: For now perform a copy into a queue -- in the future consider honoring lazy loading and do this late |
| 1817 | 128 | | foreach (CloudTask cloudTask in tasksToAdd) |
| | 129 | | { |
| 903 | 130 | | if (cloudTask == null) |
| | 131 | | { |
| 1 | 132 | | throw new ArgumentNullException(nameof(tasksToAdd), BatchErrorMessages.CollectionMustNotContainNull) |
| | 133 | | } |
| | 134 | |
|
| 902 | 135 | | if (cloudTask.BindingState == BindingState.Bound) |
| | 136 | | { |
| 0 | 137 | | throw UtilitiesInternal.OperationForbiddenOnBoundObjects; |
| | 138 | | } |
| | 139 | |
|
| 902 | 140 | | this._remainingTasksToAdd.Enqueue(new TrackedCloudTask(this._jobId, this._jobOperations, cloudTask)); |
| | 141 | | } |
| | 142 | |
|
| | 143 | | // |
| | 144 | | // Fire the requests |
| | 145 | | // |
| 16 | 146 | | while (!this.IsWorkflowDone()) |
| | 147 | | { |
| | 148 | | //Wait for an open request "slot" (an existing request to complete) if: |
| | 149 | | //1. We have no free request slots |
| | 150 | | //2. We have no more tasks to add and there are ongoing pending operations which could result in task ad |
| 16 | 151 | | bool noFreeSlots = this._pendingAsyncOperations.Count >= this._parallelOptions.MaxDegreeOfParallelism; |
| 16 | 152 | | bool noTasksToAddButSomePendingLegsRemain = this._remainingTasksToAdd.Count == 0 && this._pendingAsyncOp |
| 16 | 153 | | if (noFreeSlots || noTasksToAddButSomePendingLegsRemain) |
| | 154 | | { |
| 6 | 155 | | await this.ProcessPendingOperationResults().ConfigureAwait(continueOnCapturedContext: false); |
| | 156 | | } |
| | 157 | |
|
| | 158 | | //If we get here, we are starting a single new leg. Another iteration of this loop will get to any task |
| | 159 | |
|
| | 160 | | //Add any tasks (up to max count in 1 request) which are remaining since we have an open parallel slot |
| 11 | 161 | | Dictionary<string, TrackedCloudTask> nameToTaskMapping = new Dictionary<string, TrackedCloudTask>(); |
| | 162 | |
|
| | 163 | | //Attempt to take some items from the set of tasks remaining to be added and prepare it to be added |
| | 164 | | TrackedCloudTask taskToAdd; |
| 11 | 165 | | int tmpMaxTasks = this._maxTasks; |
| 913 | 166 | | while (nameToTaskMapping.Count < tmpMaxTasks && this._remainingTasksToAdd.TryDequeue(out taskToAdd)) |
| | 167 | | { |
| 902 | 168 | | nameToTaskMapping.Add(taskToAdd.Task.Id, taskToAdd); |
| | 169 | | } |
| | 170 | |
|
| 11 | 171 | | if (nameToTaskMapping.Count > 0) |
| | 172 | | { |
| | 173 | | //Start the async operation to stage the files (if required) and issue the protocol add task request |
| 10 | 174 | | Task asyncTask = this.StageFilesAndAddTasks( |
| 10 | 175 | | nameToTaskMapping, |
| 10 | 176 | | null); |
| | 177 | |
|
| | 178 | | //Add the request to the operation tracker |
| 10 | 179 | | this._pendingAsyncOperations.Add(asyncTask); |
| | 180 | | } |
| | 181 | | } |
| | 182 | |
|
| | 183 | | //If we reach here, we have succeeded - yay! |
| 0 | 184 | | } |
| | 185 | |
|
| | 186 | | #endregion |
| | 187 | |
|
| | 188 | | #region Private methods |
| | 189 | |
|
| | 190 | | /// <summary> |
| | 191 | | /// Checks for operation cancelation or timeout, and throws the corresponding exception. |
| | 192 | | /// </summary> |
| | 193 | | private void CheckForCancellationOrTimeoutAndThrow() |
| | 194 | | { |
| | 195 | | //We always throw when cancelation is requested |
| 20 | 196 | | this._parallelOptions.CancellationToken.ThrowIfCancellationRequested(); |
| | 197 | |
|
| | 198 | |
|
| 20 | 199 | | DateTime currentTime = DateTime.UtcNow; |
| | 200 | |
|
| 20 | 201 | | if (currentTime > this._timeToTimeoutAt) |
| | 202 | | { |
| 0 | 203 | | throw new TimeoutException(); |
| | 204 | | } |
| 20 | 205 | | } |
| | 206 | |
|
| | 207 | | /// <summary> |
| | 208 | | /// Determines if the workflow has finished or not. |
| | 209 | | /// </summary> |
| | 210 | | /// <returns>True if the workflow has successfully completed, false if it has not.</returns> |
| | 211 | | private bool IsWorkflowDone() |
| | 212 | | { |
| 16 | 213 | | return !(this._remainingTasksToAdd.Count > 0 || this._pendingAsyncOperations.Count > 0); |
| | 214 | | } |
| | 215 | |
|
| | 216 | | /// <summary> |
| | 217 | | /// Performs file staging and also issues the AddTaskCollection request for the set of tasks to add. |
| | 218 | | /// </summary> |
| | 219 | | /// <param name="tasksToAdd">The set of tasks to add.</param> |
| | 220 | | /// <param name="namingFragment"></param> |
| | 221 | | /// <returns></returns> |
| | 222 | | private async Task StageFilesAndAddTasks( |
| | 223 | | Dictionary<string, TrackedCloudTask> tasksToAdd, |
| | 224 | | string namingFragment) |
| | 225 | | { |
| 10 | 226 | | List<Models.TaskAddParameter> protoTasksToAdd = new List<Models.TaskAddParameter>(); |
| | 227 | |
|
| 10 | 228 | | this.CheckForCancellationOrTimeoutAndThrow(); |
| | 229 | |
|
| | 230 | | // |
| | 231 | | // Perform file staging |
| | 232 | | // |
| | 233 | |
|
| | 234 | | // list of all files to be staged across all Tasks |
| 10 | 235 | | List<IFileStagingProvider> allFiles = new List<IFileStagingProvider>(); |
| | 236 | |
|
| | 237 | | // collect all files to be staged |
| 1824 | 238 | | foreach (TrackedCloudTask trackedCloudTask in tasksToAdd.Values) |
| | 239 | | { |
| 902 | 240 | | if (trackedCloudTask.Task.FilesToStage != null) |
| | 241 | | { |
| | 242 | | // add in the files for the current task |
| 0 | 243 | | allFiles.AddRange(trackedCloudTask.Task.FilesToStage); |
| | 244 | | } |
| | 245 | | } |
| | 246 | |
|
| | 247 | | //This dictonary is only for the purpose of this batch add |
| 10 | 248 | | ConcurrentDictionary<Type, IFileStagingArtifact> legStagingArtifacts = new ConcurrentDictionary<Type, IFileS |
| | 249 | |
|
| | 250 | | //Add the file staging artifacts for this let to the overall bag so as to allow customers to track the file |
| 10 | 251 | | this._customerVisibleFileStagingArtifacts.Add(legStagingArtifacts); |
| | 252 | |
|
| | 253 | | // now we have all files, send off to file staging machine |
| 10 | 254 | | System.Threading.Tasks.Task fileStagingTask = FileStagingUtils.StageFilesAsync(allFiles, legStagingArtifacts |
| | 255 | |
|
| | 256 | | // wait for file staging async task |
| 10 | 257 | | await fileStagingTask.ConfigureAwait(continueOnCapturedContext: false); |
| | 258 | |
|
| | 259 | | // now update each non-finalized Task with its new ResourceFiles |
| 1824 | 260 | | foreach (TrackedCloudTask taskToAdd in tasksToAdd.Values) |
| | 261 | | { |
| | 262 | | //Update the resource files if the task hasn't already been finalized |
| 902 | 263 | | if (taskToAdd.Task.FilesToStage != null) |
| | 264 | | { |
| 0 | 265 | | foreach (IFileStagingProvider curFile in taskToAdd.Task.FilesToStage) |
| | 266 | | { |
| 0 | 267 | | IEnumerable<ResourceFile> curStagedFiles = curFile.StagedFiles; |
| | 268 | |
|
| 0 | 269 | | if (null != curStagedFiles && !((IReadOnly)taskToAdd.Task).IsReadOnly) |
| | 270 | | { |
| | 271 | | //TODO: There is a threading issue here -- lock this property down somehow? |
| 0 | 272 | | if (taskToAdd.Task.ResourceFiles == null) |
| | 273 | | { |
| 0 | 274 | | taskToAdd.Task.ResourceFiles = new List<ResourceFile>(); |
| | 275 | | } |
| | 276 | |
|
| 0 | 277 | | foreach (ResourceFile curStagedFile in curStagedFiles) |
| | 278 | | { |
| 0 | 279 | | taskToAdd.Task.ResourceFiles.Add(curStagedFile); |
| | 280 | | } |
| | 281 | | } |
| | 282 | | } |
| | 283 | |
|
| | 284 | | //Mark the file staging collection as read only just incase there's another reference to it |
| 0 | 285 | | ConcurrentChangeTrackedList<IFileStagingProvider> filesToStageListImpl = |
| 0 | 286 | | taskToAdd.Task.FilesToStage as ConcurrentChangeTrackedList<IFileStagingProvider>; |
| | 287 | |
|
| 0 | 288 | | filesToStageListImpl.IsReadOnly = true; //Set read only |
| | 289 | | } |
| | 290 | |
|
| 902 | 291 | | Models.TaskAddParameter protoTask = taskToAdd.GetProtocolTask(); |
| 902 | 292 | | protoTasksToAdd.Add(protoTask); |
| | 293 | | } |
| | 294 | |
|
| 10 | 295 | | this.CheckForCancellationOrTimeoutAndThrow(); |
| | 296 | |
|
| | 297 | | // |
| | 298 | | // Fire the protocol add collection request |
| | 299 | | // |
| | 300 | | try |
| | 301 | | { |
| 10 | 302 | | var asyncTask = this._jobOperations.ParentBatchClient.ProtocolLayer.AddTaskCollection( |
| 10 | 303 | | this._jobId, |
| 10 | 304 | | protoTasksToAdd, |
| 10 | 305 | | this._behaviorManager, |
| 10 | 306 | | this._parallelOptions.CancellationToken); |
| | 307 | |
|
| 10 | 308 | | var response = await asyncTask.ConfigureAwait(continueOnCapturedContext: false); |
| | 309 | | // |
| | 310 | | // Process the results of the add task collection request |
| | 311 | | // |
| 3 | 312 | | this.ProcessProtocolAddTaskResults(response.Body.Value, tasksToAdd); |
| 2 | 313 | | } |
| 1 | 314 | | catch(Common.BatchException e) |
| | 315 | | { |
| 1 | 316 | | if (e.InnerException is Models.BatchErrorException) |
| | 317 | | { |
| 0 | 318 | | Models.BatchError error = ((Models.BatchErrorException)e.InnerException).Body; |
| 0 | 319 | | int currLength = tasksToAdd.Count; |
| 0 | 320 | | if (error.Code == Common.BatchErrorCodeStrings.RequestBodyTooLarge && currLength != 1) |
| | 321 | | { |
| | 322 | | // Our chunk sizes were too large to fit in a request, so universally reduce size |
| | 323 | | // This is an internal error due to us using greedy initial maximum chunk size, |
| | 324 | | // so do not increment retry counter. |
| | 325 | | { |
| 0 | 326 | | int newLength = currLength / 2; |
| 0 | 327 | | int tmpMaxTasks = this._maxTasks; |
| 0 | 328 | | while (newLength < tmpMaxTasks) |
| | 329 | | { |
| 0 | 330 | | tmpMaxTasks = Interlocked.CompareExchange(ref this._maxTasks, newLength, tmpMaxTasks); |
| | 331 | | } |
| 0 | 332 | | foreach (TrackedCloudTask trackedTask in tasksToAdd.Values) |
| | 333 | | { |
| 0 | 334 | | this._remainingTasksToAdd.Enqueue(trackedTask); |
| | 335 | | } |
| 0 | 336 | | return; |
| | 337 | | } |
| | 338 | | } |
| | 339 | | } |
| 1 | 340 | | throw; |
| | 341 | | } |
| 2 | 342 | | } |
| | 343 | |
|
| | 344 | | /// <summary> |
| | 345 | | /// Processes a set AddTaskResults from the protocol and groups them according to the results of the AddTaskResu |
| | 346 | | /// </summary> |
| | 347 | | /// <param name="addTaskResults"></param> |
| | 348 | | /// <param name="taskMap">Dictionary of task name to task object instance for the specific protocol response.</p |
| | 349 | | private void ProcessProtocolAddTaskResults( |
| | 350 | | IEnumerable<Protocol.Models.TaskAddResult> addTaskResults, |
| | 351 | | IReadOnlyDictionary<string, TrackedCloudTask> taskMap) |
| | 352 | | { |
| 9 | 353 | | foreach (Protocol.Models.TaskAddResult protoAddTaskResult in addTaskResults) |
| | 354 | | { |
| 2 | 355 | | string taskId = protoAddTaskResult.TaskId; |
| 2 | 356 | | TrackedCloudTask trackedTask = taskMap[taskId]; |
| | 357 | |
|
| 2 | 358 | | AddTaskResult omResult = new AddTaskResult(trackedTask.Task, trackedTask.RetryCount, protoAddTaskResult) |
| | 359 | |
|
| | 360 | | //We know that there must be at least one AddTaskResultHandler so the below ForEach will always be calle |
| | 361 | | //at least once. |
| 2 | 362 | | AddTaskResultStatus status = AddTaskResultStatus.Success; //The default is success to avoid infinite ret |
| | 363 | |
|
| | 364 | | //Call the customer defined result handler |
| 7 | 365 | | foreach (var resultHandlerFunction in this._addTaskResultHandlerCollection) |
| | 366 | | { |
| 2 | 367 | | status = resultHandlerFunction(omResult, this._parallelOptions.CancellationToken); |
| | 368 | | } |
| | 369 | |
|
| 1 | 370 | | if (status == AddTaskResultStatus.Retry) |
| | 371 | | { |
| | 372 | | //Increment retry count |
| 0 | 373 | | trackedTask.IncrementRetryCount(); |
| | 374 | |
|
| | 375 | | //TODO: There is nothing stopping the user from marking all tasks as Retry and never exiting this wo |
| | 376 | | //TODO: In that case maybe we should forcibly abort them after some # of attempts? |
| 0 | 377 | | this._remainingTasksToAdd.Enqueue(trackedTask); |
| | 378 | | } |
| | 379 | | } |
| 2 | 380 | | } |
| | 381 | |
|
| | 382 | | /// <summary> |
| | 383 | | /// Waits for a pending operation to complete and throws if the operation failed. |
| | 384 | | /// </summary> |
| | 385 | | /// <returns></returns> |
| | 386 | | private async Task ProcessPendingOperationResults() |
| | 387 | | { |
| | 388 | | //Wait for any task to complete |
| 6 | 389 | | Task completedTask = await Task.WhenAny(this._pendingAsyncOperations).ConfigureAwait(continueOnCapturedConte |
| | 390 | |
|
| | 391 | | //Check for a task failure -- if there is one, we a-wait for all remaining tasks to complete (this will thro |
| 6 | 392 | | if (completedTask.IsFaulted) |
| | 393 | | { |
| 4 | 394 | | await WaitForTasksAndThrowParallelOperationsExceptionAsync(this._pendingAsyncOperations).ConfigureAwait( |
| | 395 | | } |
| | 396 | | else |
| | 397 | | { |
| 2 | 398 | | await completedTask.ConfigureAwait(continueOnCapturedContext: false); //This await should finish immedia |
| | 399 | |
|
| | 400 | | //Remove the task which completed |
| 1 | 401 | | this._pendingAsyncOperations.Remove(completedTask); |
| | 402 | | } |
| 1 | 403 | | } |
| | 404 | |
|
| | 405 | | private static async Task WaitForTasksAndThrowParallelOperationsExceptionAsync(List<Task> tasks) |
| | 406 | | { |
| | 407 | | //We know that this will throw, but we want to catch the exception so that we can provide a better aggregate |
| | 408 | | try |
| | 409 | | { |
| | 410 | | //NOTE: This try block should only do a WhenAll and nothing else, since the exception thrown in this try |
| 4 | 411 | | await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext: false); //This will throw and termin |
| 0 | 412 | | } |
| 4 | 413 | | catch (Exception) |
| | 414 | | { |
| | 415 | | //Swallow the exception and throw a new one |
| | 416 | |
|
| 17 | 417 | | IEnumerable<Exception> exceptions = tasks.Where(t => t.IsFaulted).SelectMany(t => t.Exception.InnerExcep |
| 4 | 418 | | throw new ParallelOperationsException(BatchErrorMessages.MultipleParallelRequestsHitUnexpectedErrors, ex |
| | 419 | | } |
| 0 | 420 | | } |
| | 421 | |
|
| | 422 | | #endregion |
| | 423 | |
|
| | 424 | | #region Private classes |
| | 425 | |
|
| | 426 | | /// <summary> |
| | 427 | | /// Internal task wrapper which tracks a tasks retry count and holds a reference to the protocol object and Clou |
| | 428 | | /// </summary> |
| | 429 | | private class TrackedCloudTask |
| | 430 | | { |
| 0 | 431 | | public string JobId { get; private set; } |
| 5414 | 432 | | public CloudTask Task { get; private set; } |
| 904 | 433 | | public int RetryCount { get; private set; } |
| 0 | 434 | | public JobOperations JobOperations { get; private set; } |
| | 435 | |
|
| 2706 | 436 | | private Models.TaskAddParameter ProtocolTask { get; set; } |
| | 437 | |
|
| 902 | 438 | | internal TrackedCloudTask( |
| 902 | 439 | | string jobId, |
| 902 | 440 | | JobOperations jobOperations, |
| 902 | 441 | | CloudTask cloudTask) |
| | 442 | | { |
| 902 | 443 | | this.Task = cloudTask; |
| 902 | 444 | | this.JobId = jobId; |
| 902 | 445 | | this.JobOperations = jobOperations; // matt-c: do we really need one of these in every instance? Even |
| 902 | 446 | | this.RetryCount = 0; |
| 902 | 447 | | } |
| | 448 | |
|
| | 449 | | public void IncrementRetryCount() |
| | 450 | | { |
| 0 | 451 | | this.RetryCount++; |
| 0 | 452 | | } |
| | 453 | |
|
| | 454 | | /// <summary> |
| | 455 | | /// Gets the finalized version of this task. Also internally updates this.Task to refer to a bound read-onl |
| | 456 | | /// CloudTask. Subsequent calls to this method will return a reference to the same object |
| | 457 | | /// </summary> |
| | 458 | | /// <returns></returns> |
| | 459 | | public Models.TaskAddParameter GetProtocolTask() |
| | 460 | | { |
| 902 | 461 | | if (this.ProtocolTask == null) |
| | 462 | | { |
| 902 | 463 | | this.ProtocolTask = this.Task.GetTransportObject(); |
| 902 | 464 | | this.Task.Freeze(); //Mark the underlying task readonly |
| | 465 | | } |
| | 466 | |
|
| 902 | 467 | | return this.ProtocolTask; |
| | 468 | | } |
| | 469 | | } |
| | 470 | |
|
| | 471 | | #endregion |
| | 472 | |
|
| | 473 | | } |
| | 474 | | } |