< Summary

Class: Azure.Storage.PartitionedUploader`2
Assembly: Azure.Storage.Blobs
File(s): C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PartitionedUploader.cs
Covered lines: 202
Uncovered lines: 10
Coverable lines: 212
Total lines: 611
Line coverage: 95.2% (202 of 212)
Covered branches: 71
Total branches: 88
Branch coverage: 80.6% (71 of 88)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_InitializeDestination() | - | 0% | 100%
get_SingleUpload() | - | 100% | 100%
get_UploadPartition() | - | 100% | 100%
get_CommitPartitionedUpload() | - | 100% | 100%
get_Scope() | - | 100% | 100%
.cctor() | - | 100% | 100%
.ctor(...) | - | 100% | 83.33%
UploadInternal() | - | 100% | 91.67%
UploadInSequenceInternal() | - | 94% | 78.57%
UploadInParallelAsync() | - | 92.86% | 78.57%
StagePartitionAndDisposeInternal() | - | 100% | 100%
GetLengthOrDefault(...) | - | 71.43% | 100%
GetPartitionsAsync() | - | 95% | 72.73%
GetBufferedPartitionInternal() | - | 100% | 100%
GetStreamedPartitionInternal(...) | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PartitionedUploader.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Buffers;
 6using System.Collections.Generic;
 7using System.IO;
 8using System.Runtime.CompilerServices;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core.Pipeline;
 12using Azure.Storage.Shared;
 13
 14namespace Azure.Storage
 15{
 16    internal class PartitionedUploader<TServiceSpecificArgs, TCompleteUploadReturn>
 17    {
 18        #region Definitions
 19        // delegate for getting a partition from a stream based on the selected data management strategy
 20        private delegate Task<SlicedStream> GetNextStreamPartition(
 21            Stream stream,
 22            long minCount,
 23            long maxCount,
 24            long absolutePosition,
 25            bool async,
 26            CancellationToken cancellationToken);
 27
 28        // injected behaviors for services to use partitioned uploads
 29        public delegate DiagnosticScope CreateScope(string operationName);
 30        public delegate Task InitializeDestinationInternal(TServiceSpecificArgs args, bool async, CancellationToken canc
 31        public delegate Task<Response<TCompleteUploadReturn>> SingleUploadInternal(
 32            Stream contentStream,
 33            TServiceSpecificArgs args,
 34            IProgress<long> progressHandler,
 35            string operationName,
 36            bool async,
 37            CancellationToken cancellationToken);
 38        public delegate Task UploadPartitionInternal(Stream contentStream,
 39            long offset,
 40            TServiceSpecificArgs args,
 41            IProgress<long> progressHandler,
 42            bool async,
 43            CancellationToken cancellationToken);
 44        public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal(
 45            List<(long Offset, long Size)> partitions,
 46            TServiceSpecificArgs args,
 47            bool async,
 48            CancellationToken cancellationToken);
 49
 50        public struct Behaviors
 51        {
 052            public InitializeDestinationInternal InitializeDestination { get; set; }
 680053            public SingleUploadInternal SingleUpload { get; set; }
 680054            public UploadPartitionInternal UploadPartition { get; set; }
 680055            public CommitPartitionedUploadInternal CommitPartitionedUpload { get; set; }
 680056            public CreateScope Scope { get; set; }
 57        }
 58
 339859        public static readonly InitializeDestinationInternal InitializeNoOp = (args, async, cancellationToken) => Task.C
 60        #endregion
 61
 62        private readonly InitializeDestinationInternal _initializeDestinationInternal;
 63        private readonly SingleUploadInternal _singleUploadInternal;
 64        private readonly UploadPartitionInternal _uploadPartitionInternal;
 65        private readonly CommitPartitionedUploadInternal _commitPartitionedUploadInternal;
 66        private readonly CreateScope _createScope;
 67
 68        /// <summary>
 69        /// The maximum number of simultaneous workers.
 70        /// </summary>
 71        private readonly int _maxWorkerCount;
 72
 73        /// <summary>
 74        /// A pool of memory we use to partition the stream into blocks.
 75        /// </summary>
 76        private readonly ArrayPool<byte> _arrayPool;
 77
 78        /// <summary>
 79        /// The size we use to determine whether to upload as a one-off request or
 80        /// a partitioned/committed upload
 81        /// </summary>
 82        private readonly long _singleUploadThreshold;
 83
 84        /// <summary>
 85        /// The size of each staged block.  If null, we'll change between 4MB
 86        /// and 8MB depending on the size of the content.
 87        /// </summary>
 88        private readonly long? _blockSize;
 89
 90        /// <summary>
 91        /// The name of the calling operation.
 92        /// </summary>
 93        private readonly string _operationName;
 94
 340095        public PartitionedUploader(
 340096            Behaviors behaviors,
 340097            StorageTransferOptions transferOptions,
 340098            ArrayPool<byte> arrayPool = null,
 340099            string operationName = null)
 100        {
 101            // initialize isn't required for all services and can use a no-op; rest are required
 3400102            _initializeDestinationInternal = behaviors.InitializeDestination ?? InitializeNoOp;
 3400103            _singleUploadInternal = behaviors.SingleUpload
 3400104                ?? throw Errors.ArgumentNull(nameof(behaviors.SingleUpload));
 3400105            _uploadPartitionInternal = behaviors.UploadPartition
 3400106                ?? throw Errors.ArgumentNull(nameof(behaviors.UploadPartition));
 3400107            _commitPartitionedUploadInternal = behaviors.CommitPartitionedUpload
 3400108                ?? throw Errors.ArgumentNull(nameof(behaviors.CommitPartitionedUpload));
 3400109            _createScope = behaviors.Scope
 3400110                ?? throw Errors.ArgumentNull(nameof(behaviors.Scope));
 111
 3400112            _arrayPool = arrayPool ?? ArrayPool<byte>.Shared;
 113
 114            // Set _maxWorkerCount
 3400115            if (transferOptions.MaximumConcurrency.HasValue
 3400116                && transferOptions.MaximumConcurrency > 0)
 117            {
 44118                _maxWorkerCount = transferOptions.MaximumConcurrency.Value;
 119            }
 120            else
 121            {
 3356122                _maxWorkerCount = Constants.Blob.Block.DefaultConcurrentTransfersCount;
 123            }
 124
 125            // Set _singleUploadThreshold
 3400126            if (transferOptions.InitialTransferSize.HasValue
 3400127                && transferOptions.InitialTransferSize.Value > 0)
 128            {
 72129                _singleUploadThreshold = Math.Min(transferOptions.InitialTransferSize.Value, Constants.Blob.Block.MaxUpl
 130            }
 131            else
 132            {
 3328133                _singleUploadThreshold = Constants.Blob.Block.MaxUploadBytes;
 134            }
 135
 136            // Set _blockSize
 3400137            if (transferOptions.MaximumTransferSize.HasValue
 3400138                && transferOptions.MaximumTransferSize > 0)
 139            {
 88140                _blockSize = Math.Min(
 88141                    Constants.Blob.Block.MaxStageBytes,
 88142                    transferOptions.MaximumTransferSize.Value);
 143            }
 144
 3400145            _operationName = operationName;
 3400146        }
 147
 148        public async Task<Response<TCompleteUploadReturn>> UploadInternal(
 149            Stream content,
 150            TServiceSpecificArgs args,
 151            IProgress<long> progressHandler,
 152            bool async,
 153            CancellationToken cancellationToken = default)
 154        {
 3400155            if (content == default)
 156            {
 8157                throw Errors.ArgumentNull(nameof(content));
 158            }
 159
 3392160            await _initializeDestinationInternal(args, async, cancellationToken).ConfigureAwait(false);
 161
 162            // some strategies are unavailable if we don't know the stream length, and some can still work
 163            // we may introduce separately provided stream lengths in the future for unseekable streams with
 164            // an expected length
 3392165            long? length = GetLengthOrDefault(content);
 166
 167            // If we know the length and it's small enough
 3392168            if (length < _singleUploadThreshold)
 169            {
 170                // Upload it in a single request
 3304171                return await _singleUploadInternal(
 3304172                    content,
 3304173                    args,
 3304174                    progressHandler,
 3304175                    _operationName,
 3304176                    async,
 3304177                    cancellationToken)
 3304178                    .ConfigureAwait(false);
 179            }
 180
 181            // If the caller provided an explicit block size, we'll use it.
 182            // Otherwise we'll adjust dynamically based on the size of the
 183            // content.
 88184            long blockSize = _blockSize != null
 88185                ? _blockSize.Value
 88186                : length < Constants.LargeUploadThreshold ?
 88187                    Constants.DefaultBufferSize :
 88188                    Constants.LargeBufferSize;
 189
 190            // Otherwise stage individual blocks
 191
 192            /* We only support parallel upload in an async context to avoid issues in our overall sync story.
 193             * We're branching on both async and max worker count, where 3 combinations lead to
 194             * UploadInSequenceInternal and 1 combination leads to UploadInParallelAsync. We are guaranteed
 195             * to be in an async context when we call UploadInParallelAsync, even though the analyzer can't
 196             * detect this, and we properly pass in the async context in the else case when we haven't
 197             * explicitly checked.
 198             */
 199#pragma warning disable AZC0109 // Misuse of 'async' parameter.
 200#pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 88201            if (async && _maxWorkerCount > 1)
 202            {
 68203                return await UploadInParallelAsync(
 68204                    content,
 68205                    length,
 68206                    blockSize,
 68207                    args,
 68208                    progressHandler,
 68209                    cancellationToken)
 68210                    .ConfigureAwait(false);
 211            }
 212#pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 213#pragma warning restore AZC0109 // Misuse of 'async' parameter.
 214            else
 215            {
 20216                return await UploadInSequenceInternal(
 20217                    content,
 20218                    length,
 20219                    blockSize,
 20220                    args,
 20221                    progressHandler,
 20222                    async: async,
 20223                    cancellationToken).ConfigureAwait(false);
 224            }
 3344225        }
 226
 227        private async Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal(
 228            Stream content,
 229            long? contentLength,
 230            long partitionSize,
 231            TServiceSpecificArgs args,
 232            IProgress<long> progressHandler,
 233            bool async,
 234            CancellationToken cancellationToken)
 235        {
 236            // Wrap the staging and commit calls in an Upload span for
 237            // distributed tracing
 20238            DiagnosticScope scope = _createScope(_operationName);
 239            try
 240            {
 20241                scope.Start();
 242
 243                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
 244                // progress from being reset with each stage blob operation.
 20245                if (progressHandler != null)
 246                {
 20247                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
 248                }
 249
 250                // The list tracking the partitions (offset and size) we're going to commit
 20251                List<(long Offset, long Size)> partitions = new List<(long, long)>();
 252
 253                /* Streamed partitions only work if we can seek the stream; we need retries on
 254                 * individual uploads.
 255                 */
 20256                GetNextStreamPartition partitionGetter = content.CanSeek
 20257                            ? (GetNextStreamPartition)GetStreamedPartitionInternal
 20258                            : /*   redundant cast   */GetBufferedPartitionInternal;
 259
 260                // Partition the stream into individual blocks and stage them
 20261                if (async)
 262                {
 24263                    await foreach (SlicedStream block in GetPartitionsAsync(
 4264                        content,
 4265                        contentLength,
 4266                        partitionSize,
 4267                        partitionGetter,
 4268                        async: true,
 4269                        cancellationToken).ConfigureAwait(false))
 270                    {
 8271                        await StagePartitionAndDisposeInternal(
 8272                            block,
 8273                            block.AbsolutePosition,
 8274                            args,
 8275                            progressHandler,
 8276                            async: true,
 8277                            cancellationToken).ConfigureAwait(false);
 278
 8279                        partitions.Add((block.AbsolutePosition, block.Length));
 8280                    }
 281                }
 282                else
 283                {
 240284                    foreach (SlicedStream block in GetPartitionsAsync(
 16285                        content,
 16286                        contentLength,
 16287                        partitionSize,
 16288                        partitionGetter,
 16289                        async: false,
 16290                        cancellationToken).EnsureSyncEnumerable())
 291                    {
 104292                        StagePartitionAndDisposeInternal(
 104293                            block,
 104294                            block.AbsolutePosition,
 104295                            args,
 104296                            progressHandler,
 104297                            async: false,
 104298                            cancellationToken).EnsureCompleted();
 299
 104300                        partitions.Add((block.AbsolutePosition, block.Length));
 301                    }
 302                }
 303
 304                // Commit the block list after everything has been staged to
 305                // complete the upload
 20306                return await _commitPartitionedUploadInternal(
 20307                    partitions,
 20308                    args,
 20309                    async,
 20310                    cancellationToken).ConfigureAwait(false);
 311            }
 0312            catch (Exception ex)
 313            {
 0314                scope.Failed(ex);
 0315                throw;
 316            }
 317            finally
 318            {
 20319                scope.Dispose();
 320            }
 20321        }
 322
 323        private async Task<Response<TCompleteUploadReturn>> UploadInParallelAsync(
 324            Stream content,
 325            long? contentLength,
 326            long blockSize,
 327            TServiceSpecificArgs args,
 328            IProgress<long> progressHandler,
 329            CancellationToken cancellationToken)
 330        {
 331            // Wrap the staging and commit calls in an Upload span for
 332            // distributed tracing
 68333            DiagnosticScope scope = _createScope(_operationName);
 334            try
 335            {
 68336                scope.Start();
 337
 338                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
 339                // progress from being reset with each stage blob operation.
 68340                if (progressHandler != null)
 341                {
 12342                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
 343                }
 344
 345                // The list tracking the partitions (offset and size) we're going to commit
 68346                List<(long Offset, long Size)> partitions = new List<(long, long)>();
 347
 348                // A list of tasks that are currently executing, which will
 349                // never grow larger than _maxWorkerCount
 68350                List<Task> runningTasks = new List<Task>();
 351
 352                // Partition the stream into individual blocks
 2200353                await foreach (SlicedStream block in GetPartitionsAsync(
 68354                    content,
 68355                    contentLength,
 68356                    blockSize,
 68357                    GetBufferedPartitionInternal, // we always buffer for upload in parallel from stream
 68358                    async: true,
 68359                    cancellationToken).ConfigureAwait(false))
 360                {
 361                    /* We need to do this first! Length is calculated on the fly based on stream buffer
 362                     * contents; We need to record the partition data first before consuming the stream
 363                     * asynchronously. */
 1032364                    partitions.Add((block.AbsolutePosition, block.Length));
 365
 366                    // Start staging the next block (but don't await the Task!)
 1032367                    Task task = StagePartitionAndDisposeInternal(
 1032368                        block,
 1032369                        block.AbsolutePosition,
 1032370                        args,
 1032371                        progressHandler,
 1032372                        async: true,
 1032373                        cancellationToken);
 374
 375                    // Add the staging task to our running task list
 1032376                    runningTasks.Add(task);
 377
 378                    // If we run out of workers
 1032379                    if (runningTasks.Count >= _maxWorkerCount)
 380                    {
 381                        // Wait for at least one of them to finish
 638382                        await Task.WhenAny(runningTasks).ConfigureAwait(false);
 383
 384                        // Clear any completed blocks from the task list
 7656385                        for (int i = 0; i < runningTasks.Count; i++)
 386                        {
 3190387                            Task runningTask = runningTasks[i];
 3190388                            if (!runningTask.IsCompleted)
 389                            {
 390                                continue;
 391                            }
 392
 838393                            await runningTask.ConfigureAwait(false);
 838394                            runningTasks.RemoveAt(i);
 838395                            i--;
 396                        }
 397                    }
 398                }
 399
 400                // Wait for all the remaining blocks to finish staging and then
 401                // commit the block list to complete the upload
 68402                await Task.WhenAll(runningTasks).ConfigureAwait(false);
 403
 404                // Calling internal method for easier mocking in PartitionedUploaderTests
 68405                return await _commitPartitionedUploadInternal(
 68406                    partitions,
 68407                    args,
 68408                    async: true,
 68409                    cancellationToken)
 68410                    .ConfigureAwait(false);
 411            }
 0412            catch (Exception ex)
 413            {
 0414                scope.Failed(ex);
 0415                throw;
 416            }
 417            finally
 418            {
 68419                scope.Dispose();
 420            }
 68421        }
 422
 423        /// <summary>
 424        /// Wraps both the async method and dispose call in one task.
 425        /// </summary>
 426        private async Task StagePartitionAndDisposeInternal(
 427            SlicedStream partition,
 428            long offset,
 429            TServiceSpecificArgs args,
 430            IProgress<long> progressHandler,
 431            bool async,
 432            CancellationToken cancellationToken)
 433        {
 434            try
 435            {
 1144436                await _uploadPartitionInternal(
 1144437                    partition,
 1144438                    offset,
 1144439                    args,
 1144440                    progressHandler,
 1144441                    async,
 1144442                    cancellationToken)
 1144443                    .ConfigureAwait(false);
 1144444            }
 445            finally
 446            {
 447                // Return the memory used by the block to our ArrayPool as soon
 448                // as we've staged it
 1144449                partition.Dispose();
 450            }
 1144451        }
 452
 453        /// <summary>
 454        /// Some streams will throw if you try to access their length so we wrap
 455        /// the check in a TryGet helper.
 456        /// </summary>
 457        private static long? GetLengthOrDefault(Stream content)
 458        {
 459            try
 460            {
 3392461                if (content.CanSeek)
 462                {
 3368463                    return content.Length;
 464                }
 24465            }
 0466            catch (NotSupportedException)
 467            {
 0468            }
 24469            return default;
 3368470        }
 471
 472        #region Stream Splitters
 473        /// <summary>
 474        /// Partition a stream into a series of blocks buffered as needed by an array pool.
 475        /// </summary>
 476        private static async IAsyncEnumerable<SlicedStream> GetPartitionsAsync(
 477            Stream stream,
 478            long? streamLength,
 479            long blockSize,
 480            GetNextStreamPartition getNextPartition,
 481            bool async,
 482            [EnumeratorCancellation] CancellationToken cancellationToken)
 483        {
 484            // The minimum amount of data we'll accept from a stream before
 485            // splitting another block. Code that sets `blockSize` will always
 486            // set it to a positive number. Max() only avoids the edge case where
 487            // user sets their block size to 1.
 88488            long acceptableBlockSize = Math.Max(1, blockSize / 2);
 489
 490            // if we know the data length, assert boundaries before spending resources uploading beyond service capabili
 88491            if (streamLength.HasValue)
 492            {
 493                // service has a max block count per blob
 494                // block size * block count limit = max data length to upload
 495                // if stream length is longer than specified max block size allows, can't upload
 64496                long minRequiredBlockSize = (long)Math.Ceiling((double)streamLength.Value / Constants.Blob.Block.MaxBloc
 64497                if (blockSize < minRequiredBlockSize)
 498                {
 0499                    throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize)
 500                }
 501                // bring min up to our min required by the service
 64502                acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
 503            }
 504
 505            long read;
 88506            long absolutePosition = 0;
 507            do
 508            {
 1232509                SlicedStream partition = await getNextPartition(
 1232510                    stream,
 1232511                    acceptableBlockSize,
 1232512                    blockSize,
 1232513                    absolutePosition,
 1232514                    async,
 1232515                    cancellationToken).ConfigureAwait(false);
 1232516                read = partition.Length;
 1232517                absolutePosition += read;
 518
 519                // If we read anything, yield the partition to the caller
 520                // for staging
 1232521                if (partition.Length != 0)
 522                {
 523                    // The SlicedStream partition is disposable and it'll be the
 524                    // user's responsibility to return the bytes used to our
 525                    // ArrayPool
 1144526                    yield return partition;
 527                }
 528
 529                // Continue reading blocks until we've exhausted the stream
 1232530            } while (read != 0);
 88531        }
 532
 533        /// <summary>
 534        /// Gets a partition from the current location of the given stream.
 535        ///
 536        /// This partition is buffered and it is safe to get many before using any of them.
 537        /// </summary>
 538        /// <param name="stream">
 539        /// Stream to buffer a partition from.
 540        /// </param>
 541        /// <param name="minCount">
 542        /// Minimum amount of data to wait on before finalizing buffer.
 543        /// </param>
 544        /// <param name="maxCount">
 545        /// Max amount of data to buffer before cutting off for the next.
 546        /// </param>
 547        /// <param name="absolutePosition">
 548        /// Offset of this stream relative to the large stream.
 549        /// </param>
 550        /// <param name="async">
 551        /// Whether to buffer this partition asynchronously.
 552        /// </param>
 553        /// <param name="cancellationToken">
 554        /// Cancellation token.
 555        /// </param>
 556        /// <returns>
 557        /// Task containing the buffered stream partition.
 558        /// </returns>
 559        private async Task<SlicedStream> GetBufferedPartitionInternal(
 560            Stream stream,
 561            long minCount,
 562            long maxCount,
 563            long absolutePosition,
 564            bool async,
 565            CancellationToken cancellationToken)
 1208566            => await PooledMemoryStream.BufferStreamPartitionInternal(
 1208567                stream,
 1208568                minCount,
 1208569                maxCount,
 1208570                absolutePosition,
 1208571                _arrayPool,
 1208572                maxArrayPoolRentalSize: default,
 1208573                async,
 1208574                cancellationToken).ConfigureAwait(false);
 575
 576        /// <summary>
 577        /// Gets a partition from the current location of the given stream.
 578        ///
 579        /// This partition is a facade over the existing stream, and the
 580        /// previous partition should be consumed before using the next.
 581        /// </summary>
 582        /// <param name="stream">
 583        /// Stream to wrap.
 584        /// </param>
 585        /// <param name="minCount">
 586        /// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
 587        /// </param>
 588        /// <param name="maxCount">
 589        /// Length of this facade stream.
 590        /// </param>
 591        /// <param name="absolutePosition">
 592        /// Offset of this stream relative to the large stream.
 593        /// </param>
 594        /// <param name="async">
 595        /// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
 596        /// </param>
 597        /// <param name="cancellationToken"></param>
 598        /// <returns>
 599        /// Task containing the stream facade.
 600        /// </returns>
 601        private static Task<SlicedStream> GetStreamedPartitionInternal(
 602            Stream stream,
 603            long minCount,
 604            long maxCount,
 605            long absolutePosition,
 606            bool async,
 607            CancellationToken cancellationToken)
 24608            => Task.FromResult((SlicedStream)WindowStream.GetWindow(stream, maxCount, absolutePosition));
 609        #endregion
 610    }
 611}