< Summary

Class:Azure.Storage.PartitionedUploader`2
Assembly:Azure.Storage.Files.DataLake
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PartitionedUploader.cs
Covered lines:184
Uncovered lines:28
Coverable lines:212
Total lines:611
Line coverage:86.7% (184 of 212)
Covered branches:63
Total branches:88
Branch coverage:71.5% (63 of 88)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_InitializeDestination() | - | 100% | 100%
get_SingleUpload() | - | 100% | 100%
get_UploadPartition() | - | 100% | 100%
get_CommitPartitionedUpload() | - | 100% | 100%
get_Scope() | - | 100% | 100%
.cctor() | - | 0% | 100%
.ctor(...) | - | 96.67% | 70.83%
UploadInternal() | - | 97.22% | 83.33%
UploadInSequenceInternal() | - | 62% | 50%
UploadInParallelAsync() | - | 92.86% | 78.57%
StagePartitionAndDisposeInternal() | - | 100% | 100%
GetLengthOrDefault(...) | - | 71.43% | 100%
GetPartitionsAsync() | - | 95% | 72.73%
GetBufferedPartitionInternal() | - | 100% | 100%
GetStreamedPartitionInternal(...) | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PartitionedUploader.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Buffers;
 6using System.Collections.Generic;
 7using System.IO;
 8using System.Runtime.CompilerServices;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core.Pipeline;
 12using Azure.Storage.Shared;
 13
 14namespace Azure.Storage
 15{
 16    internal class PartitionedUploader<TServiceSpecificArgs, TCompleteUploadReturn>
 17    {
 18        #region Definitions
 19        // delegate for getting a partition from a stream based on the selected data management strategy
 20        private delegate Task<SlicedStream> GetNextStreamPartition(
 21            Stream stream,
 22            long minCount,
 23            long maxCount,
 24            long absolutePosition,
 25            bool async,
 26            CancellationToken cancellationToken);
 27
 28        // injected behaviors for services to use partitioned uploads
 29        public delegate DiagnosticScope CreateScope(string operationName);
 30        public delegate Task InitializeDestinationInternal(TServiceSpecificArgs args, bool async, CancellationToken canc
 31        public delegate Task<Response<TCompleteUploadReturn>> SingleUploadInternal(
 32            Stream contentStream,
 33            TServiceSpecificArgs args,
 34            IProgress<long> progressHandler,
 35            string operationName,
 36            bool async,
 37            CancellationToken cancellationToken);
 38        public delegate Task UploadPartitionInternal(Stream contentStream,
 39            long offset,
 40            TServiceSpecificArgs args,
 41            IProgress<long> progressHandler,
 42            bool async,
 43            CancellationToken cancellationToken);
 44        public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal(
 45            List<(long Offset, long Size)> partitions,
 46            TServiceSpecificArgs args,
 47            bool async,
 48            CancellationToken cancellationToken);
 49
 50        public struct Behaviors
 51        {
 38452            public InitializeDestinationInternal InitializeDestination { get; set; }
 38453            public SingleUploadInternal SingleUpload { get; set; }
 38454            public UploadPartitionInternal UploadPartition { get; set; }
 38455            public CommitPartitionedUploadInternal CommitPartitionedUpload { get; set; }
 38456            public CreateScope Scope { get; set; }
 57        }
 58
 059        public static readonly InitializeDestinationInternal InitializeNoOp = (args, async, cancellationToken) => Task.C
 60        #endregion
 61
 62        private readonly InitializeDestinationInternal _initializeDestinationInternal;
 63        private readonly SingleUploadInternal _singleUploadInternal;
 64        private readonly UploadPartitionInternal _uploadPartitionInternal;
 65        private readonly CommitPartitionedUploadInternal _commitPartitionedUploadInternal;
 66        private readonly CreateScope _createScope;
 67
 68        /// <summary>
 69        /// The maximum number of simultaneous workers.
 70        /// </summary>
 71        private readonly int _maxWorkerCount;
 72
 73        /// <summary>
 74        /// A pool of memory we use to partition the stream into blocks.
 75        /// </summary>
 76        private readonly ArrayPool<byte> _arrayPool;
 77
 78        /// <summary>
 79        /// The size we use to determine whether to upload as a one-off request or
 80        /// a partitioned/committed upload
 81        /// </summary>
 82        private readonly long _singleUploadThreshold;
 83
 84        /// <summary>
 85        /// The size of each staged block.  If null, we'll change between 4MB
 86        /// and 8MB depending on the size of the content.
 87        /// </summary>
 88        private readonly long? _blockSize;
 89
 90        /// <summary>
 91        /// The name of the calling operation.
 92        /// </summary>
 93        private readonly string _operationName;
 94
 19295        public PartitionedUploader(
 19296            Behaviors behaviors,
 19297            StorageTransferOptions transferOptions,
 19298            ArrayPool<byte> arrayPool = null,
 19299            string operationName = null)
 100        {
 101            // initialize isn't required for all services and can use a no-op; rest are required
 192102            _initializeDestinationInternal = behaviors.InitializeDestination ?? InitializeNoOp;
 192103            _singleUploadInternal = behaviors.SingleUpload
 192104                ?? throw Errors.ArgumentNull(nameof(behaviors.SingleUpload));
 192105            _uploadPartitionInternal = behaviors.UploadPartition
 192106                ?? throw Errors.ArgumentNull(nameof(behaviors.UploadPartition));
 192107            _commitPartitionedUploadInternal = behaviors.CommitPartitionedUpload
 192108                ?? throw Errors.ArgumentNull(nameof(behaviors.CommitPartitionedUpload));
 192109            _createScope = behaviors.Scope
 192110                ?? throw Errors.ArgumentNull(nameof(behaviors.Scope));
 111
 192112            _arrayPool = arrayPool ?? ArrayPool<byte>.Shared;
 113
 114            // Set _maxWorkerCount
 192115            if (transferOptions.MaximumConcurrency.HasValue
 192116                && transferOptions.MaximumConcurrency > 0)
 117            {
 0118                _maxWorkerCount = transferOptions.MaximumConcurrency.Value;
 119            }
 120            else
 121            {
 192122                _maxWorkerCount = Constants.Blob.Block.DefaultConcurrentTransfersCount;
 123            }
 124
 125            // Set _singleUploadThreshold
 192126            if (transferOptions.InitialTransferSize.HasValue
 192127                && transferOptions.InitialTransferSize.Value > 0)
 128            {
 4129                _singleUploadThreshold = Math.Min(transferOptions.InitialTransferSize.Value, Constants.Blob.Block.MaxUpl
 130            }
 131            else
 132            {
 188133                _singleUploadThreshold = Constants.Blob.Block.MaxUploadBytes;
 134            }
 135
 136            // Set _blockSize
 192137            if (transferOptions.MaximumTransferSize.HasValue
 192138                && transferOptions.MaximumTransferSize > 0)
 139            {
 16140                _blockSize = Math.Min(
 16141                    Constants.Blob.Block.MaxStageBytes,
 16142                    transferOptions.MaximumTransferSize.Value);
 143            }
 144
 192145            _operationName = operationName;
 192146        }
 147
 148        public async Task<Response<TCompleteUploadReturn>> UploadInternal(
 149            Stream content,
 150            TServiceSpecificArgs args,
 151            IProgress<long> progressHandler,
 152            bool async,
 153            CancellationToken cancellationToken = default)
 154        {
 192155            if (content == default)
 156            {
 0157                throw Errors.ArgumentNull(nameof(content));
 158            }
 159
 192160            await _initializeDestinationInternal(args, async, cancellationToken).ConfigureAwait(false);
 161
 162            // some strategies are unavailable if we don't know the stream length, and some can still work
 163            // we may introduce separately provided stream lengths in the future for unseekable streams with
 164            // an expected length
 184165            long? length = GetLengthOrDefault(content);
 166
 167            // If we know the length and it's small enough
 184168            if (length < _singleUploadThreshold)
 169            {
 170                // Upload it in a single request
 160171                return await _singleUploadInternal(
 160172                    content,
 160173                    args,
 160174                    progressHandler,
 160175                    _operationName,
 160176                    async,
 160177                    cancellationToken)
 160178                    .ConfigureAwait(false);
 179            }
 180
 181            // If the caller provided an explicit block size, we'll use it.
 182            // Otherwise we'll adjust dynamically based on the size of the
 183            // content.
 24184            long blockSize = _blockSize != null
 24185                ? _blockSize.Value
 24186                : length < Constants.LargeUploadThreshold ?
 24187                    Constants.DefaultBufferSize :
 24188                    Constants.LargeBufferSize;
 189
 190            // Otherwise stage individual blocks
 191
 192            /* We only support parallel upload in an async context to avoid issues in our overall sync story.
 193             * We're branching on both async and max worker count, where 3 combinations lead to
 194             * UploadInSequenceInternal and 1 combination leads to UploadInParallelAsync. We are guaranteed
 195             * to be in an async context when we call UploadInParallelAsync, even though the analyzer can't
 196             * detext this, and we properly pass in the async context in the else case when we haven't
 197             * explicitly checked.
 198             */
 199#pragma warning disable AZC0109 // Misuse of 'async' parameter.
 200#pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 24201            if (async && _maxWorkerCount > 1)
 202            {
 12203                return await UploadInParallelAsync(
 12204                    content,
 12205                    length,
 12206                    blockSize,
 12207                    args,
 12208                    progressHandler,
 12209                    cancellationToken)
 12210                    .ConfigureAwait(false);
 211            }
 212#pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 213#pragma warning restore AZC0109 // Misuse of 'async' parameter.
 214            else
 215            {
 12216                return await UploadInSequenceInternal(
 12217                    content,
 12218                    length,
 12219                    blockSize,
 12220                    args,
 12221                    progressHandler,
 12222                    async: async,
 12223                    cancellationToken).ConfigureAwait(false);
 224            }
 184225        }
 226
 227        private async Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal(
 228            Stream content,
 229            long? contentLength,
 230            long partitionSize,
 231            TServiceSpecificArgs args,
 232            IProgress<long> progressHandler,
 233            bool async,
 234            CancellationToken cancellationToken)
 235        {
 236            // Wrap the staging and commit calls in an Upload span for
 237            // distributed tracing
 12238            DiagnosticScope scope = _createScope(_operationName);
 239            try
 240            {
 12241                scope.Start();
 242
 243                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
 244                // progress from being reset with each stage blob operation.
 12245                if (progressHandler != null)
 246                {
 12247                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
 248                }
 249
 250                // The list of (offset, size) partitions we're going to commit
 12251                List<(long Offset, long Size)> partitions = new List<(long, long)>();
 252
 253                /* Streamed partitions only work if we can seek the stream; we need retries on
 254                 * individual uploads.
 255                 */
 12256                GetNextStreamPartition partitionGetter = content.CanSeek
 12257                            ? (GetNextStreamPartition)GetStreamedPartitionInternal
 12258                            : /*   redundant cast   */GetBufferedPartitionInternal;
 259
 260                // Partition the stream into individual blocks and stage them
 12261                if (async)
 262                {
 0263                    await foreach (SlicedStream block in GetPartitionsAsync(
 0264                        content,
 0265                        contentLength,
 0266                        partitionSize,
 0267                        partitionGetter,
 0268                        async: true,
 0269                        cancellationToken).ConfigureAwait(false))
 270                    {
 0271                        await StagePartitionAndDisposeInternal(
 0272                            block,
 0273                            block.AbsolutePosition,
 0274                            args,
 0275                            progressHandler,
 0276                            async: true,
 0277                            cancellationToken).ConfigureAwait(false);
 278
 0279                        partitions.Add((block.AbsolutePosition, block.Length));
 0280                    }
 281                }
 282                else
 283                {
 216284                    foreach (SlicedStream block in GetPartitionsAsync(
 12285                        content,
 12286                        contentLength,
 12287                        partitionSize,
 12288                        partitionGetter,
 12289                        async: false,
 12290                        cancellationToken).EnsureSyncEnumerable())
 291                    {
 96292                        StagePartitionAndDisposeInternal(
 96293                            block,
 96294                            block.AbsolutePosition,
 96295                            args,
 96296                            progressHandler,
 96297                            async: false,
 96298                            cancellationToken).EnsureCompleted();
 299
 96300                        partitions.Add((block.AbsolutePosition, block.Length));
 301                    }
 302                }
 303
 304                // Commit the block list after everything has been staged to
 305                // complete the upload
 12306                return await _commitPartitionedUploadInternal(
 12307                    partitions,
 12308                    args,
 12309                    async,
 12310                    cancellationToken).ConfigureAwait(false);
 311            }
 0312            catch (Exception ex)
 313            {
 0314                scope.Failed(ex);
 0315                throw;
 316            }
 317            finally
 318            {
 12319                scope.Dispose();
 320            }
 12321        }
 322
 323        private async Task<Response<TCompleteUploadReturn>> UploadInParallelAsync(
 324            Stream content,
 325            long? contentLength,
 326            long blockSize,
 327            TServiceSpecificArgs args,
 328            IProgress<long> progressHandler,
 329            CancellationToken cancellationToken)
 330        {
 331            // Wrap the staging and commit calls in an Upload span for
 332            // distributed tracing
 12333            DiagnosticScope scope = _createScope(_operationName);
 334            try
 335            {
 12336                scope.Start();
 337
 338                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
 339                // progress from being reset with each stage blob operation.
 12340                if (progressHandler != null)
 341                {
 12342                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
 343                }
 344
 345                // The list of (offset, size) partitions we're going to commit
 12346                List<(long Offset, long Size)> partitions = new List<(long, long)>();
 347
 348                // A list of tasks that are currently executing which will
 349                // never exceed _maxWorkerCount
 12350                List<Task> runningTasks = new List<Task>();
 351
 352                // Partition the stream into individual blocks
 216353                await foreach (SlicedStream block in GetPartitionsAsync(
 12354                    content,
 12355                    contentLength,
 12356                    blockSize,
 12357                    GetBufferedPartitionInternal, // we always buffer for upload in parallel from stream
 12358                    async: true,
 12359                    cancellationToken).ConfigureAwait(false))
 360                {
 361                    /* We need to do this first! Length is calculated on the fly based on stream buffer
 362                     * contents; We need to record the partition data first before consuming the stream
 363                     * asynchronously. */
 96364                    partitions.Add((block.AbsolutePosition, block.Length));
 365
 366                    // Start staging the next block (but don't await the Task!)
 96367                    Task task = StagePartitionAndDisposeInternal(
 96368                        block,
 96369                        block.AbsolutePosition,
 96370                        args,
 96371                        progressHandler,
 96372                        async: true,
 96373                        cancellationToken);
 374
 375                    // Track the task in our list of running tasks
 96376                    runningTasks.Add(task);
 377
 378                    // If we run out of workers
 96379                    if (runningTasks.Count >= _maxWorkerCount)
 380                    {
 381                        // Wait for at least one of them to finish
 32382                        await Task.WhenAny(runningTasks).ConfigureAwait(false);
 383
 384                        // Clear any completed blocks from the task list
 384385                        for (int i = 0; i < runningTasks.Count; i++)
 386                        {
 160387                            Task runningTask = runningTasks[i];
 160388                            if (!runningTask.IsCompleted)
 389                            {
 390                                continue;
 391                            }
 392
 74393                            await runningTask.ConfigureAwait(false);
 74394                            runningTasks.RemoveAt(i);
 74395                            i--;
 396                        }
 397                    }
 398                }
 399
 400                // Wait for all the remaining blocks to finish staging and then
 401                // commit the block list to complete the upload
 12402                await Task.WhenAll(runningTasks).ConfigureAwait(false);
 403
 404                // Calling internal method for easier mocking in PartitionedUploaderTests
 12405                return await _commitPartitionedUploadInternal(
 12406                    partitions,
 12407                    args,
 12408                    async: true,
 12409                    cancellationToken)
 12410                    .ConfigureAwait(false);
 411            }
 0412            catch (Exception ex)
 413            {
 0414                scope.Failed(ex);
 0415                throw;
 416            }
 417            finally
 418            {
 12419                scope.Dispose();
 420            }
 12421        }
 422
 423        /// <summary>
 424        /// Wraps both the async method and dispose call in one task.
 425        /// </summary>
 426        private async Task StagePartitionAndDisposeInternal(
 427            SlicedStream partition,
 428            long offset,
 429            TServiceSpecificArgs args,
 430            IProgress<long> progressHandler,
 431            bool async,
 432            CancellationToken cancellationToken)
 433        {
 434            try
 435            {
 192436                await _uploadPartitionInternal(
 192437                    partition,
 192438                    offset,
 192439                    args,
 192440                    progressHandler,
 192441                    async,
 192442                    cancellationToken)
 192443                    .ConfigureAwait(false);
 192444            }
 445            finally
 446            {
 447                // Return the memory used by the block to our ArrayPool as soon
 448                // as we've staged it
 192449                partition.Dispose();
 450            }
 192451        }
 452
 453        /// <summary>
 454        /// Some streams will throw if you try to access their length so we wrap
 455        /// the check in a TryGet helper.
 456        /// </summary>
 457        private static long? GetLengthOrDefault(Stream content)
 458        {
 459            try
 460            {
 184461                if (content.CanSeek)
 462                {
 164463                    return content.Length;
 464                }
 20465            }
 0466            catch (NotSupportedException)
 467            {
 0468            }
 20469            return default;
 164470        }
 471
 472        #region Stream Splitters
 473        /// <summary>
 474        /// Partition a stream into a series of blocks buffered as needed by an array pool.
 475        /// </summary>
 476        private static async IAsyncEnumerable<SlicedStream> GetPartitionsAsync(
 477            Stream stream,
 478            long? streamLength,
 479            long blockSize,
 480            GetNextStreamPartition getNextPartition,
 481            bool async,
 482            [EnumeratorCancellation] CancellationToken cancellationToken)
 483        {
 484            // The minimum amount of data we'll accept from a stream before
 485            // splitting another block. Code that sets `blockSize` will always
 486            // set it to a positive number. Max() only avoids the edge case where
 487            // the user sets their block size to 1 (blockSize / 2 would be 0).
 24488            long acceptableBlockSize = Math.Max(1, blockSize / 2);
 489
 490            // if we know the data length, assert boundaries before spending resources uploading beyond service capabili
 24491            if (streamLength.HasValue)
 492            {
 493                // service has a max block count per blob
 494                // block size * block count limit = max data length to upload
 495                // if stream length is longer than specified max block size allows, can't upload
 4496                long minRequiredBlockSize = (long)Math.Ceiling((double)streamLength.Value / Constants.Blob.Block.MaxBloc
 4497                if (blockSize < minRequiredBlockSize)
 498                {
 0499                    throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize)
 500                }
 501                // bring min up to our min required by the service
 4502                acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
 503            }
 504
 505            long read;
 24506            long absolutePosition = 0;
 507            do
 508            {
 216509                SlicedStream partition = await getNextPartition(
 216510                    stream,
 216511                    acceptableBlockSize,
 216512                    blockSize,
 216513                    absolutePosition,
 216514                    async,
 216515                    cancellationToken).ConfigureAwait(false);
 216516                read = partition.Length;
 216517                absolutePosition += read;
 518
 519                // If we read anything, turn it into a StreamPartition and
 520                // return it for staging
 216521                if (partition.Length != 0)
 522                {
 523                    // The SlicedStream is disposable and it'll be the
 524                    // user's responsibility to return the bytes used to our
 525                    // ArrayPool
 192526                    yield return partition;
 527                }
 528
 529                // Continue reading blocks until we've exhausted the stream
 216530            } while (read != 0);
 24531        }
 532
 533        /// <summary>
 534        /// Gets a partition from the current location of the given stream.
 535        ///
 536        /// This partition is buffered and it is safe to get many before using any of them.
 537        /// </summary>
 538        /// <param name="stream">
 539        /// Stream to buffer a partition from.
 540        /// </param>
 541        /// <param name="minCount">
 542        /// Minimum amount of data to wait on before finalizing buffer.
 543        /// </param>
 544        /// <param name="maxCount">
 545        /// Max amount of data to buffer before cutting off for the next.
 546        /// </param>
 547        /// <param name="absolutePosition">
 548        /// Offset of this stream relative to the large stream.
 549        /// </param>
 550        /// <param name="async">
 551        /// Whether to buffer this partition asynchronously.
 552        /// </param>
 553        /// <param name="cancellationToken">
 554        /// Cancellation token.
 555        /// </param>
 556        /// <returns>
 557        /// Task containing the buffered stream partition.
 558        /// </returns>
 559        private async Task<SlicedStream> GetBufferedPartitionInternal(
 560            Stream stream,
 561            long minCount,
 562            long maxCount,
 563            long absolutePosition,
 564            bool async,
 565            CancellationToken cancellationToken)
 210566            => await PooledMemoryStream.BufferStreamPartitionInternal(
 210567                stream,
 210568                minCount,
 210569                maxCount,
 210570                absolutePosition,
 210571                _arrayPool,
 210572                maxArrayPoolRentalSize: default,
 210573                async,
 210574                cancellationToken).ConfigureAwait(false);
 575
 576        /// <summary>
 577        /// Gets a partition from the current location of the given stream.
 578        ///
 579        /// This partition is a facade over the existing stream, and the
 580        /// previous partition should be consumed before using the next.
 581        /// </summary>
 582        /// <param name="stream">
 583        /// Stream to wrap.
 584        /// </param>
 585        /// <param name="minCount">
 586        /// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
 587        /// </param>
 588        /// <param name="maxCount">
 589        /// Length of this facade stream.
 590        /// </param>
 591        /// <param name="absolutePosition">
 592        /// Offset of this stream relative to the large stream.
 593        /// </param>
 594        /// <param name="async">
 595        /// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
 596        /// </param>
 597        /// <param name="cancellationToken"></param>
 598        /// <returns>
 599        /// Task containing the stream facade.
 600        /// </returns>
 601        private static Task<SlicedStream> GetStreamedPartitionInternal(
 602            Stream stream,
 603            long minCount,
 604            long maxCount,
 605            long absolutePosition,
 606            bool async,
 607            CancellationToken cancellationToken)
 6608            => Task.FromResult((SlicedStream)WindowStream.GetWindow(stream, maxCount, absolutePosition));
 609        #endregion
 610    }
 611}