< Summary

Class:Azure.Storage.Blobs.ChangeFeed.LazyLoadingBlobStream
Assembly:Azure.Storage.Blobs.ChangeFeed
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\LazyLoadingBlobStream.cs
Covered lines:58
Uncovered lines:17
Coverable lines:75
Total lines:250
Line coverage:77.3% (58 of 75)
Covered branches:21
Total branches:24
Branch coverage:87.5% (21 of 24)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-100%100%
.ctor()-100%100%
Read(...)-0%100%
ReadAsync()-100%100%
Initalize()-100%100%
DownloadBlock()-100%50%
ReadInternal()-94.74%83.33%
ValidateReadParameters(...)-100%100%
GetBlobLength(...)-100%100%
get_CanRead()-0%100%
get_CanSeek()-100%100%
get_CanWrite()-0%100%
get_Length()-0%100%
get_Position()-0%100%
set_Position(...)-0%100%
Flush()-0%100%
FlushAsync(...)-0%100%
Seek(...)-0%100%
SetLength(...)-0%100%
Write(...)-0%100%
Dispose(...)-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\LazyLoadingBlobStream.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.IO;
 8using System.Text;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core.Pipeline;
 12using Azure.Storage.Blobs.Models;
 13
 14namespace Azure.Storage.Blobs.ChangeFeed
 15{
 16    internal class LazyLoadingBlobStream : Stream
 17    {
 18        /// <summary>
 19        /// BlobClient to make download calls with.
 20        /// </summary>
 21        private readonly BlobClient _blobClient;
 22
 23        /// <summary>
 24        /// The offset within the blob of the next block we will download.
 25        /// </summary>
 26        private long _offset;
 27
 28        /// <summary>
 29        /// The number of bytes we'll download with each download call.
 30        /// </summary>
 31        private readonly long _blockSize;
 32
 33        /// <summary>
 34        /// Underlying Stream.
 35        /// </summary>
 36        private Stream _stream;
 37
 38        /// <summary>
 39        /// If this LazyLoadingBlobStream has been initalized.
 40        /// </summary>
 41        private bool _initalized;
 42
 43        /// <summary>
 44        /// The number of bytes in the last download call.
 45        /// </summary>
 46        private long _lastDownloadBytes;
 47
 48        /// <summary>
 49        /// The current length of the blob.
 50        /// </summary>
 51        private long _blobLength;
 52
 27253        public LazyLoadingBlobStream(BlobClient blobClient, long offset, long blockSize)
 54        {
 27255            _blobClient = blobClient;
 27256            _offset = offset;
 27257            _blockSize = blockSize;
 27258            _initalized = false;
 27259        }
 60
 61        /// <summary>
 62        /// Constructor for mocking.
 63        /// </summary>
 3264        public LazyLoadingBlobStream() { }
 65
 66        /// <inheritdoc/>
 67        public override int Read(
 68            byte[] buffer,
 69            int offset,
 70            int count)
 071            => ReadInternal(
 072                async: false,
 073                buffer,
 074                offset,
 075                count).EnsureCompleted();
 76
 77        /// <inheritdoc/>
 78        public override async Task<int> ReadAsync(
 79            byte[] buffer,
 80            int offset,
 81            int count,
 82            CancellationToken cancellationToken)
 154978483            => await ReadInternal(
 154978484                async: true,
 154978485                buffer,
 154978486                offset,
 154978487                count,
 154978488                cancellationToken).ConfigureAwait(false);
 89
 90        /// <summary>
 91        /// Initalizes this LazyLoadingBlobStream.
 92        /// </summary>
 93        private async Task Initalize(bool async, CancellationToken cancellationToken)
 94        {
 26895            await DownloadBlock(async, cancellationToken).ConfigureAwait(false);
 26896            _initalized = true;
 26897        }
 98
 99        /// <summary>
 100        /// Downloads the next block.
 101        /// </summary>
 102        private async Task DownloadBlock(bool async, CancellationToken cancellationToken)
 103        {
 104            Response<BlobDownloadInfo> response;
 292105            HttpRange range = new HttpRange(_offset, _blockSize);
 106
 292107            response = async
 292108                ? await _blobClient.DownloadAsync(range, cancellationToken: cancellationToken).ConfigureAwait(false)
 292109                : _blobClient.Download(range, cancellationToken: cancellationToken);
 292110            _stream = response.Value.Content;
 292111            _offset += response.Value.ContentLength;
 292112            _lastDownloadBytes = response.Value.ContentLength;
 292113            _blobLength = GetBlobLength(response);
 292114        }
 115
 116        /// <summary>
 117        /// Shared sync and async Read implementation.
 118        /// </summary>
 119        private async Task<int> ReadInternal(
 120            bool async,
 121            byte[] buffer,
 122            int offset,
 123            int count,
 124            CancellationToken cancellationToken = default)
 125        {
 1549784126            ValidateReadParameters(buffer, offset, count);
 127
 1549764128            if (!_initalized)
 129            {
 268130                await Initalize(async, cancellationToken: cancellationToken).ConfigureAwait(false);
 268131                if (_lastDownloadBytes == 0)
 132                {
 0133                    return 0;
 134                }
 135            }
 136
 1549764137            int totalCopiedBytes = 0;
 138            do
 139            {
 1549808140                int copiedBytes = async
 1549808141                    ? await _stream.ReadAsync(buffer, offset, count).ConfigureAwait(false)
 1549808142                    : _stream.Read(buffer, offset, count);
 1549808143                offset += copiedBytes;
 1549808144                count -= copiedBytes;
 1549808145                totalCopiedBytes += copiedBytes;
 146
 147                // We've run out of bytes in the current block.
 1549808148                if (copiedBytes == 0)
 149                {
 150                    // We hit the end of the blob with the last download call.
 196151                    if (_offset == _blobLength)
 152                    {
 172153                        return totalCopiedBytes;
 154                    }
 155
 156                    // Download the next block
 157                    else
 158                    {
 24159                        await DownloadBlock(async, cancellationToken).ConfigureAwait(false);
 160                    }
 161                }
 162            }
 1549636163            while (count > 0);
 1549592164            return totalCopiedBytes;
 1549764165        }
 166
 167        private static void ValidateReadParameters(byte[] buffer, int offset, int count)
 168        {
 1549784169            if (buffer == null)
 170            {
 4171                throw new ArgumentNullException($"{nameof(buffer)}", $"{nameof(buffer)} cannot be null.");
 172            }
 173
 1549780174            if (offset < 0)
 175            {
 4176                throw new ArgumentOutOfRangeException($"{nameof(offset)} cannot be less than 0.");
 177            }
 178
 1549776179            if (offset > buffer.Length)
 180            {
 4181                throw new ArgumentOutOfRangeException($"{nameof(offset)} cannot exceed {nameof(buffer)} length.");
 182            }
 183
 1549772184            if (count < 0)
 185            {
 4186                throw new ArgumentOutOfRangeException($"{nameof(count)} cannot be less than 0.");
 187            }
 188
 1549768189            if (offset + count > buffer.Length)
 190            {
 4191                throw new ArgumentOutOfRangeException($"{nameof(offset)} + {nameof(count)} cannot exceed {nameof(buffer)
 192            }
 1549764193        }
 194
 195        private static long GetBlobLength(Response<BlobDownloadInfo> response)
 196        {
 292197            string lengthString = response.Value.Details.ContentRange;
 292198            string[] split = lengthString.Split('/');
 292199            return long.Parse(split[1], CultureInfo.InvariantCulture);
 200        }
 201
 202        /// <inheritdoc/>
 0203        public override bool CanRead => true;
 204
 205        /// <inheritdoc/>
 264206        public override bool CanSeek => false;
 207
 208        /// <inheritdoc/>
 0209        public override bool CanWrite => throw new NotSupportedException();
 210
 0211        public override long Length => throw new NotSupportedException();
 212
 213        /// <inheritdoc/>
 214        public override long Position {
 0215            get => _stream.Position;
 0216            set => throw new NotSupportedException();
 217        }
 218
 219        /// <inheritdoc/>
 220        public override void Flush()
 221        {
 0222        }
 223
 224        /// <inheritdoc/>
 225        public override Task FlushAsync(CancellationToken cancellationToken)
 226        {
 0227            return Task.CompletedTask;
 228        }
 229
 230        /// <inheritdoc/>
 231        public override long Seek(long offset, SeekOrigin origin)
 232        {
 0233            throw new NotSupportedException();
 234        }
 235
 236        /// <inheritdoc/>
 237        public override void SetLength(long value)
 238        {
 0239            throw new NotSupportedException();
 240        }
 241
 242        /// <inheritdoc/>
 243        public override void Write(byte[] buffer, int offset, int count)
 244        {
 0245            throw new NotSupportedException();
 246        }
 247
 0248        protected override void Dispose(bool disposing) => _stream.Dispose();
 249    }
 250}