| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Buffers; |
| | 6 | | using System.Collections.Generic; |
| | 7 | | using System.IO; |
| | 8 | | using System.Runtime.CompilerServices; |
| | 9 | | using System.Threading; |
| | 10 | | using System.Threading.Tasks; |
| | 11 | | using Azure.Core.Pipeline; |
| | 12 | | using Azure.Storage.Shared; |
| | 13 | |
|
| | 14 | | namespace Azure.Storage |
| | 15 | | { |
| | 16 | | internal class PartitionedUploader<TServiceSpecificArgs, TCompleteUploadReturn> |
| | 17 | | { |
        #region Definitions
        // delegate for getting a partition from a stream based on the selected data management strategy
        private delegate Task<SlicedStream> GetNextStreamPartition(
            Stream stream,
            long minCount,
            long maxCount,
            long absolutePosition,
            bool async,
            CancellationToken cancellationToken);

        // injected behaviors for services to use partitioned uploads

        // Prepares the destination before any data is sent (e.g. creating the target resource).
        public delegate DiagnosticScope CreateScope(string operationName);
        public delegate Task InitializeDestinationInternal(TServiceSpecificArgs args, bool async, CancellationToken cancellationToken);
        // Uploads the entire content in one service request; used below the single-upload threshold.
        public delegate Task<Response<TCompleteUploadReturn>> SingleUploadInternal(
            Stream contentStream,
            TServiceSpecificArgs args,
            IProgress<long> progressHandler,
            string operationName,
            bool async,
            CancellationToken cancellationToken);
        // Stages one partition of a multi-part upload at the given offset.
        public delegate Task UploadPartitionInternal(Stream contentStream,
            long offset,
            TServiceSpecificArgs args,
            IProgress<long> progressHandler,
            bool async,
            CancellationToken cancellationToken);
        // Commits all previously staged partitions, completing the upload.
        public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal(
            List<(long Offset, long Size)> partitions,
            TServiceSpecificArgs args,
            bool async,
            CancellationToken cancellationToken);
| | 49 | |
|
        /// <summary>
        /// The set of injected, service-specific operations this uploader
        /// invokes. All members except <see cref="InitializeDestination"/>
        /// are required by the constructor.
        /// </summary>
        public struct Behaviors
        {
            /// <summary>Optional; a no-op is substituted when unset.</summary>
            public InitializeDestinationInternal InitializeDestination { get; set; }
            /// <summary>Required; uploads content in a single request.</summary>
            public SingleUploadInternal SingleUpload { get; set; }
            /// <summary>Required; stages one partition of a multi-part upload.</summary>
            public UploadPartitionInternal UploadPartition { get; set; }
            /// <summary>Required; commits all staged partitions.</summary>
            public CommitPartitionedUploadInternal CommitPartitionedUpload { get; set; }
            /// <summary>Required; creates the distributed-tracing scope for the operation.</summary>
            public CreateScope Scope { get; set; }
        }
| | 58 | |
|
        // Default destination initializer used when Behaviors.InitializeDestination
        // is not provided; completes immediately without any service call.
        public static readonly InitializeDestinationInternal InitializeNoOp = (args, async, cancellationToken) => Task.CompletedTask;
        #endregion

        // Injected service-specific behaviors, captured from the Behaviors struct
        // in the constructor (all validated non-null there except the initializer).
        private readonly InitializeDestinationInternal _initializeDestinationInternal;
        private readonly SingleUploadInternal _singleUploadInternal;
        private readonly UploadPartitionInternal _uploadPartitionInternal;
        private readonly CommitPartitionedUploadInternal _commitPartitionedUploadInternal;
        private readonly CreateScope _createScope;

        /// <summary>
        /// The maximum number of simultaneous workers.
        /// </summary>
        private readonly int _maxWorkerCount;

        /// <summary>
        /// A pool of memory we use to partition the stream into blocks.
        /// </summary>
        private readonly ArrayPool<byte> _arrayPool;

        /// <summary>
        /// The size we use to determine whether to upload as a one-off request or
        /// a partitioned/committed upload
        /// </summary>
        private readonly long _singleUploadThreshold;

        /// <summary>
        /// The size of each staged block. If null, we'll change between 4MB
        /// and 8MB depending on the size of the content.
        /// </summary>
        private readonly long? _blockSize;

        /// <summary>
        /// The name of the calling operation.
        /// </summary>
        private readonly string _operationName;
| | 94 | |
|
| 192 | 95 | | public PartitionedUploader( |
| 192 | 96 | | Behaviors behaviors, |
| 192 | 97 | | StorageTransferOptions transferOptions, |
| 192 | 98 | | ArrayPool<byte> arrayPool = null, |
| 192 | 99 | | string operationName = null) |
| | 100 | | { |
| | 101 | | // initialize isn't required for all services and can use a no-op; rest are required |
| 192 | 102 | | _initializeDestinationInternal = behaviors.InitializeDestination ?? InitializeNoOp; |
| 192 | 103 | | _singleUploadInternal = behaviors.SingleUpload |
| 192 | 104 | | ?? throw Errors.ArgumentNull(nameof(behaviors.SingleUpload)); |
| 192 | 105 | | _uploadPartitionInternal = behaviors.UploadPartition |
| 192 | 106 | | ?? throw Errors.ArgumentNull(nameof(behaviors.UploadPartition)); |
| 192 | 107 | | _commitPartitionedUploadInternal = behaviors.CommitPartitionedUpload |
| 192 | 108 | | ?? throw Errors.ArgumentNull(nameof(behaviors.CommitPartitionedUpload)); |
| 192 | 109 | | _createScope = behaviors.Scope |
| 192 | 110 | | ?? throw Errors.ArgumentNull(nameof(behaviors.Scope)); |
| | 111 | |
|
| 192 | 112 | | _arrayPool = arrayPool ?? ArrayPool<byte>.Shared; |
| | 113 | |
|
| | 114 | | // Set _maxWorkerCount |
| 192 | 115 | | if (transferOptions.MaximumConcurrency.HasValue |
| 192 | 116 | | && transferOptions.MaximumConcurrency > 0) |
| | 117 | | { |
| 0 | 118 | | _maxWorkerCount = transferOptions.MaximumConcurrency.Value; |
| | 119 | | } |
| | 120 | | else |
| | 121 | | { |
| 192 | 122 | | _maxWorkerCount = Constants.Blob.Block.DefaultConcurrentTransfersCount; |
| | 123 | | } |
| | 124 | |
|
| | 125 | | // Set _singleUploadThreshold |
| 192 | 126 | | if (transferOptions.InitialTransferSize.HasValue |
| 192 | 127 | | && transferOptions.InitialTransferSize.Value > 0) |
| | 128 | | { |
| 4 | 129 | | _singleUploadThreshold = Math.Min(transferOptions.InitialTransferSize.Value, Constants.Blob.Block.MaxUpl |
| | 130 | | } |
| | 131 | | else |
| | 132 | | { |
| 188 | 133 | | _singleUploadThreshold = Constants.Blob.Block.MaxUploadBytes; |
| | 134 | | } |
| | 135 | |
|
| | 136 | | // Set _blockSize |
| 192 | 137 | | if (transferOptions.MaximumTransferSize.HasValue |
| 192 | 138 | | && transferOptions.MaximumTransferSize > 0) |
| | 139 | | { |
| 16 | 140 | | _blockSize = Math.Min( |
| 16 | 141 | | Constants.Blob.Block.MaxStageBytes, |
| 16 | 142 | | transferOptions.MaximumTransferSize.Value); |
| | 143 | | } |
| | 144 | |
|
| 192 | 145 | | _operationName = operationName; |
| 192 | 146 | | } |
| | 147 | |
|
| | 148 | | public async Task<Response<TCompleteUploadReturn>> UploadInternal( |
| | 149 | | Stream content, |
| | 150 | | TServiceSpecificArgs args, |
| | 151 | | IProgress<long> progressHandler, |
| | 152 | | bool async, |
| | 153 | | CancellationToken cancellationToken = default) |
| | 154 | | { |
| 192 | 155 | | if (content == default) |
| | 156 | | { |
| 0 | 157 | | throw Errors.ArgumentNull(nameof(content)); |
| | 158 | | } |
| | 159 | |
|
| 192 | 160 | | await _initializeDestinationInternal(args, async, cancellationToken).ConfigureAwait(false); |
| | 161 | |
|
| | 162 | | // some strategies are unavailable if we don't know the stream length, and some can still work |
| | 163 | | // we may introduce separately provided stream lengths in the future for unseekable streams with |
| | 164 | | // an expected length |
| 184 | 165 | | long? length = GetLengthOrDefault(content); |
| | 166 | |
|
| | 167 | | // If we know the length and it's small enough |
| 184 | 168 | | if (length < _singleUploadThreshold) |
| | 169 | | { |
| | 170 | | // Upload it in a single request |
| 160 | 171 | | return await _singleUploadInternal( |
| 160 | 172 | | content, |
| 160 | 173 | | args, |
| 160 | 174 | | progressHandler, |
| 160 | 175 | | _operationName, |
| 160 | 176 | | async, |
| 160 | 177 | | cancellationToken) |
| 160 | 178 | | .ConfigureAwait(false); |
| | 179 | | } |
| | 180 | |
|
| | 181 | | // If the caller provided an explicit block size, we'll use it. |
| | 182 | | // Otherwise we'll adjust dynamically based on the size of the |
| | 183 | | // content. |
| 24 | 184 | | long blockSize = _blockSize != null |
| 24 | 185 | | ? _blockSize.Value |
| 24 | 186 | | : length < Constants.LargeUploadThreshold ? |
| 24 | 187 | | Constants.DefaultBufferSize : |
| 24 | 188 | | Constants.LargeBufferSize; |
| | 189 | |
|
| | 190 | | // Otherwise stage individual blocks |
| | 191 | |
|
| | 192 | | /* We only support parallel upload in an async context to avoid issues in our overall sync story. |
| | 193 | | * We're branching on both async and max worker count, where 3 combinations lead to |
| | 194 | | * UploadInSequenceInternal and 1 combination leads to UploadInParallelAsync. We are guaranteed |
| | 195 | | * to be in an async context when we call UploadInParallelAsync, even though the analyzer can't |
| | 196 | | * detext this, and we properly pass in the async context in the else case when we haven't |
| | 197 | | * explicitly checked. |
| | 198 | | */ |
| | 199 | | #pragma warning disable AZC0109 // Misuse of 'async' parameter. |
| | 200 | | #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope. |
| 24 | 201 | | if (async && _maxWorkerCount > 1) |
| | 202 | | { |
| 12 | 203 | | return await UploadInParallelAsync( |
| 12 | 204 | | content, |
| 12 | 205 | | length, |
| 12 | 206 | | blockSize, |
| 12 | 207 | | args, |
| 12 | 208 | | progressHandler, |
| 12 | 209 | | cancellationToken) |
| 12 | 210 | | .ConfigureAwait(false); |
| | 211 | | } |
| | 212 | | #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope. |
| | 213 | | #pragma warning restore AZC0109 // Misuse of 'async' parameter. |
| | 214 | | else |
| | 215 | | { |
| 12 | 216 | | return await UploadInSequenceInternal( |
| 12 | 217 | | content, |
| 12 | 218 | | length, |
| 12 | 219 | | blockSize, |
| 12 | 220 | | args, |
| 12 | 221 | | progressHandler, |
| 12 | 222 | | async: async, |
| 12 | 223 | | cancellationToken).ConfigureAwait(false); |
| | 224 | | } |
| 184 | 225 | | } |
| | 226 | |
|
        /// <summary>
        /// Uploads the stream as a sequence of partitions, fully staging each
        /// one before starting the next, then commits the partition list.
        /// Used for all synchronous uploads, and for async uploads limited to
        /// a single worker.
        /// </summary>
        /// <param name="content">Content stream to upload.</param>
        /// <param name="contentLength">Total content length, if known.</param>
        /// <param name="partitionSize">Maximum size of each staged partition.</param>
        /// <param name="args">Service-specific arguments for the injected behaviors.</param>
        /// <param name="progressHandler">Optional progress reporting.</param>
        /// <param name="async">Whether to perform the operation asynchronously.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>The service's response to committing the upload.</returns>
        private async Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal(
            Stream content,
            long? contentLength,
            long partitionSize,
            TServiceSpecificArgs args,
            IProgress<long> progressHandler,
            bool async,
            CancellationToken cancellationToken)
        {
            // Wrap the staging and commit calls in an Upload span for
            // distributed tracing
            DiagnosticScope scope = _createScope(_operationName);
            try
            {
                scope.Start();

                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
                // progress from being reset with each stage blob operation.
                if (progressHandler != null)
                {
                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
                }

                // The list tracking blocks IDs we're going to commit
                List<(long Offset, long Size)> partitions = new List<(long, long)>();

                /* Streamed partitions only work if we can seek the stream; we need retries on
                 * individual uploads. Unseekable streams fall back to buffered partitions.
                 */
                GetNextStreamPartition partitionGetter = content.CanSeek
                    ? (GetNextStreamPartition)GetStreamedPartitionInternal
                    : /* cast above makes the ternary's type unambiguous */GetBufferedPartitionInternal;

                // Partition the stream into individual blocks and stage them
                if (async)
                {
                    await foreach (SlicedStream block in GetPartitionsAsync(
                        content,
                        contentLength,
                        partitionSize,
                        partitionGetter,
                        async: true,
                        cancellationToken).ConfigureAwait(false))
                    {
                        await StagePartitionAndDisposeInternal(
                            block,
                            block.AbsolutePosition,
                            args,
                            progressHandler,
                            async: true,
                            cancellationToken).ConfigureAwait(false);

                        partitions.Add((block.AbsolutePosition, block.Length));
                    }
                }
                else
                {
                    // Synchronous path: consume the async iterator without
                    // awaiting and drive each staging task to completion inline.
                    foreach (SlicedStream block in GetPartitionsAsync(
                        content,
                        contentLength,
                        partitionSize,
                        partitionGetter,
                        async: false,
                        cancellationToken).EnsureSyncEnumerable())
                    {
                        StagePartitionAndDisposeInternal(
                            block,
                            block.AbsolutePosition,
                            args,
                            progressHandler,
                            async: false,
                            cancellationToken).EnsureCompleted();

                        partitions.Add((block.AbsolutePosition, block.Length));
                    }
                }

                // Commit the block list after everything has been staged to
                // complete the upload
                return await _commitPartitionedUploadInternal(
                    partitions,
                    args,
                    async,
                    cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                scope.Failed(ex);
                throw;
            }
            finally
            {
                scope.Dispose();
            }
        }
| | 322 | |
|
        /// <summary>
        /// Uploads the stream by staging up to <see cref="_maxWorkerCount"/>
        /// buffered partitions concurrently, then commits the partition list.
        /// Only invoked from an async context (see the caller's AZC0109 note).
        /// </summary>
        /// <param name="content">Content stream to upload.</param>
        /// <param name="contentLength">Total content length, if known.</param>
        /// <param name="blockSize">Maximum size of each staged partition.</param>
        /// <param name="args">Service-specific arguments for the injected behaviors.</param>
        /// <param name="progressHandler">Optional progress reporting.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>The service's response to committing the upload.</returns>
        private async Task<Response<TCompleteUploadReturn>> UploadInParallelAsync(
            Stream content,
            long? contentLength,
            long blockSize,
            TServiceSpecificArgs args,
            IProgress<long> progressHandler,
            CancellationToken cancellationToken)
        {
            // Wrap the staging and commit calls in an Upload span for
            // distributed tracing
            DiagnosticScope scope = _createScope(_operationName);
            try
            {
                scope.Start();

                // Wrap progressHandler in a AggregatingProgressIncrementer to prevent
                // progress from being reset with each stage blob operation.
                if (progressHandler != null)
                {
                    progressHandler = new AggregatingProgressIncrementer(progressHandler);
                }

                // The list tracking blocks IDs we're going to commit
                List<(long Offset, long Size)> partitions = new List<(long, long)>();

                // A list of tasks that are currently executing which will
                // always be smaller than _maxWorkerCount
                List<Task> runningTasks = new List<Task>();

                // Partition the stream into individual blocks
                await foreach (SlicedStream block in GetPartitionsAsync(
                    content,
                    contentLength,
                    blockSize,
                    GetBufferedPartitionInternal, // we always buffer for upload in parallel from stream
                    async: true,
                    cancellationToken).ConfigureAwait(false))
                {
                    /* We need to do this first! Length is calculated on the fly based on stream buffer
                     * contents; We need to record the partition data first before consuming the stream
                     * asynchronously. */
                    partitions.Add((block.AbsolutePosition, block.Length));

                    // Start staging the next block (but don't await the Task!)
                    Task task = StagePartitionAndDisposeInternal(
                        block,
                        block.AbsolutePosition,
                        args,
                        progressHandler,
                        async: true,
                        cancellationToken);

                    // Add the block to our task and commit lists
                    runningTasks.Add(task);

                    // If we run out of workers
                    if (runningTasks.Count >= _maxWorkerCount)
                    {
                        // Wait for at least one of them to finish
                        await Task.WhenAny(runningTasks).ConfigureAwait(false);

                        // Clear any completed blocks from the task list
                        for (int i = 0; i < runningTasks.Count; i++)
                        {
                            Task runningTask = runningTasks[i];
                            if (!runningTask.IsCompleted)
                            {
                                continue;
                            }

                            // Awaiting a completed task rethrows any staging
                            // exception instead of silently dropping it.
                            await runningTask.ConfigureAwait(false);
                            runningTasks.RemoveAt(i);
                            i--; // compensate for the removal so no task is skipped
                        }
                    }
                }

                // Wait for all the remaining blocks to finish staging and then
                // commit the block list to complete the upload
                await Task.WhenAll(runningTasks).ConfigureAwait(false);

                // Calling internal method for easier mocking in PartitionedUploaderTests
                return await _commitPartitionedUploadInternal(
                    partitions,
                    args,
                    async: true,
                    cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                scope.Failed(ex);
                throw;
            }
            finally
            {
                scope.Dispose();
            }
        }
| | 422 | |
|
| | 423 | | /// <summary> |
| | 424 | | /// Wraps both the async method and dispose call in one task. |
| | 425 | | /// </summary> |
| | 426 | | private async Task StagePartitionAndDisposeInternal( |
| | 427 | | SlicedStream partition, |
| | 428 | | long offset, |
| | 429 | | TServiceSpecificArgs args, |
| | 430 | | IProgress<long> progressHandler, |
| | 431 | | bool async, |
| | 432 | | CancellationToken cancellationToken) |
| | 433 | | { |
| | 434 | | try |
| | 435 | | { |
| 192 | 436 | | await _uploadPartitionInternal( |
| 192 | 437 | | partition, |
| 192 | 438 | | offset, |
| 192 | 439 | | args, |
| 192 | 440 | | progressHandler, |
| 192 | 441 | | async, |
| 192 | 442 | | cancellationToken) |
| 192 | 443 | | .ConfigureAwait(false); |
| 192 | 444 | | } |
| | 445 | | finally |
| | 446 | | { |
| | 447 | | // Return the memory used by the block to our ArrayPool as soon |
| | 448 | | // as we've staged it |
| 192 | 449 | | partition.Dispose(); |
| | 450 | | } |
| 192 | 451 | | } |
| | 452 | |
|
| | 453 | | /// <summary> |
| | 454 | | /// Some streams will throw if you try to access their length so we wrap |
| | 455 | | /// the check in a TryGet helper. |
| | 456 | | /// </summary> |
| | 457 | | private static long? GetLengthOrDefault(Stream content) |
| | 458 | | { |
| | 459 | | try |
| | 460 | | { |
| 184 | 461 | | if (content.CanSeek) |
| | 462 | | { |
| 164 | 463 | | return content.Length; |
| | 464 | | } |
| 20 | 465 | | } |
| 0 | 466 | | catch (NotSupportedException) |
| | 467 | | { |
| 0 | 468 | | } |
| 20 | 469 | | return default; |
| 164 | 470 | | } |
| | 471 | |
|
        #region Stream Splitters
        /// <summary>
        /// Partition a stream into a series of blocks buffered as needed by an array pool.
        /// </summary>
        /// <param name="stream">Stream to split into partitions.</param>
        /// <param name="streamLength">Total stream length, if known.</param>
        /// <param name="blockSize">Maximum size of each partition.</param>
        /// <param name="getNextPartition">Strategy used to extract the next partition (streamed or buffered).</param>
        /// <param name="async">Whether to fetch partitions asynchronously.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>An async sequence of non-empty partitions covering the stream.</returns>
        private static async IAsyncEnumerable<SlicedStream> GetPartitionsAsync(
            Stream stream,
            long? streamLength,
            long blockSize,
            GetNextStreamPartition getNextPartition,
            bool async,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            // The minimum amount of data we'll accept from a stream before
            // splitting another block. Code that sets `blockSize` will always
            // set it to a positive number. Max() only avoids the edge case
            // where a user sets their block size to 1.
            long acceptableBlockSize = Math.Max(1, blockSize / 2);

            // if we know the data length, assert boundaries before spending resources uploading beyond service capabilities
            if (streamLength.HasValue)
            {
                // service has a max block count per blob
                // block size * block count limit = max data length to upload
                // if stream length is longer than specified max block size allows, can't upload
                long minRequiredBlockSize = (long)Math.Ceiling((double)streamLength.Value / Constants.Blob.Block.MaxBlocks);
                if (blockSize < minRequiredBlockSize)
                {
                    throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize);
                }
                // bring min up to our min required by the service
                acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
            }

            long read;
            long absolutePosition = 0;
            do
            {
                SlicedStream partition = await getNextPartition(
                    stream,
                    acceptableBlockSize,
                    blockSize,
                    absolutePosition,
                    async,
                    cancellationToken).ConfigureAwait(false);
                read = partition.Length;
                absolutePosition += read;

                // If we read anything, turn it into a StreamPartition and
                // return it for staging
                if (partition.Length != 0)
                {
                    // The StreamPartition is disposable and it'll be the
                    // user's responsibility to return the bytes used to our
                    // ArrayPool
                    yield return partition;
                }

                // Continue reading blocks until we've exhausted the stream
            } while (read != 0);
        }
| | 532 | |
|
| | 533 | | /// <summary> |
| | 534 | | /// Gets a partition from the current location of the given stream. |
| | 535 | | /// |
| | 536 | | /// This partition is buffered and it is safe to get many before using any of them. |
| | 537 | | /// </summary> |
| | 538 | | /// <param name="stream"> |
| | 539 | | /// Stream to buffer a partition from. |
| | 540 | | /// </param> |
| | 541 | | /// <param name="minCount"> |
| | 542 | | /// Minimum amount of data to wait on before finalizing buffer. |
| | 543 | | /// </param> |
| | 544 | | /// <param name="maxCount"> |
| | 545 | | /// Max amount of data to buffer before cutting off for the next. |
| | 546 | | /// </param> |
| | 547 | | /// <param name="absolutePosition"> |
| | 548 | | /// Offset of this stream relative to the large stream. |
| | 549 | | /// </param> |
| | 550 | | /// <param name="async"> |
| | 551 | | /// Whether to buffer this partition asynchronously. |
| | 552 | | /// </param> |
| | 553 | | /// <param name="cancellationToken"> |
| | 554 | | /// Cancellation token. |
| | 555 | | /// </param> |
| | 556 | | /// <returns> |
| | 557 | | /// Task containing the buffered stream partition. |
| | 558 | | /// </returns> |
| | 559 | | private async Task<SlicedStream> GetBufferedPartitionInternal( |
| | 560 | | Stream stream, |
| | 561 | | long minCount, |
| | 562 | | long maxCount, |
| | 563 | | long absolutePosition, |
| | 564 | | bool async, |
| | 565 | | CancellationToken cancellationToken) |
| 210 | 566 | | => await PooledMemoryStream.BufferStreamPartitionInternal( |
| 210 | 567 | | stream, |
| 210 | 568 | | minCount, |
| 210 | 569 | | maxCount, |
| 210 | 570 | | absolutePosition, |
| 210 | 571 | | _arrayPool, |
| 210 | 572 | | maxArrayPoolRentalSize: default, |
| 210 | 573 | | async, |
| 210 | 574 | | cancellationToken).ConfigureAwait(false); |
| | 575 | |
|
| | 576 | | /// <summary> |
| | 577 | | /// Gets a partition from the current location of the given stream. |
| | 578 | | /// |
| | 579 | | /// This partition is a facade over the existing stream, and the |
| | 580 | | /// previous partition should be consumed before using the next. |
| | 581 | | /// </summary> |
| | 582 | | /// <param name="stream"> |
| | 583 | | /// Stream to wrap. |
| | 584 | | /// </param> |
| | 585 | | /// <param name="minCount"> |
| | 586 | | /// Unused, but part of <see cref="GetNextStreamPartition"/> definition. |
| | 587 | | /// </param> |
| | 588 | | /// <param name="maxCount"> |
| | 589 | | /// Length of this facade stream. |
| | 590 | | /// </param> |
| | 591 | | /// <param name="absolutePosition"> |
| | 592 | | /// Offset of this stream relative to the large stream. |
| | 593 | | /// </param> |
| | 594 | | /// <param name="async"> |
| | 595 | | /// Unused, but part of <see cref="GetNextStreamPartition"/> definition. |
| | 596 | | /// </param> |
| | 597 | | /// <param name="cancellationToken"></param> |
| | 598 | | /// <returns> |
| | 599 | | /// Task containing the stream facade. |
| | 600 | | /// </returns> |
| | 601 | | private static Task<SlicedStream> GetStreamedPartitionInternal( |
| | 602 | | Stream stream, |
| | 603 | | long minCount, |
| | 604 | | long maxCount, |
| | 605 | | long absolutePosition, |
| | 606 | | bool async, |
| | 607 | | CancellationToken cancellationToken) |
| 6 | 608 | | => Task.FromResult((SlicedStream)WindowStream.GetWindow(stream, maxCount, absolutePosition)); |
| | 609 | | #endregion |
| | 610 | | } |
| | 611 | | } |