|  |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|  |  | 2 |  | // Licensed under the MIT License. See License.txt in the project root for license information. | 
|  |  | 3 |  |  | 
|  |  | 4 |  | using System; | 
|  |  | 5 |  | using System.Collections.Concurrent; | 
|  |  | 6 |  | using System.Collections.Generic; | 
|  |  | 7 |  | using System.Diagnostics; | 
|  |  | 8 |  | using System.Linq; | 
|  |  | 9 |  | using System.Runtime.InteropServices; | 
|  |  | 10 |  | using System.Security; | 
|  |  | 11 |  | using System.Text; | 
|  |  | 12 |  | using System.Threading.Tasks; | 
|  |  | 13 |  |  | 
|  |  | 14 |  | namespace Microsoft.Azure.Batch.FileStaging | 
|  |  | 15 |  | { | 
|  |  | 16 |  |     internal static class FileStagingUtils | 
|  |  | 17 |  |     { | 
|  |  | 18 |  |         /// <summary> | 
|  |  | 19 |  |         /// Creates buckets for the FileStagingProviders. | 
|  |  | 20 |  |         /// </summary> | 
|  |  | 21 |  |         /// <returns></returns> | 
|  |  | 22 |  |         private static Dictionary<Type, List<IFileStagingProvider>> BucketizeFileStagingProviders(List<IFileStagingProvi | 
|  |  | 23 |  |         { | 
|  | 18 | 24 |  |             Dictionary<Type, List<IFileStagingProvider>> bucketizedProviders = new Dictionary<Type, List<IFileStagingPro | 
|  |  | 25 |  |  | 
|  |  | 26 |  |             // walk all files and create buckets and populate them. | 
|  | 0 | 27 |  |             foreach (IFileStagingProvider curFSP in filesToStage) | 
|  |  | 28 |  |             { | 
|  | 0 | 29 |  |                 Type curType = curFSP.GetType(); | 
|  |  | 30 |  |                 List<IFileStagingProvider> foundFileStagingProvider; | 
|  |  | 31 |  |  | 
|  |  | 32 |  |                 // if no bucket exists create one and register it | 
|  | 0 | 33 |  |                 if (!bucketizedProviders.TryGetValue(curType, out foundFileStagingProvider)) | 
|  |  | 34 |  |                 { | 
|  | 0 | 35 |  |                     foundFileStagingProvider = new List<IFileStagingProvider>(); | 
|  |  | 36 |  |  | 
|  |  | 37 |  |                     // one more bucket | 
|  | 0 | 38 |  |                     bucketizedProviders.Add(curType, foundFileStagingProvider); | 
|  |  | 39 |  |                 } | 
|  |  | 40 |  |  | 
|  |  | 41 |  |                 // bucket has one more file | 
|  | 0 | 42 |  |                 foundFileStagingProvider.Add(curFSP); | 
|  |  | 43 |  |             } | 
|  |  | 44 |  |  | 
|  | 18 | 45 |  |             return bucketizedProviders; | 
|  |  | 46 |  |         } | 
|  |  | 47 |  |  | 
|  |  | 48 |  |         internal static async Task StageFilesAsync(List<IFileStagingProvider> filesToStage, ConcurrentDictionary<Type, I | 
|  |  | 49 |  |         { | 
|  | 8 | 50 |  |             Task asyncTask = StageFilesAsync(filesToStage, allFileStagingArtifacts, string.Empty); | 
|  | 8 | 51 |  |             await asyncTask.ConfigureAwait(continueOnCapturedContext: false); | 
|  | 8 | 52 |  |         } | 
|  |  | 53 |  |  | 
|  |  | 54 |  |         internal static async Task StageFilesAsync(List<IFileStagingProvider> filesToStage, ConcurrentDictionary<Type, I | 
|  |  | 55 |  |         { | 
|  |  | 56 |  |             try | 
|  |  | 57 |  |             { | 
|  | 18 | 58 |  |                 if (null == allFileStagingArtifacts) | 
|  |  | 59 |  |                 { | 
|  | 0 | 60 |  |                     throw new ArgumentNullException(nameof(allFileStagingArtifacts)); | 
|  |  | 61 |  |                 } | 
|  |  | 62 |  |  | 
|  | 18 | 63 |  |                 if (allFileStagingArtifacts.Count > 0) | 
|  |  | 64 |  |                 { | 
|  | 0 | 65 |  |                     throw new ArgumentOutOfRangeException(nameof(allFileStagingArtifacts)); | 
|  |  | 66 |  |                 } | 
|  |  | 67 |  |  | 
|  |  | 68 |  |                 // first we get the buckets.  One for each file staging provider that contains only the files for that p | 
|  | 18 | 69 |  |                 Dictionary<Type, List<IFileStagingProvider>> bucketByProviders = BucketizeFileStagingProviders(filesToSt | 
|  |  | 70 |  |  | 
|  |  | 71 |  |                 // missing artifacts will be instantiated and stored here temporarily | 
|  | 18 | 72 |  |                 Dictionary<Type, IFileStagingArtifact> pendingArtifactsToAdd = new Dictionary<Type, IFileStagingArtifact | 
|  |  | 73 |  |  | 
|  |  | 74 |  |                 // detect any missing staging artifacts.  Each bucket must have a staging artifact. | 
|  | 0 | 75 |  |                 foreach (Type curProviderType in bucketByProviders.Keys) | 
|  |  | 76 |  |                 { | 
|  |  | 77 |  |                     IFileStagingArtifact curProviderArtifact; | 
|  |  | 78 |  |  | 
|  |  | 79 |  |                     // if no artifact was passed in, instantiate one and have it added | 
|  | 0 | 80 |  |                     if (!allFileStagingArtifacts.TryGetValue(curProviderType, out curProviderArtifact)) | 
|  |  | 81 |  |                     { | 
|  |  | 82 |  |                         // we need to have the staging provider create an artifact instance | 
|  |  | 83 |  |                         // so first we retrieve the list of files and ask one of them | 
|  |  | 84 |  |                         List<IFileStagingProvider> filesForProviderType; | 
|  |  | 85 |  |  | 
|  | 0 | 86 |  |                         if (bucketByProviders.TryGetValue(curProviderType, out filesForProviderType)) | 
|  |  | 87 |  |                         { | 
|  |  | 88 |  |                             Debug.Assert(filesForProviderType.Count > 0); // to be in a bucket means there must be at le | 
|  |  | 89 |  |  | 
|  | 0 | 90 |  |                             IFileStagingProvider curProviderAsInterface = filesForProviderType[0]; | 
|  |  | 91 |  |  | 
|  | 0 | 92 |  |                             IFileStagingArtifact newArtifactFreshFromProvider = curProviderAsInterface.CreateStagingArti | 
|  |  | 93 |  |  | 
|  |  | 94 |  |                             // give the file stager the naming fragment if it does not already have one by default | 
|  | 0 | 95 |  |                             if (string.IsNullOrEmpty(newArtifactFreshFromProvider.NamingFragment) && !string.IsNullOrEmp | 
|  |  | 96 |  |                             { | 
|  | 0 | 97 |  |                                 newArtifactFreshFromProvider.NamingFragment = namingFragment; | 
|  |  | 98 |  |                             } | 
|  |  | 99 |  |  | 
|  | 0 | 100 |  |                             pendingArtifactsToAdd.Add(curProviderType, newArtifactFreshFromProvider); | 
|  |  | 101 |  |                         } | 
|  |  | 102 |  |                     } | 
|  |  | 103 |  |                 } | 
|  |  | 104 |  |  | 
|  |  | 105 |  |                 // add missing artifacts to collection | 
|  | 0 | 106 |  |                 foreach (Type curProvderType in pendingArtifactsToAdd.Keys) | 
|  |  | 107 |  |                 { | 
|  |  | 108 |  |                     IFileStagingArtifact curArtifact; | 
|  |  | 109 |  |  | 
|  | 0 | 110 |  |                     if (pendingArtifactsToAdd.TryGetValue(curProvderType, out curArtifact)) | 
|  |  | 111 |  |                     { | 
|  | 0 | 112 |  |                         allFileStagingArtifacts.TryAdd(curProvderType, curArtifact); | 
|  |  | 113 |  |                     } | 
|  |  | 114 |  |                 } | 
|  |  | 115 |  |  | 
|  |  | 116 |  |                 // now we have buckets of files for each provider and artifacts for each provider | 
|  |  | 117 |  |                 // start tasks for each provider | 
|  |  | 118 |  |  | 
|  |  | 119 |  |                 // list of all running providers | 
|  | 18 | 120 |  |                 List<Task> runningProviders = new List<Task>(); | 
|  |  | 121 |  |  | 
|  |  | 122 |  |                 // start a task for each FileStagingProvider | 
|  | 0 | 123 |  |                 foreach (List<IFileStagingProvider> curProviderFilesToStage in bucketByProviders.Values) | 
|  |  | 124 |  |                 { | 
|  |  | 125 |  |                     Debug.Assert(curProviderFilesToStage.Count > 0); | 
|  |  | 126 |  |  | 
|  | 0 | 127 |  |                     IFileStagingProvider anyInstance = curProviderFilesToStage[0];  // had to be at least one to get her | 
|  |  | 128 |  |                     Task providerTask;  // this is the async task for this provider | 
|  |  | 129 |  |                     IFileStagingArtifact stagingArtifact; // artifact for this provider | 
|  |  | 130 |  |  | 
|  | 0 | 131 |  |                     if (allFileStagingArtifacts.TryGetValue(anyInstance.GetType(), out stagingArtifact))  // register th | 
|  |  | 132 |  |                     { | 
|  | 0 | 133 |  |                         providerTask = anyInstance.StageFilesAsync(curProviderFilesToStage, stagingArtifact); | 
|  |  | 134 |  |  | 
|  | 0 | 135 |  |                         runningProviders.Add(providerTask); | 
|  |  | 136 |  |                     } | 
|  |  | 137 |  |                     else | 
|  |  | 138 |  |                     { | 
|  |  | 139 |  |                         Debug.Assert(true, "The staging artifacts collection is somehow missing an artifact for " + anyI | 
|  |  | 140 |  |                     } | 
|  |  | 141 |  |                 } | 
|  |  | 142 |  |  | 
|  |  | 143 |  |                 // | 
|  |  | 144 |  |                 // the individual tasks were created above | 
|  |  | 145 |  |                 // now a-wait for them all to finish | 
|  |  | 146 |  |                 // | 
|  | 18 | 147 |  |                 Task[] runningArray = runningProviders.ToArray(); | 
|  |  | 148 |  |  | 
|  | 18 | 149 |  |                 Task allRunningTasks = Task.WhenAll(runningArray); | 
|  |  | 150 |  |  | 
|  |  | 151 |  |                 // actual a-wait for all the providers | 
|  | 18 | 152 |  |                 await allRunningTasks.ConfigureAwait(continueOnCapturedContext: false); | 
|  | 18 | 153 |  |             } | 
|  | 0 | 154 |  |             catch (Exception ex) | 
|  |  | 155 |  |             { | 
|  | 0 | 156 |  |                 if (null != ex) | 
|  |  | 157 |  |                 { | 
|  | 0 | 158 |  |                     throw; // TODO:  trace here? | 
|  |  | 159 |  |                 } | 
|  | 0 | 160 |  |             } | 
|  | 18 | 161 |  |         } | 
|  |  | 162 |  |     } | 
|  |  | 163 |  | } |