< Summary

Class:Microsoft.Azure.Batch.FileStaging.FileToStage
Assembly:Microsoft.Azure.Batch.FileStaging
File(s):C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch.FileStaging\src\SequentialFileStaging.cs
Covered lines:0
Uncovered lines:142
Coverable lines:142
Total lines:440
Line coverage:0% (0 of 142)
Covered branches:0
Total branches:62
Branch coverage:0% (0 of 62)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_LocalFileToStage()-0%100%
set_LocalFileToStage(...)-0%100%
get_NodeFileName()-0%100%
set_NodeFileName(...)-0%100%
get_StagedFiles()-0%100%
set_StagedFiles(...)-0%100%
get_Exception()-0%100%
set_Exception(...)-0%100%
.ctor()-0%100%
.ctor(...)-0%0%
StageFilesAsync()-0%100%
CreateStagingArtifact()-0%100%
Validate()-0%0%
get_StagingStorageAccount()-0%100%
ConstructBlobSource(...)-0%0%
CreateContainerWithPolicySASIfNotExist(...)-0%0%
CreateDefaultBlobContainerAndSASIfNeededReturn(...)-0%0%
FindAtLeastOne(...)-0%0%
StageFilesInternalAsync()-0%0%
StageFilesAsync()-0%0%
StageOneFileAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch.FileStaging\src\SequentialFileStaging.cs

#LineLine coverage
 1// Copyright (c) Microsoft and contributors.  All rights reserved.
 2//
 3// Licensed under the Apache License, Version 2.0 (the "License");
 4// you may not use this file except in compliance with the License.
 5// You may obtain a copy of the License at
 6// http://www.apache.org/licenses/LICENSE-2.0
 7//
 8// Unless required by applicable law or agreed to in writing, software
 9// distributed under the License is distributed on an "AS IS" BASIS,
 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 11//
 12// See the License for the specific language governing permissions and
 13// limitations under the License.
 14
 15using System;
 16using System.Collections.Generic;
 17using System.IO;
 18using Microsoft.WindowsAzure.Storage;
 19using Microsoft.WindowsAzure.Storage.Blob;
 20
 21
 22namespace Microsoft.Azure.Batch.FileStaging
 23{
 24    /// <summary>
 25    /// Provides for file staging of a local file to blob storage.
 26    /// </summary>
 27    public sealed class FileToStage : IFileStagingProvider
 28    {
 29        /// <summary>
 30        /// The name of the local file to stage to blob storage
 31        /// </summary>
 32        public string LocalFileToStage
 33        {
 034            get;
 035            internal set;
 36        }
 37
 38        /// <summary>
 39        /// The target filename, on the compute node, to which the blob contents will be downloaded.
 40        /// </summary>
 41        public string NodeFileName
 42        {
 043            get;
 044            internal set;
 45        }
 46
 47        /// <summary>
 48        /// The instances of ResourcesFile for the staged local file.
 49        /// For this implementation, successful file staging of this object will
 50        /// result in a collection with only one entry.
 51        /// </summary>
 52        public IEnumerable<ResourceFile> StagedFiles
 53        {
 054            get;
 055            internal set;
 56        }
 57
 58        /// <summary>
 59        /// The exception, if any, caught while attempting to stage this file.
 60        /// </summary>
 61        public Exception Exception
 62        {
 063            get;
 064            internal set;
 65        }
 66
 67#region constructors
 68
 069        private FileToStage()
 70        {
 071        }
 72
 73        /// <summary>
 74        /// Specifies that a local file should be staged to blob storage.
 75        /// The specified account will be charged for storage costs.
 76        /// </summary>
 77        /// <param name="localFileToStage">The name of the local file.</param>
 78        /// <param name="storageCredentials">The storage credentials to be used when creating the default container.</pa
 79        /// <param name="nodeFileName">Optional name to be given to the file on the compute node.  If this parameter is 
 80        /// the name on the compute node will be set to the value of localFileToStage stripped of all path information.<
 081        public FileToStage(string localFileToStage, StagingStorageAccount storageCredentials, string nodeFileName = null
 82        {
 083            this.LocalFileToStage = localFileToStage;
 084            this.StagingStorageAccount = storageCredentials;
 85
 086            if (string.IsNullOrWhiteSpace(this.LocalFileToStage))
 87            {
 088                throw new ArgumentOutOfRangeException("localFileToStage");
 89            }
 90
 91            // map null to base name of local file
 092            if (string.IsNullOrWhiteSpace(nodeFileName))
 93            {
 094                this.NodeFileName = Path.GetFileName(this.LocalFileToStage);
 95            }
 96            else
 97            {
 098                this.NodeFileName = nodeFileName;
 99            }
 0100        }
 101
 102#endregion // constructors
 103
 104#region // IFileStagingProvider
 105
 106        /// <summary>
 107        /// See <see cref="IFileStagingProvider.StageFilesAsync"/>.
 108        /// </summary>
 109        /// <param name="filesToStage">The instances of IFileStagingProvider to stage.</param>
 110        /// <param name="fileStagingArtifact">IFileStagingProvider specific staging artifacts including error/progress.<
 111        /// <returns>A <see cref="System.Threading.Tasks.Task"/> object that represents the asynchronous operation.</ret
 112        public async System.Threading.Tasks.Task StageFilesAsync(List<IFileStagingProvider> filesToStage, IFileStagingAr
 113        {
 0114            System.Threading.Tasks.Task taskForStaticStaging = FileToStage.StageFilesInternalAsync(filesToStage, fileSta
 115
 0116            await taskForStaticStaging.ConfigureAwait(continueOnCapturedContext: false);
 117
 0118            return;
 0119        }
 120
 121        /// <summary>
 122        /// See <see cref="IFileStagingProvider.CreateStagingArtifact"/>.
 123        /// </summary>
 124        /// <returns>An instance of IFileStagingArtifact with default values.</returns>
 125        public IFileStagingArtifact CreateStagingArtifact()
 126        {
 0127            return new SequentialFileStagingArtifact() as IFileStagingArtifact;
 128        }
 129
 130        /// <summary>
 131        /// See <see cref="IFileStagingProvider.Validate"/>.
 132        /// </summary>
 133        public void Validate()
 134        {
 0135            if (!File.Exists(this.LocalFileToStage))
 136            {
 0137                throw new FileNotFoundException(string.Format(ErrorMessages.FileStagingLocalFileNotFound, this.LocalFile
 138            }
 0139        }
 140
 141#endregion // IFileStagingProvier
 142
 143#region internal/private
 144
 145        // the staging code needs to get the secrets
 0146        internal StagingStorageAccount StagingStorageAccount { get; set; }
 147
 148        /// <summary>
 149        /// combine container and blob into an URL.
 150        /// </summary>
 151        /// <param name="container">container url</param>
 152        /// <param name="blob">blob url</param>
 153        /// <returns>full url</returns>
 154        private static string ConstructBlobSource(string container, string blob)
 155        {
 0156            int index = container.IndexOf("?");
 157
 0158            if (index != -1)
 159            {
 160                //SAS
 0161                string containerAbsoluteUrl = container.Substring(0, index);
 0162                return containerAbsoluteUrl + "/" + blob + container.Substring(index);
 163            }
 164            else
 165            {
 0166                return container + "/" + blob;
 167            }
 168        }
 169
 170        /// <summary>
 171        /// create a container if doesn't exist, setting permission with policy, and return assosciated SAS signature
 172        /// </summary>
 173        /// <param name="account">storage account</param>
 174        /// <param name="key">storage key</param>
 175        /// <param name="container">container to be created</param>
 176        /// <param name="policy">name for the policy</param>
 177        /// <param name="start">start time of the policy</param>
 178        /// <param name="end">expire time of the policy</param>
 179        /// <param name="permissions">permission on the name</param>
 180        /// <param name="blobUri">blob URI</param>
 181        /// <returns>the SAS for the container, in full URI format.</returns>
 182        private static string CreateContainerWithPolicySASIfNotExist(string account, string key, Uri blobUri, string con
 183        {
 184            // 1. form the credentail and initial client
 0185            CloudStorageAccount storageaccount = new CloudStorageAccount(new WindowsAzure.Storage.Auth.StorageCredential
 0186                                                                         blobEndpoint: blobUri,
 0187                                                                         queueEndpoint: null,
 0188                                                                         tableEndpoint: null,
 0189                                                                         fileEndpoint: null);
 190
 0191            CloudBlobClient client = storageaccount.CreateCloudBlobClient();
 192
 193            // 2. create container if it doesn't exist
 0194            CloudBlobContainer storagecontainer = client.GetContainerReference(container);
 0195            storagecontainer.CreateIfNotExistsAsync().GetAwaiter().GetResult();
 196
 197            // 3. validate policy, create/overwrite if doesn't match
 0198            bool policyFound = false;
 199
 0200            SharedAccessBlobPolicy accesspolicy = new SharedAccessBlobPolicy()
 0201            {
 0202                SharedAccessExpiryTime = end,
 0203                SharedAccessStartTime = start,
 0204                Permissions = permissions
 0205            };
 206
 0207            BlobContainerPermissions blobPermissions = storagecontainer.GetPermissionsAsync().GetAwaiter().GetResult();
 208
 0209            if (blobPermissions.SharedAccessPolicies.ContainsKey(policy))
 210            {
 0211                SharedAccessBlobPolicy containerpolicy = blobPermissions.SharedAccessPolicies[policy];
 0212                if (!(permissions == (containerpolicy.Permissions & permissions) && start <= containerpolicy.SharedAcces
 213                {
 0214                    blobPermissions.SharedAccessPolicies[policy] = accesspolicy;
 215                }
 216                else
 217                {
 0218                    policyFound = true;
 219                }
 220            }
 221            else
 222            {
 0223                blobPermissions.SharedAccessPolicies.Add(policy, accesspolicy);
 224            }
 225
 0226            if (!policyFound)
 227            {
 0228                storagecontainer.SetPermissionsAsync(blobPermissions).GetAwaiter().GetResult();
 229            }
 230
 231            // 4. genereate SAS and return
 0232            string container_sas = storagecontainer.GetSharedAccessSignature(new SharedAccessBlobPolicy(), policy);
 0233            string container_url = storagecontainer.Uri.AbsoluteUri + container_sas;
 234
 0235            return container_url;
 236        }
 237
 238        private static void CreateDefaultBlobContainerAndSASIfNeededReturn(List<IFileStagingProvider> filesToStage, Sequ
 239        {
 0240            if ((null != filesToStage) && (filesToStage.Count > 0))
 241            {
 242                // construct the name of the new blob container.
 0243                seqArtifact.BlobContainerCreated = FileStagingNamingHelpers.ConstructDefaultName(seqArtifact.NamingFragm
 244
 245                // get any instance for the storage credentials
 0246                FileToStage anyRealInstance = FindAtLeastOne(filesToStage);
 247
 0248                if (null != anyRealInstance)
 249                {
 0250                    StagingStorageAccount creds = anyRealInstance.StagingStorageAccount;
 0251                    string policyName = Batch.Constants.DefaultConveniencePrefix + Constants.DefaultContainerPolicyFragm
 0252                    DateTime startTime = DateTime.UtcNow;
 0253                    DateTime expiredAtTime = startTime + new TimeSpan(24 /* hrs*/, 0, 0);
 254
 0255                    seqArtifact.DefaultContainerSAS = CreateContainerWithPolicySASIfNotExist(
 0256                                                            creds.StorageAccount,
 0257                                                            creds.StorageAccountKey,
 0258                                                            creds.BlobUri,
 0259                                                            seqArtifact.BlobContainerCreated,
 0260                                                            policyName,
 0261                                                            startTime,
 0262                                                            expiredAtTime,
 0263                                                            SharedAccessBlobPermissions.Read);
 264
 0265                    return;  // done
 266                }
 267            }
 0268        }
 269
 270        /// <summary>
 271        /// Since this is the SequentialFileStagingProvider, all files are supposed to be of this type.
 272        /// Find any one and return the implementation instance.
 273        /// </summary>
 274        /// <param name="filesToStage"></param>
 275        /// <returns>Null means there was not even one.</returns>
 276        private static FileToStage FindAtLeastOne(List<IFileStagingProvider> filesToStage)
 277        {
 0278            if ((null != filesToStage) && (filesToStage.Count > 0))
 279            {
 0280                foreach(IFileStagingProvider curProvider in filesToStage)
 281                {
 0282                    FileToStage thisIsReal = curProvider as FileToStage;
 283
 0284                    if (null != thisIsReal)
 285                    {
 0286                        return thisIsReal;
 287                    }
 288                }
 289            }
 290
 0291            return null;
 0292        }
 293
 294        /// <summary>
 295        /// Starts an asynchronous call to stage the given files.
 296        /// </summary>
 297        private static async System.Threading.Tasks.Task StageFilesInternalAsync(List<IFileStagingProvider> filesToStage
 298        {
 0299            if (null == filesToStage)
 300            {
 0301                throw new ArgumentNullException("filesToStage");
 302            }
 303
 0304            if (null == fileStagingArtifact)
 305            {
 0306                throw new ArgumentNullException("filesStagingArtifact");
 307            }
 308
 0309            SequentialFileStagingArtifact seqArtifact = fileStagingArtifact as SequentialFileStagingArtifact;
 310
 0311            if (null == seqArtifact)
 312            {
 0313                throw new ArgumentOutOfRangeException(ErrorMessages.FileStagingIncorrectArtifact);
 314            }
 315
 316            // is there any work to do?
 0317            if (null == FindAtLeastOne(filesToStage))
 318            {
 0319                return;  // now work to do.  none of the files belong to this provider
 320            }
 321
 322            // is there any work to do
 0323            if ((null == filesToStage) || (filesToStage.Count <= 0))
 324            {
 0325                return;  // we are done
 326            }
 327
 328            // create a Run task to create the blob containers if needed
 0329            System.Threading.Tasks.Task createContainerTask = System.Threading.Tasks.Task.Run(() => { CreateDefaultBlobC
 330
 331            // wait for container to be created
 0332            await createContainerTask.ConfigureAwait(continueOnCapturedContext: false);
 333
 334            // begin staging the files
 0335            System.Threading.Tasks.Task stageTask = StageFilesAsync(filesToStage, seqArtifact);
 336
 337            // wait for files to be staged
 0338            await stageTask.ConfigureAwait(continueOnCapturedContext: false);
 0339        }
 340
 341        /// <summary>
 342        /// Stages all files in the queue
 343        /// </summary>
 344        private async static System.Threading.Tasks.Task StageFilesAsync(List<IFileStagingProvider> filesToStage, Sequen
 345        {
 0346            foreach (IFileStagingProvider currentFile in filesToStage)
 347            {
 348                // for "retry" and/or "double calls" we ignore files that have already been staged
 0349                if (null == currentFile.StagedFiles)
 350                {
 0351                    FileToStage fts = currentFile as FileToStage;
 352
 0353                    if (null != fts)
 354                    {
 0355                        System.Threading.Tasks.Task stageTask = StageOneFileAsync(fts, seqArtifacts);
 356
 0357                        await stageTask.ConfigureAwait(continueOnCapturedContext: false);
 358                    }
 359                }
 360            }
 0361        }
 362
 363        /// <summary>
 364        /// Stage a single file.
 365        /// </summary>
 366        private async static System.Threading.Tasks.Task StageOneFileAsync(FileToStage stageThisFile, SequentialFileStag
 367        {
 0368            StagingStorageAccount storecreds = stageThisFile.StagingStorageAccount;
 0369            string containerName = seqArtifacts.BlobContainerCreated;
 370
 371            // TODO: this flattens all files to the top of the compute node/task relative file directory. solve the hiea
 0372            string blobName = Path.GetFileName(stageThisFile.LocalFileToStage);
 373
 374            // Create the storage account with the connection string.
 0375            CloudStorageAccount storageAccount = new CloudStorageAccount(
 0376                                                        new WindowsAzure.Storage.Auth.StorageCredentials(storecreds.Stor
 0377                                                        blobEndpoint: storecreds.BlobUri,
 0378                                                        queueEndpoint: null,
 0379                                                        tableEndpoint: null,
 0380                                                        fileEndpoint: null);
 381
 0382            CloudBlobClient client = storageAccount.CreateCloudBlobClient();
 0383            CloudBlobContainer container = client.GetContainerReference(containerName);
 0384            ICloudBlob blob = container.GetBlockBlobReference(blobName);
 385            bool doesBlobExist;
 386
 387            try
 388            {
 389                // fetch attributes so we can compare file lengths
 0390                System.Threading.Tasks.Task fetchTask = blob.FetchAttributesAsync();
 391
 0392                await fetchTask.ConfigureAwait(continueOnCapturedContext: false);
 393
 0394                doesBlobExist = true;
 0395            }
 0396            catch (StorageException scex)
 397            {
 398                // check to see if blob does not exist
 0399                if ((int)System.Net.HttpStatusCode.NotFound == scex.RequestInformation.HttpStatusCode)
 400                {
 0401                    doesBlobExist = false;
 402                }
 403                else
 404                {
 0405                    throw;  // unknown exception, throw to caller
 406                }
 0407            }
 408
 0409            bool mustUploadBlob = true;  // we do not re-upload blobs if they have already been uploaded
 410
 0411            if (doesBlobExist) // if the blob exists, compare
 412            {
 0413                FileInfo fi = new FileInfo(stageThisFile.LocalFileToStage);
 414
 415                // since we don't have a hash of the contents... we check length
 0416                if (blob.Properties.Length == fi.Length)
 417                {
 0418                    mustUploadBlob = false;
 419                }
 420            }
 421
 0422            if (mustUploadBlob)
 423            {
 424                // upload the file
 0425                System.Threading.Tasks.Task uploadTask = blob.UploadFromFileAsync(stageThisFile.LocalFileToStage);
 426
 0427                await uploadTask.ConfigureAwait(continueOnCapturedContext: false);
 428            }
 429
 430            // get the SAS for the blob
 0431            string blobSAS = ConstructBlobSource(seqArtifacts.DefaultContainerSAS, blobName);
 0432            string nodeFileName = stageThisFile.NodeFileName;
 433
 434            // create a new ResourceFile and populate it.  This file is now staged!
 0435            stageThisFile.StagedFiles = new ResourceFile[] { ResourceFile.FromUrl(blobSAS, nodeFileName) };
 0436        }
 437
 438#endregion internal/private
 439    }
 440}