| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. See License.txt in the project root for license information. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Concurrent; |
| | 6 | | using System.Collections.Generic; |
| | 7 | | using System.Diagnostics; |
| | 8 | | using System.Linq; |
| | 9 | | using System.Runtime.InteropServices; |
| | 10 | | using System.Security; |
| | 11 | | using System.Text; |
| | 12 | | using System.Threading.Tasks; |
| | 13 | |
|
| | 14 | | namespace Microsoft.Azure.Batch.FileStaging |
| | 15 | | { |
| | 16 | | internal static class FileStagingUtils |
| | 17 | | { |
| | 18 | | /// <summary> |
| | 19 | | /// Groups the given file staging providers into buckets keyed by each provider's runtime type (GetType()), keeping enumeration order within a bucket. |
| | 20 | | /// </summary> |
| | 21 | | /// <returns>A newly created dictionary mapping each distinct provider type to the list of providers of that type; it is empty when no providers are enumerated.</returns> |
| | 22 | | private static Dictionary<Type, List<IFileStagingProvider>> BucketizeFileStagingProviders(List<IFileStagingProvi |
| | 23 | | { |
| 18 | 24 | | Dictionary<Type, List<IFileStagingProvider>> bucketizedProviders = new Dictionary<Type, List<IFileStagingPro |
| | 25 | |
|
| | 26 | | // Single pass over filesToStage: find (or create) the bucket for each provider's type and append the provider. NOTE(review): filesToStage is not null-checked in this method. |
| 0 | 27 | | foreach (IFileStagingProvider curFSP in filesToStage) |
| | 28 | | { |
| 0 | 29 | | Type curType = curFSP.GetType(); |
| | 30 | | List<IFileStagingProvider> foundFileStagingProvider; |
| | 31 | |
|
| | 32 | | // First provider seen for this type: there is no bucket yet, so create an empty one. |
| 0 | 33 | | if (!bucketizedProviders.TryGetValue(curType, out foundFileStagingProvider)) |
| | 34 | | { |
| 0 | 35 | | foundFileStagingProvider = new List<IFileStagingProvider>(); |
| | 36 | |
|
| | 37 | | // Register the new bucket so later providers of the same type reuse it. |
| 0 | 38 | | bucketizedProviders.Add(curType, foundFileStagingProvider); |
| | 39 | | } |
| | 40 | |
|
| | 41 | | // Append to the bucket, whether it was just created or found by TryGetValue. |
| 0 | 42 | | foundFileStagingProvider.Add(curFSP); |
| | 43 | | } |
| | 44 | |
|
| 18 | 45 | | return bucketizedProviders; |
| | 46 | | } |
| | 47 | | /// <summary>Convenience overload: forwards filesToStage and allFileStagingArtifacts to the three-argument StageFilesAsync with string.Empty as the third argument, then awaits the returned task with ConfigureAwait(false).</summary>
|
| | 48 | | internal static async Task StageFilesAsync(List<IFileStagingProvider> filesToStage, ConcurrentDictionary<Type, I |
| | 49 | | { |
| 8 | 50 | | Task asyncTask = StageFilesAsync(filesToStage, allFileStagingArtifacts, string.Empty); |
| 8 | 51 | | await asyncTask.ConfigureAwait(continueOnCapturedContext: false); |
| 8 | 52 | | } |
| | 53 | | /// <summary>Stages files in four steps: groups the providers by runtime type, creates a staging artifact for every type that has none and adds it to allFileStagingArtifacts, calls StageFilesAsync once per type, and awaits all of the resulting tasks.</summary>
|
| | 54 | | internal static async Task StageFilesAsync(List<IFileStagingProvider> filesToStage, ConcurrentDictionary<Type, I |
| | 55 | | { |
| | 56 | | try |
| | 57 | | { |
| 18 | 58 | | if (null == allFileStagingArtifacts) |
| | 59 | | { |
| 0 | 60 | | throw new ArgumentNullException(nameof(allFileStagingArtifacts)); |
| | 61 | | } |
| | 62 | | // A null collection is rejected above (ArgumentNullException); a collection that already holds entries is rejected below (ArgumentOutOfRangeException), so callers must pass it in empty.
|
| 18 | 63 | | if (allFileStagingArtifacts.Count > 0) |
| | 64 | | { |
| 0 | 65 | | throw new ArgumentOutOfRangeException(nameof(allFileStagingArtifacts)); |
| | 66 | | } |
| | 67 | |
|
| | 68 | | // Group the providers by runtime type; each bucket holds only the providers of one type. |
| 18 | 69 | | Dictionary<Type, List<IFileStagingProvider>> bucketByProviders = BucketizeFileStagingProviders(filesToSt |
| | 70 | |
|
| | 71 | | // Artifacts created in the loop below are collected here first and copied into allFileStagingArtifacts afterwards. |
| 18 | 72 | | Dictionary<Type, IFileStagingArtifact> pendingArtifactsToAdd = new Dictionary<Type, IFileStagingArtifact |
| | 73 | |
|
| | 74 | | // Every bucket needs a staging artifact; find the provider types that do not have one yet. NOTE(review): the collection was just verified to be empty, so this lookup presumably always misses unless it is modified concurrently - confirm. |
| 0 | 75 | | foreach (Type curProviderType in bucketByProviders.Keys) |
| | 76 | | { |
| | 77 | | IFileStagingArtifact curProviderArtifact; |
| | 78 | |
|
| | 79 | | // No artifact is registered for this provider type: have a provider of that type create one. |
| 0 | 80 | | if (!allFileStagingArtifacts.TryGetValue(curProviderType, out curProviderArtifact)) |
| | 81 | | { |
| | 82 | | // The artifact is created by a provider instance, |
| | 83 | | // so fetch this type's bucket and use its first entry. |
| | 84 | | List<IFileStagingProvider> filesForProviderType; |
| | 85 | |
|
| 0 | 86 | | if (bucketByProviders.TryGetValue(curProviderType, out filesForProviderType)) |
| | 87 | | { |
| | 88 | | Debug.Assert(filesForProviderType.Count > 0); // to be in a bucket means there must be at le |
| | 89 | |
|
| 0 | 90 | | IFileStagingProvider curProviderAsInterface = filesForProviderType[0]; |
| | 91 | |
|
| 0 | 92 | | IFileStagingArtifact newArtifactFreshFromProvider = curProviderAsInterface.CreateStagingArti |
| | 93 | |
|
| | 94 | | // The caller's namingFragment is assigned only when the fresh artifact's own NamingFragment is null or empty. |
| 0 | 95 | | if (string.IsNullOrEmpty(newArtifactFreshFromProvider.NamingFragment) && !string.IsNullOrEmp |
| | 96 | | { |
| 0 | 97 | | newArtifactFreshFromProvider.NamingFragment = namingFragment; |
| | 98 | | } |
| | 99 | |
|
| 0 | 100 | | pendingArtifactsToAdd.Add(curProviderType, newArtifactFreshFromProvider); |
| | 101 | | } |
| | 102 | | } |
| | 103 | | } |
| | 104 | |
|
| | 105 | | // Copy the newly created artifacts into the caller's collection; the TryAdd result is ignored, so a key that is already present keeps its existing artifact. |
| 0 | 106 | | foreach (Type curProvderType in pendingArtifactsToAdd.Keys) |
| | 107 | | { |
| | 108 | | IFileStagingArtifact curArtifact; |
| | 109 | |
|
| 0 | 110 | | if (pendingArtifactsToAdd.TryGetValue(curProvderType, out curArtifact)) |
| | 111 | | { |
| 0 | 112 | | allFileStagingArtifacts.TryAdd(curProvderType, curArtifact); |
| | 113 | | } |
| | 114 | | } |
| | 115 | |
|
| | 116 | | // At this point there is a bucket of providers per type and the artifact collection has been filled in; |
| | 117 | | // next, start the staging work for each bucket. |
| | 118 | |
|
| | 119 | | // Tasks returned by the per-bucket StageFilesAsync calls. |
| 18 | 120 | | List<Task> runningProviders = new List<Task>(); |
| | 121 | |
|
| | 122 | | // One StageFilesAsync call per bucket, invoked on the bucket's first provider with the whole bucket and the artifact registered for its type. |
| 0 | 123 | | foreach (List<IFileStagingProvider> curProviderFilesToStage in bucketByProviders.Values) |
| | 124 | | { |
| | 125 | | Debug.Assert(curProviderFilesToStage.Count > 0); |
| | 126 | | // NOTE(review): if the artifact lookup below misses, the else branch calls Debug.Assert(true, ...), which can never fail, so the bucket is skipped silently; Debug.Assert(false, ...) was presumably intended.
|
| 0 | 127 | | IFileStagingProvider anyInstance = curProviderFilesToStage[0]; // had to be at least one to get her |
| | 128 | | Task providerTask; // this is the async task for this provider |
| | 129 | | IFileStagingArtifact stagingArtifact; // artifact for this provider |
| | 130 | |
|
| 0 | 131 | | if (allFileStagingArtifacts.TryGetValue(anyInstance.GetType(), out stagingArtifact)) // register th |
| | 132 | | { |
| 0 | 133 | | providerTask = anyInstance.StageFilesAsync(curProviderFilesToStage, stagingArtifact); |
| | 134 | |
|
| 0 | 135 | | runningProviders.Add(providerTask); |
| | 136 | | } |
| | 137 | | else |
| | 138 | | { |
| | 139 | | Debug.Assert(true, "The staging artifacts collection is somehow missing an artifact for " + anyI |
| | 140 | | } |
| | 141 | | } |
| | 142 | |
|
| | 143 | | // |
| | 144 | | // All provider tasks have been started above; |
| | 145 | | // combine them into one task and wait for every one of them to finish. |
| | 146 | | // |
| 18 | 147 | | Task[] runningArray = runningProviders.ToArray(); |
| | 148 | |
|
| 18 | 149 | | Task allRunningTasks = Task.WhenAll(runningArray); |
| | 150 | |
|
| | 151 | | // Await completion of all provider tasks; ConfigureAwait(false) means the continuation does not have to resume on the captured context. |
| 18 | 152 | | await allRunningTasks.ConfigureAwait(continueOnCapturedContext: false); |
| 18 | 153 | | } |
| 0 | 154 | | catch (Exception ex) |
| | 155 | | { |
| 0 | 156 | | if (null != ex) |
| | 157 | | { |
| 0 | 158 | | throw; // TODO: trace here? NOTE(review): ex is never null inside a catch block, so this handler always rethrows and the try/catch currently adds no behavior. |
| | 159 | | } |
| 0 | 160 | | } |
| 18 | 161 | | } |
| | 162 | | } |
| | 163 | | } |