< Summary

Class:Azure.Storage.Blobs.PartitionedDownloader
Assembly:Azure.Storage.Blobs
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs\src\PartitionedDownloader.cs
Covered lines:141
Uncovered lines:1
Coverable lines:142
Total lines:361
Line coverage:99.2% (141 of 142)
Covered branches:46
Total branches:48
Branch coverage:95.8% (46 of 48)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 100% | 100%
DownloadToAsync() | - | 100% | 100%
<DownloadToAsync() | - | 100% | 100%
DownloadTo(...) | - | 100% | 100%
ParseRangeTotalLength(...) | - | 83.33% | 75%
CreateConditionsWithEtag(...) | - | 100% | 91.67%
CopyToAsync() | - | 100% | 100%
CopyTo(...) | - | 100% | 100%
GetRanges() | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs\src\PartitionedDownloader.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Diagnostics;
 7using System.Globalization;
 8using System.IO;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core.Pipeline;
 12using Azure.Storage.Blobs.Models;
 13using Azure.Storage.Blobs.Specialized;
 14
 15namespace Azure.Storage.Blobs
 16{
 17    internal class PartitionedDownloader
 18    {
 19        /// <summary>
 20        /// The client used to download the blob.
 21        /// </summary>
 22        private readonly BlobBaseClient _client;
 23
 24        /// <summary>
 25        /// The maximum number of simultaneous workers.
 26        /// </summary>
 27        private readonly int _maxWorkerCount;
 28
 29        /// <summary>
 30        /// The size of the first range requested (which can be larger than the
 31        /// other ranges).
 32        /// </summary>
 33        private readonly long _initialRangeSize;
 34
 35        /// <summary>
 36        /// The size of subsequent ranges.
 37        /// </summary>
 38        private readonly long _rangeSize;
 39
 14040        public PartitionedDownloader(
 14041            BlobBaseClient client,
 14042            StorageTransferOptions transferOptions = default)
 43        {
 14044            _client = client;
 45
 46            // Set _maxWorkerCount
 14047            if (transferOptions.MaximumConcurrency.HasValue
 14048                && transferOptions.MaximumConcurrency > 0)
 49            {
 450                _maxWorkerCount = transferOptions.MaximumConcurrency.Value;
 51            }
 52            else
 53            {
 13654                _maxWorkerCount = Constants.Blob.Block.DefaultConcurrentTransfersCount;
 55            }
 56
 57            // Set _rangeSize
 14058            if (transferOptions.MaximumTransferSize.HasValue
 14059                && transferOptions.MaximumTransferSize.Value > 0)
 60            {
 5261                _rangeSize = Math.Min(transferOptions.MaximumTransferSize.Value, Constants.Blob.Block.MaxDownloadBytes);
 62            }
 63            else
 64            {
 8865                _rangeSize = Constants.DefaultBufferSize;
 66            }
 67
 68            // Set _initialRangeSize
 14069            if (transferOptions.InitialTransferSize.HasValue
 14070                && transferOptions.InitialTransferSize.Value > 0)
 71            {
 1272                _initialRangeSize = transferOptions.MaximumTransferSize.Value;
 73            }
 74            else
 75            {
 12876                _initialRangeSize = Constants.Blob.Block.DefaultInitalDownloadRangeSize;
 77            }
 12878        }
 79
 80        public async Task<Response> DownloadToAsync(
 81            Stream destination,
 82            BlobRequestConditions conditions,
 83            CancellationToken cancellationToken)
 84        {
               // Purpose: download the blob into destination using ranged
               // requests, and return the raw response of the initial request.
               //
               // destination: stream that receives the segments in offset order.
               // conditions: request conditions for the initial request; the
               //   remaining requests use a copy produced by
               //   CreateConditionsWithEtag with the ETag of the first response.
               // cancellationToken: passed to every download and copy call.
               //
               // Flow: request the range starting at offset 0 with length
               // _initialRangeSize.  If that request fails with InvalidRange it
               // is retried once with a default range.  If the response is
               // unavailable, or its content length equals the total length
               // parsed from Content-Range, the method returns after at most one
               // copy.  Otherwise every range from GetRanges is requested and
               // queued; once the queue holds _maxWorkerCount tasks, the oldest
               // one is awaited and copied before another request is started.
               //
               // Any exception is recorded on the diagnostic scope and rethrown;
               // the scope is disposed in all cases.
               //
               // NOTE(review): the first entry placed in the queue is
               // initialResponseTask, not initialResponse.  After the
               // InvalidRange retry that task is faulted, so dequeuing it would
               // rethrow.  This looks reachable only if the retried response
               // reports a total length different from its content length -
               // TODO confirm.
               // NOTE(review): when a dequeued task or a copy throws, the tasks
               // still in the queue are not awaited and their responses are not
               // disposed by this method - verify this is acceptable.
 85            // Wrap the download range calls in a Download span for distributed
 86            // tracing
 10687            DiagnosticScope scope = _client.ClientDiagnostics.CreateScope($"{nameof(BlobBaseClient)}.{nameof(BlobBaseCli
 88            try
 89            {
 10690                scope.Start();
 91
 92                // Just start downloading using an initial range.  If it's a
 93                // small blob, we'll get the whole thing in one shot.  If it's
 94                // a large blob, we'll get its full size in Content-Range and
 95                // can keep downloading it in segments.
 10696                var initialRange = new HttpRange(0, _initialRangeSize);
 10697                Task<Response<BlobDownloadInfo>> initialResponseTask =
 10698                    _client.DownloadAsync(
 10699                        initialRange,
 106100                        conditions,
 106101                        rangeGetContentHash: false,
 106102                        cancellationToken);
 103
 106104                Response<BlobDownloadInfo> initialResponse = null;
 105                try
 106                {
 106107                    initialResponse = await initialResponseTask.ConfigureAwait(false);
 102108                }
 2109                catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.InvalidRange)
 110                {
 2111                    initialResponse = await _client.DownloadAsync(
 2112                        range: default,
 2113                        conditions,
 2114                        false,
 2115                        cancellationToken)
 2116                        .ConfigureAwait(false);
 117                }
 118
 119                // If the initial request returned no content (i.e., a 304),
 120                // we'll pass that back to the user immediately
 104121                if (initialResponse.IsUnavailable())
 122                {
 2123                    return initialResponse.GetRawResponse();
 124                }
 125
 126                // If the first segment was the entire blob, we'll copy that to
 127                // the output stream and finish now
 102128                long initialLength = initialResponse.Value.ContentLength;
 102129                long totalLength = ParseRangeTotalLength(initialResponse.Value.Details.ContentRange);
 102130                if (initialLength == totalLength)
 131                {
 94132                    await CopyToAsync(
 94133                        initialResponse,
 94134                        destination,
 94135                        cancellationToken)
 94136                        .ConfigureAwait(false);
 94137                    return initialResponse.GetRawResponse();
 138                }
 139
 140                // Capture the etag from the first segment and construct
 141                // conditions to ensure the blob doesn't change while we're
 142                // downloading the remaining segments
 8143                ETag etag = initialResponse.Value.Details.ETag;
 8144                BlobRequestConditions conditionsWithEtag = CreateConditionsWithEtag(conditions, etag);
 145
 146                // Create a queue of tasks that will each download one segment
 147                // of the blob.  The queue maintains the order of the segments
 148                // so we can keep appending to the end of the destination
 149                // stream when each segment finishes.
 8150                var runningTasks = new Queue<Task<Response<BlobDownloadInfo>>>();
 8151                runningTasks.Enqueue(initialResponseTask);
 152
 153                // Fill the queue with tasks to download each of the remaining
 154                // ranges in the blob
 108155                foreach (HttpRange httpRange in GetRanges(initialLength, totalLength))
 156                {
 157                    // Add the next Task (which will start the download but
 158                    // return before it's completed downloading)
 48159                    runningTasks.Enqueue(_client.DownloadAsync(
 48160                        httpRange,
 48161                        conditionsWithEtag,
 48162                        rangeGetContentHash: false,
 48163                        cancellationToken));
 164
 165                    // If we have fewer tasks than alotted workers, then just
 166                    // continue adding tasks until we have _maxWorkerCount
 167                    // running in parallel
 48168                    if (runningTasks.Count < _maxWorkerCount)
 169                    {
 170                        continue;
 171                    }
 172
 173                    // Once all the workers are busy, wait for the first
 174                    // segment to finish downloading before we create more work
 36175                    await ConsumeQueuedTask().ConfigureAwait(false);
 176                }
 177
 178                // Wait for all of the remaining segments to download
 20179                while (runningTasks.Count > 0)
 180                {
 16181                    await ConsumeQueuedTask().ConfigureAwait(false);
 182                }
 183
 4184                return initialResponse.GetRawResponse();
 185
 186                // Wait for the first segment in the queue of tasks to complete
 187                // downloading and copy it to the destination stream
 188                async Task ConsumeQueuedTask()
 189                {
 190                    // Don't need to worry about 304s here because the ETag
 191                    // condition will turn into a 412 and throw a proper
 192                    // RequestFailedException
 52193                    using BlobDownloadInfo result =
 52194                        await runningTasks.Dequeue().ConfigureAwait(false);
 195
 196                    // Even though the BlobDownloadInfo is returned immediately,
 197                    // CopyToAsync causes ConsumeQueuedTask to wait until the
 198                    // download is complete
 48199                    await CopyToAsync(
 48200                        result,
 48201                        destination,
 48202                        cancellationToken)
 48203                        .ConfigureAwait(false);
 48204                }
 205            }
 6206            catch (Exception ex)
 207            {
 6208                scope.Failed(ex);
 6209                throw;
 210            }
 211            finally
 212            {
 106213                scope.Dispose();
 214            }
 100215        }
 216
 217        public Response DownloadTo(
 218            Stream destination,
 219            BlobRequestConditions conditions,
 220            CancellationToken cancellationToken)
 221        {
                // Purpose: synchronously download the blob into destination
                // using ranged requests, and return the raw response of the
                // initial request.
                //
                // destination: stream that receives the segments in offset order.
                // conditions: request conditions for the initial request; the
                //   remaining requests use a copy produced by
                //   CreateConditionsWithEtag with the ETag of the first response.
                // cancellationToken: passed to every download and copy call.
                //
                // Flow: request the range starting at offset 0 with length
                // _initialRangeSize, retrying once with a default range if the
                // request fails with InvalidRange.  An unavailable response is
                // returned as is.  Otherwise the first segment is copied, and if
                // its content length differs from the total length parsed from
                // Content-Range, each range from GetRanges is downloaded and
                // copied one at a time; _maxWorkerCount is not used on this path.
                //
                // Any exception is recorded on the diagnostic scope and rethrown;
                // the scope is disposed in all cases.
 222            // Wrap the download range calls in a Download span for distributed
 223            // tracing
 34224            DiagnosticScope scope = _client.ClientDiagnostics.CreateScope($"{nameof(BlobBaseClient)}.{nameof(BlobBaseCli
 225            try
 226            {
 34227                scope.Start();
 228
 229                // Just start downloading using an initial range.  If it's a
 230                // small blob, we'll get the whole thing in one shot.  If it's
 231                // a large blob, we'll get its full size in Content-Range and
 232                // can keep downloading it in segments.
 34233                var initialRange = new HttpRange(0, _initialRangeSize);
 234                Response<BlobDownloadInfo> initialResponse;
 235
 236                try
 237                {
 34238                    initialResponse = _client.Download(
 34239                        initialRange,
 34240                        conditions,
 34241                        rangeGetContentHash: false,
 34242                        cancellationToken);
 30243                }
 2244                catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.InvalidRange)
 245                {
 2246                    initialResponse = _client.Download(
 2247                    range: default,
 2248                    conditions,
 2249                    rangeGetContentHash: false,
 2250                    cancellationToken);
 2251                }
 252
 253                // If the initial request returned no content (i.e., a 304),
 254                // we'll pass that back to the user immediately
 32255                if (initialResponse.IsUnavailable())
 256                {
 2257                    return initialResponse.GetRawResponse();
 258                }
 259
 260                // Copy the first segment to the destination stream
 30261                CopyTo(initialResponse, destination, cancellationToken);
 262
 263                // If the first segment was the entire blob, we're finished now
 30264                long initialLength = initialResponse.Value.ContentLength;
 30265                long totalLength = ParseRangeTotalLength(initialResponse.Value.Details.ContentRange);
 30266                if (initialLength == totalLength)
 267                {
 26268                    return initialResponse.GetRawResponse();
 269                }
 270
 271                // Capture the etag from the first segment and construct
 272                // conditions to ensure the blob doesn't change while we're
 273                // downloading the remaining segments
 4274                ETag etag = initialResponse.Value.Details.ETag;
 4275                BlobRequestConditions conditionsWithEtag = CreateConditionsWithEtag(conditions, etag);
 276
 277                // Download each of the remaining ranges in the blob
 80278                foreach (HttpRange httpRange in GetRanges(initialLength, totalLength))
 279                {
 280                    // Don't need to worry about 304s here because the ETag
 281                    // condition will turn into a 412 and throw a proper
 282                    // RequestFailedException
 36283                    Response<BlobDownloadInfo> result = _client.Download(
 36284                        httpRange,
 36285                        conditionsWithEtag,
 36286                        rangeGetContentHash: false,
 36287                        cancellationToken);
 36288                    CopyTo(result.Value, destination, cancellationToken);
 289                }
 290
 4291                return initialResponse.GetRawResponse();
 292            }
 2293            catch (Exception ex)
 294            {
 2295                scope.Failed(ex);
 2296                throw;
 297            }
 298            finally
 299            {
 34300                scope.Dispose();
 34301            }
 32302        }
 303
 304        private static long ParseRangeTotalLength(string range)
 305        {
 132306            if (range == null)
 307            {
 4308                return 0;
 309            }
 128310            int lengthSeparator = range.IndexOf("/", StringComparison.InvariantCultureIgnoreCase);
 128311            if (lengthSeparator == -1)
 312            {
 0313                throw BlobErrors.ParsingFullHttpRangeFailed(range);
 314            }
 128315            return long.Parse(range.Substring(lengthSeparator + 1), CultureInfo.InvariantCulture);
 316        }
 317
 318        private static BlobRequestConditions CreateConditionsWithEtag(BlobRequestConditions conditions, ETag etag) =>
 12319            new BlobRequestConditions
 12320            {
 12321                LeaseId = conditions?.LeaseId,
 12322                IfMatch = conditions?.IfMatch ?? etag,
 12323                IfNoneMatch = conditions?.IfNoneMatch,
 12324                IfModifiedSince = conditions?.IfModifiedSince,
 12325                IfUnmodifiedSince = conditions?.IfUnmodifiedSince
 12326            };
 327
 328        private static async Task CopyToAsync(
 329            BlobDownloadInfo result,
 330            Stream destination,
 331            CancellationToken cancellationToken)
 332        {
 142333            using Stream source = result.Content;
 334
 142335            await source.CopyToAsync(
 142336                destination,
 142337                Constants.DefaultDownloadCopyBufferSize,
 142338                cancellationToken)
 142339                .ConfigureAwait(false);
 142340        }
 341
 342
 343        private static void CopyTo(
 344            BlobDownloadInfo result,
 345            Stream destination,
 346            CancellationToken cancellationToken)
 347        {
 66348            cancellationToken.ThrowIfCancellationRequested();
 66349            result.Content.CopyTo(destination, Constants.DefaultDownloadCopyBufferSize);
 66350            result.Content.Dispose();
 66351        }
 352
 353        private IEnumerable<HttpRange> GetRanges(long initialLength, long totalLength)
 354        {
 184355            for (long offset = initialLength; offset < totalLength; offset += _rangeSize)
 356            {
 84357                yield return new HttpRange(offset, Math.Min(totalLength - offset, _rangeSize));
 358            }
 8359        }
 360    }
 361}