Summary

Class: Azure.Core.Pipeline.RetriableStream
Assembly: Azure.Storage.Blobs
File(s): C:\Git\azure-sdk-for-net\sdk\core\Azure.Core\src\Shared\RetriableStream.cs
Covered lines: 25
Uncovered lines: 34
Coverable lines: 59
Total lines: 205
Line coverage: 42.3% (25 of 59)
Covered branches: 3
Total branches: 20
Branch coverage: 15% (3 of 20)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
Create(...) | - | 0% | 100%
CreateAsync() | - | 0% | 100%
Create(...) | - | 100% | 100%
.ctor(...) | - | 75% | 100%
Seek(...) | - | 0% | 100%
ReadAsync() | - | 66.67% | 50%
RetryAsync() | - | 0% | 0%
Read(...) | - | 57.14% | 100%
get_CanRead() | - | 100% | 100%
get_CanSeek() | - | 100% | 100%
get_Length() | - | 0% | 0%
get_Position() | - | 0% | 100%
set_Position(...) | - | 0% | 100%
EnsureStream(...) | - | 66.67% | 50%
get_CanWrite() | - | 0% | 100%
Write(...) | - | 0% | 100%
SetLength(...) | - | 0% | 100%
Flush() | - | 0% | 100%
Dispose(...) | - | 100% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\core\Azure.Core\src\Shared\RetriableStream.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4#nullable disable
 5
 6using System;
 7using System.Collections.Generic;
 8using System.Diagnostics;
 9using System.IO;
 10using System.Runtime.ExceptionServices;
 11using System.Threading;
 12using System.Threading.Tasks;
 13
 14namespace Azure.Core.Pipeline
 15{
 16    internal static class RetriableStream
 17    {
 18        public static Stream Create(
 19            Func<long, Stream> responseFactory,
 20            Func<long, ValueTask<Stream>> asyncResponseFactory,
 21            ResponseClassifier responseClassifier,
 22            int maxRetries)
 23        {
               // Synchronous entry point: obtains the initial stream by invoking responseFactory with 0,
               // then delegates to the Create overload that accepts an already-opened stream.
               // Both factories, the classifier and the retry limit are passed along unchanged.
 024            return Create(responseFactory(0), responseFactory, asyncResponseFactory, responseClassifier, maxRetries);
 25        }
 26
 27        public static async Task<Stream> CreateAsync(
 28            Func<long, Stream> responseFactory,
 29            Func<long, ValueTask<Stream>> asyncResponseFactory,
 30            ResponseClassifier responseClassifier,
 31            int maxRetries)
 32        {
               // Asynchronous entry point: awaits asyncResponseFactory(0) with ConfigureAwait(false) to get
               // the initial stream and hands it to Create.
               // NOTE(review): the statement below is cut off in this listing, so its trailing arguments
               // are not visible here.
 033            return Create(await asyncResponseFactory(0).ConfigureAwait(false), responseFactory, asyncResponseFactory, re
 034        }
 35
 36        public static Stream Create(
 37            Stream initialResponse,
 38            Func<long, Stream> streamFactory,
 39            Func<long, ValueTask<Stream>> asyncResponseFactory,
 40            ResponseClassifier responseClassifier,
 41            int maxRetries)
 42        {
               // Wraps an already-opened response stream in the private RetriableStreamImpl.
               // NOTE(review): the statement below is cut off in this listing; the final argument(s)
               // are not visible here.
 143243            return new RetriableStreamImpl(initialResponse, streamFactory, asyncResponseFactory, responseClassifier, max
 44        }
 45
 46        private class RetriableStreamImpl : Stream
 47        {
 48            private readonly ResponseClassifier _responseClassifier; // Consulted in RetryAsync (IsRetriableException) to decide whether a failure may be retried.
 49
 50            private readonly Func<long, Stream> _streamFactory; // Synchronous stream factory supplied to the constructor.
 51
 52            private readonly Func<long, ValueTask<Stream>> _asyncStreamFactory; // Asynchronous stream factory; RetryAsync invokes it with _position.
 53
 54            private readonly int _maxRetries; // Upper bound that RetryAsync compares _retryCount against.
 55
 56            private readonly long _length; // Length of the initial stream, read once in the constructor.
 57            private readonly ExceptionDispatchInfo _lengthException; // Failure captured while reading that length; rethrown by the Length getter.
 58
 59            private Stream _currentStream; // Stream currently being read; disposed and replaced by RetryAsync.
 60
 61            private long _position; // Running total of the byte counts returned by Read/ReadAsync.
 62
 63            private int _retryCount; // Incremented by every RetryAsync call that gets past the retriability check; not reset anywhere in the visible code.
 64
 65            private List<Exception> _exceptions; // Created lazily in RetryAsync; collects every exception seen while retrying.
 66
 143267            public RetriableStreamImpl(Stream initialStream, Func<long, Stream> streamFactory, Func<long, ValueTask<Stre
 68            {
                   // NOTE(review): the constructor signature above is cut off in this listing.
                   // Length is read eagerly. Any failure (including the InvalidOperationException that
                   // EnsureStream throws for a null stream) is captured instead of thrown here, and is
                   // rethrown later by the Length getter.
 69                try
 70                {
 143271                    _length = EnsureStream(initialStream).Length;
 143272                }
 073                catch (Exception ex)
 74                {
 075                    _lengthException = ExceptionDispatchInfo.Capture(ex);
 076                }
 77
                   // This call is outside the try block, so a null initialStream makes the constructor throw.
 143278                _currentStream = EnsureStream(initialStream);
 143279                _streamFactory = streamFactory;
 143280                _responseClassifier = responseClassifier;
 143281                _asyncStreamFactory = asyncStreamFactory;
 143282                _maxRetries = maxRetries;
 143283            }
 84
 85            public override long Seek(long offset, SeekOrigin origin)
 86            {
                   // Seeking is not supported by this stream (CanSeek is false); always throws.
 087                throw new NotSupportedException();
 88            }
 89
 90            public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellati
 91            {
                   // Reads from the current inner stream. On any exception RetryAsync either throws
                   // or swaps in a replacement stream, after which the loop attempts the read again.
                   // NOTE(review): the signature above and the read statement below are cut off in this listing.
 92                while (true)
 93                {
 94                    try
 95                    {
 155180696                        var result = await _currentStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureA
                               // Track how many bytes have been delivered so a replacement stream can be requested at this position.
 155180697                        _position += result;
 155180698                        return result;
 99                    }
 100                    catch (Exception e)
 101                    {
                            // Passing true for the `async` parameter selects the asynchronous stream factory inside RetryAsync.
 0102                        await RetryAsync(e, true, cancellationToken).ConfigureAwait(false);
 0103                    }
 104                }
 1551806105            }
 106
 107            private async Task RetryAsync(Exception exception, bool async, CancellationToken cancellationToken)
 108            {
                    // Decides whether a failed read may be retried. If so, records the failure and replaces
                    // _currentStream with a fresh stream requested at _position; otherwise throws.

                    // An OperationCanceledException raised while the caller's token has NOT been cancelled
                    // is treated as retriable regardless of what the classifier says.
 0109                bool isNonCustomerCancelledException = exception is OperationCanceledException &&
 0110                                                    !cancellationToken.IsCancellationRequested;
 111
 0112                if (!_responseClassifier.IsRetriableException(exception) && !isNonCustomerCancelledException)
 113                {
                        // Not retriable: rethrow the original exception via ExceptionDispatchInfo.
 0114                    ExceptionDispatchInfo.Capture(exception).Throw();
 115                }
 116
                    // The failure list is allocated only once the first retriable failure occurs.
 0117                if (_exceptions == null)
 118                {
 0119                    _exceptions = new List<Exception>();
 120                }
 121
 0122                _exceptions.Add(exception);
 123
 0124                _retryCount++;
 125
                    // Once the retry budget is exceeded, surface every recorded failure in one AggregateException.
 0126                if (_retryCount > _maxRetries)
 127                {
 0128                    throw new AggregateException($"Retry failed after {_retryCount} tries", _exceptions);
 129                }
 130
                    // Release the failed stream before requesting its replacement.
 0131                _currentStream.Dispose();
 132
                    // The asynchronous factory is called with the current position when `async` is true.
                    // NOTE(review): the statement is cut off in this listing; the synchronous branch
                    // presumably calls _streamFactory with the same position - confirm against the source file.
 0133                _currentStream = EnsureStream(async ? (await _asyncStreamFactory(_position).ConfigureAwait(false)) : _st
 0134            }
 135
 136            public override int Read(byte[] buffer, int offset, int count)
 137            {
                    // Synchronous counterpart of ReadAsync: reads from the current inner stream and, on any
                    // exception, lets RetryAsync either throw or install a replacement stream before looping.
 138                while (true)
 139                {
 140                    try
 141                    {
 42142                        var result = _currentStream.Read(buffer, offset, count);
                            // Track how many bytes have been delivered so far.
 42143                        _position += result;
 42144                        return result;
 145
 146                    }
 0147                    catch (Exception e)
 148                    {
                            // Passing false for the `async` parameter makes RetryAsync take its non-awaiting branch.
                            // NOTE(review): EnsureCompleted is defined outside this file; presumably it
                            // unwraps a task that has already completed - confirm.
 0149                        RetryAsync(e, false, default).EnsureCompleted();
 0150                    }
 151                }
 42152            }
 153
 356154            public override bool CanRead => _currentStream.CanRead; // Delegates to the current inner stream.
 432155            public override bool CanSeek { get; } = false; // Always false; Seek and the Position setter throw NotSupportedException.
 156            public override long Length
 157            {
 158                get
 159                {
                        // Returns the length captured from the initial stream in the constructor. If reading
                        // it failed there, the captured exception is rethrown here instead.
 0160                    _lengthException?.Throw();
 0161                    return _length;
 162                }
 163            }
 164
 165            public override long Position
 166            {
                    // Reports the running total of bytes returned by Read/ReadAsync.
 0167                get => _position;
                    // Repositioning is not supported; assigning always throws.
 0168                set => throw new NotSupportedException();
 169            }
 170
 171            private static Stream EnsureStream(Stream stream)
 172            {
                    // Null guard: returns the given stream unchanged, or throws InvalidOperationException
                    // when it is null.
 2864173                if (stream == null)
 174                {
 0175                    throw new InvalidOperationException("The response didn't have content");
 176                }
 177
 2864178                return stream;
 179            }
 180
 0181            public override bool CanWrite => false; // Read-only stream: Write and SetLength throw NotSupportedException.
 182
 183            public override void Write(byte[] buffer, int offset, int count)
 184            {
                    // Writing is not supported (CanWrite is false); always throws.
 0185                throw new NotSupportedException();
 186            }
 187
 188            public override void SetLength(long value)
 189            {
                    // The length cannot be changed on this stream; always throws.
 0190                throw new NotSupportedException();
 191            }
 192
 193            public override void Flush()
 194            {
                    // Intentionally empty: nothing is buffered for writing, so there is nothing to flush.
 195                // Flush is allowed on read-only stream
 0196            }
 197
 198            protected override void Dispose(bool disposing)
 199            {
                    // Runs the base Stream disposal, then disposes the current inner stream if there is one.
                    // NOTE(review): the inner stream is disposed regardless of the `disposing` flag; the usual
                    // Dispose pattern releases managed objects only when disposing is true - confirm this is intended.
 648200                base.Dispose(disposing);
 648201                _currentStream?.Dispose();
 648202            }
 203        }
 204    }
 205}