< Summary

Class: Azure.Core.Http.Multipart.BufferedReadStream
Assembly: Azure.Storage.Blobs.Batch
File(s): C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.Batch\src\Shared\BufferedReadStream.cs
Covered lines: 100
Uncovered lines: 41
Coverable lines: 141
Total lines: 440
Line coverage: 70.9% (100 of 141)
Covered branches: 46
Total branches: 68
Branch coverage: 67.6% (46 of 68)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 85.71% | 50%
get_BufferedData() | - | 100% | 100%
get_CanRead() | - | 100% | 50%
get_CanSeek() | - | 100% | 100%
get_CanTimeout() | - | 0% | 100%
get_CanWrite() | - | 0% | 100%
get_Length() | - | 100% | 100%
get_Position() | - | 100% | 100%
set_Position(...) | - | 0% | 0%
Seek(...) | - | 0% | 0%
SetLength(...) | - | 0% | 100%
Dispose(...) | - | 100% | 100%
Flush() | - | 0% | 100%
FlushAsync(...) | - | 0% | 100%
Write(...) | - | 0% | 100%
WriteAsync(...) | - | 0% | 100%
Read(...) | - | 100% | 100%
ReadAsync() | - | 100% | 100%
EnsureBuffered() | - | 100% | 100%
EnsureBufferedAsync() | - | 100% | 100%
EnsureBuffered(...) | - | 83.33% | 80%
EnsureBufferedAsync() | - | 84.62% | 80%
ReadLine(...) | - | 88.89% | 83.33%
ReadLineAsync() | - | 88.89% | 83.33%
ProcessLineChar(...) | - | 100% | 100%
DecodeLine(...) | - | 100% | 100%
CheckDisposed() | - | 66.67% | 50%
ValidateBuffer(...) | - | 75% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.Batch\src\Shared\BufferedReadStream.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4// Copied from https://github.com/aspnet/AspNetCore/tree/master/src/Http/WebUtilities/src
 5
 6using System;
 7using System.Buffers;
 8using System.IO;
 9using System.Text;
 10using System.Threading;
 11using System.Threading.Tasks;
 12
 13#pragma warning disable CA1305  // ToString Locale
 14#pragma warning disable CA1822  // Make member static
 15#pragma warning disable IDE0016 // Simplify null check
 16#pragma warning disable IDE0036 // Modifiers not ordered
 17#pragma warning disable IDE0054 // Use compound assignment
 18#pragma warning disable IDE0059 // Unnecessary assignment
 19
 20namespace Azure.Core.Http.Multipart
 21{
 22    /// <summary>
 23    /// A Stream that wraps another stream and allows reading lines.
 24    /// The data is buffered in memory.
 25    /// </summary>
 26    internal class BufferedReadStream : Stream
 27    {
 28        private const byte CR = (byte)'\r';
 29        private const byte LF = (byte)'\n';
 30
 31        private readonly Stream _inner;
 32        private readonly byte[] _buffer;
 33        private readonly ArrayPool<byte> _bytePool;
 34        private int _bufferOffset = 0;
 35        private int _bufferCount = 0;
 36        private bool _disposed;
 37
 38        /// <summary>
 39        /// Creates a new stream.
 40        /// </summary>
 41        /// <param name="inner">The stream to wrap.</param>
 42        /// <param name="bufferSize">Size of buffer in bytes.</param>
 43        public BufferedReadStream(Stream inner, int bufferSize)
 136044            : this(inner, bufferSize, ArrayPool<byte>.Shared)
 45        {
 136046        }
 47
 48        /// <summary>
 49        /// Creates a new stream.
 50        /// </summary>
 51        /// <param name="inner">The stream to wrap.</param>
 52        /// <param name="bufferSize">Size of buffer in bytes.</param>
 53        /// <param name="bytePool">ArrayPool for the buffer.</param>
 136054        public BufferedReadStream(Stream inner, int bufferSize, ArrayPool<byte> bytePool)
 55        {
 136056            if (inner == null)
 57            {
 058                throw new ArgumentNullException(nameof(inner));
 59            }
 60
 136061            _inner = inner;
 136062            _bytePool = bytePool;
 136063            _buffer = bytePool.Rent(bufferSize);
 136064        }
 65
 66        /// <summary>
 67        /// The currently buffered data.
 68        /// </summary>
 69        public ArraySegment<byte> BufferedData
 70        {
 266071            get { return new ArraySegment<byte>(_buffer, _bufferOffset, _bufferCount); }
 72        }
 73
 74        /// <inheritdoc/>
 75        public override bool CanRead
 76        {
 127277            get { return _inner.CanRead || _bufferCount > 0; }
 78        }
 79
 80        /// <inheritdoc/>
 81        public override bool CanSeek
 82        {
 783683            get { return _inner.CanSeek; }
 84        }
 85
 86        /// <inheritdoc/>
 87        public override bool CanTimeout
 88        {
 089            get { return _inner.CanTimeout; }
 90        }
 91
 92        /// <inheritdoc/>
 93        public override bool CanWrite
 94        {
 095            get { return _inner.CanWrite; }
 96        }
 97
 98        /// <inheritdoc/>
 99        public override long Length
 100        {
 1272101            get { return _inner.Length; }
 102        }
 103
 104        /// <inheritdoc/>
 105        public override long Position
 106        {
 6564107            get { return _inner.Position - _bufferCount; }
 108            set
 109            {
 0110                if (value < 0)
 111                {
 0112                    throw new ArgumentOutOfRangeException(nameof(value), value, "Position must be positive.");
 113                }
 0114                if (value == Position)
 115                {
 0116                    return;
 117                }
 118
 119                // Backwards?
 0120                if (value <= _inner.Position)
 121                {
 122                    // Forward within the buffer?
 0123                    var innerOffset = (int)(_inner.Position - value);
 0124                    if (innerOffset <= _bufferCount)
 125                    {
 126                        // Yes, just skip some of the buffered data
 0127                        _bufferOffset += innerOffset;
 0128                        _bufferCount -= innerOffset;
 129                    }
 130                    else
 131                    {
 132                        // No, reset the buffer
 0133                        _bufferOffset = 0;
 0134                        _bufferCount = 0;
 0135                        _inner.Position = value;
 136                    }
 137                }
 138                else
 139                {
 140                    // Forward, reset the buffer
 0141                    _bufferOffset = 0;
 0142                    _bufferCount = 0;
 0143                    _inner.Position = value;
 144                }
 0145            }
 146        }
 147
 148        /// <inheritdoc/>
 149        public override long Seek(long offset, SeekOrigin origin)
 150        {
 0151            if (origin == SeekOrigin.Begin)
 152            {
 0153                Position = offset;
 154            }
 0155            else if (origin == SeekOrigin.Current)
 156            {
 0157                Position = Position + offset;
 158            }
 159            else // if (origin == SeekOrigin.End)
 160            {
 0161                Position = Length + offset;
 162            }
 0163            return Position;
 164        }
 165
 166        /// <inheritdoc/>
 167        public override void SetLength(long value)
 168        {
 0169            _inner.SetLength(value);
 0170        }
 171
 172        /// <inheritdoc/>
 173        protected override void Dispose(bool disposing)
 174        {
 1272175            if (!_disposed)
 176            {
 1272177                _disposed = true;
 1272178                _bytePool.Return(_buffer);
 179
 1272180                if (disposing)
 181                {
 1272182                    _inner.Dispose();
 183                }
 184            }
 1272185        }
 186
 187        /// <inheritdoc/>
 188        public override void Flush()
 189        {
 0190            _inner.Flush();
 0191        }
 192
 193        /// <inheritdoc/>
 194        public override Task FlushAsync(CancellationToken cancellationToken)
 195        {
 0196            return _inner.FlushAsync(cancellationToken);
 197        }
 198
 199        /// <inheritdoc/>
 200        public override void Write(byte[] buffer, int offset, int count)
 201        {
 0202            _inner.Write(buffer, offset, count);
 0203        }
 204
 205        /// <inheritdoc/>
 206        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
 207        {
 0208            return _inner.WriteAsync(buffer, offset, count, cancellationToken);
 209        }
 210
 211        /// <inheritdoc/>
 212        public override int Read(byte[] buffer, int offset, int count)
 213        {
 3336214            ValidateBuffer(buffer, offset, count);
 215
 216            // Drain buffer
 3336217            if (_bufferCount > 0)
 218            {
 2700219                int toCopy = Math.Min(_bufferCount, count);
 2700220                Buffer.BlockCopy(_buffer, _bufferOffset, buffer, offset, toCopy);
 2700221                _bufferOffset += toCopy;
 2700222                _bufferCount -= toCopy;
 2700223                return toCopy;
 224            }
 225
 636226            return _inner.Read(buffer, offset, count);
 227        }
 228
 229        /// <inheritdoc/>
 230        public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
 231        {
 764232            ValidateBuffer(buffer, offset, count);
 233
 234            // Drain buffer
 764235            if (_bufferCount > 0)
 236            {
 40237                int toCopy = Math.Min(_bufferCount, count);
 40238                Buffer.BlockCopy(_buffer, _bufferOffset, buffer, offset, toCopy);
 40239                _bufferOffset += toCopy;
 40240                _bufferCount -= toCopy;
 40241                return toCopy;
 242            }
 243
 724244            return await _inner.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
 764245        }
 246
 247        /// <summary>
 248        /// Ensures that the buffer is not empty.
 249        /// </summary>
 250        /// <returns>Returns <c>true</c> if the buffer is not empty; <c>false</c> otherwise.</returns>
 251        public bool EnsureBuffered()
 252        {
 112510253            if (_bufferCount > 0)
 254            {
 111220255                return true;
 256            }
 257            // Downshift to make room
 1290258            _bufferOffset = 0;
 1290259            _bufferCount = _inner.Read(_buffer, 0, _buffer.Length);
 1290260            return _bufferCount > 0;
 261        }
 262
 263        /// <summary>
 264        /// Ensures that the buffer is not empty.
 265        /// </summary>
 266        /// <param name="cancellationToken">Cancellation token.</param>
 267        /// <returns>Returns <c>true</c> if the buffer is not empty; <c>false</c> otherwise.</returns>
 268        public async Task<bool> EnsureBufferedAsync(CancellationToken cancellationToken)
 269        {
 176562270            if (_bufferCount > 0)
 271            {
 175268272                return true;
 273            }
 274            // Downshift to make room
 1294275            _bufferOffset = 0;
 1294276            _bufferCount = await _inner.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken).ConfigureAwait(false);
 1294277            return _bufferCount > 0;
 176562278        }
 279
 280        /// <summary>
 281        /// Ensures that a minimum amount of buffered data is available.
 282        /// </summary>
 283        /// <param name="minCount">Minimum amount of buffered data.</param>
 284        /// <returns>Returns <c>true</c> if the minimum amount of buffered data is available; <c>false</c> otherwise.</returns>
 285        public bool EnsureBuffered(int minCount)
 286        {
 1286287            if (minCount > _buffer.Length)
 288            {
 0289                throw new ArgumentOutOfRangeException(nameof(minCount), minCount, "The value must be smaller than the bu
 290            }
 1318291            while (_bufferCount < minCount)
 292            {
 293                // Downshift to make room
 32294                if (_bufferOffset > 0)
 295                {
 32296                    if (_bufferCount > 0)
 297                    {
 26298                        Buffer.BlockCopy(_buffer, _bufferOffset, _buffer, 0, _bufferCount);
 299                    }
 32300                    _bufferOffset = 0;
 301                }
 32302                int read = _inner.Read(_buffer, _bufferOffset + _bufferCount, _buffer.Length - _bufferCount - _bufferOffset);
 32303                _bufferCount += read;
 32304                if (read == 0)
 305                {
 0306                    return false;
 307                }
 308            }
 1286309            return true;
 310        }
 311
 312        /// <summary>
 313        /// Ensures that a minimum amount of buffered data is available.
 314        /// </summary>
 315        /// <param name="minCount">Minimum amount of buffered data.</param>
 316        /// <param name="cancellationToken">Cancellation token.</param>
 317        /// <returns>Returns <c>true</c> if the minimum amount of buffered data is available; <c>false</c> otherwise.</returns>
 318        public async Task<bool> EnsureBufferedAsync(int minCount, CancellationToken cancellationToken)
 319        {
 1374320            if (minCount > _buffer.Length)
 321            {
 0322                throw new ArgumentOutOfRangeException(nameof(minCount), minCount, "The value must be smaller than the bu
 323            }
 1494324            while (_bufferCount < minCount)
 325            {
 326                // Downshift to make room
 120327                if (_bufferOffset > 0)
 328                {
 32329                    if (_bufferCount > 0)
 330                    {
 26331                        Buffer.BlockCopy(_buffer, _bufferOffset, _buffer, 0, _bufferCount);
 332                    }
 32333                    _bufferOffset = 0;
 334                }
 120335                int read = await _inner.ReadAsync(_buffer, _bufferOffset + _bufferCount, _buffer.Length - _bufferCount - _bufferOffset, cancellationToken).ConfigureAwait(false);
 120336                _bufferCount += read;
 120337                if (read == 0)
 338                {
 0339                    return false;
 340                }
 341            }
 1374342            return true;
 1374343        }
 344
 345        /// <summary>
 346        /// Reads a line. A line is defined as a sequence of characters followed by
 347        /// a carriage return immediately followed by a line feed. The resulting string does not
 348        /// contain the terminating carriage return and line feed.
 349        /// </summary>
 350        /// <param name="lengthLimit">Maximum allowed line length.</param>
 351        /// <returns>A line.</returns>
 352        public string ReadLine(int lengthLimit)
 353        {
 4504354            CheckDisposed();
 4504355            using (var builder = new MemoryStream(200))
 356            {
 9008357                bool foundCR = false, foundCRLF = false;
 358
 116374359                while (!foundCRLF && EnsureBuffered())
 360                {
 111870361                    if (builder.Length > lengthLimit)
 362                    {
 0363                        throw new InvalidDataException($"Line length limit {lengthLimit} exceeded.");
 364                    }
 111870365                    ProcessLineChar(builder, ref foundCR, ref foundCRLF);
 366                }
 367
 4504368                return DecodeLine(builder, foundCRLF);
 369            }
 4504370        }
 371
 372        /// <summary>
 373        /// Reads a line. A line is defined as a sequence of characters followed by
 374        /// a carriage return immediately followed by a line feed. The resulting string does not
 375        /// contain the terminating carriage return and line feed.
 376        /// </summary>
 377        /// <param name="lengthLimit">Maximum allowed line length.</param>
 378        /// <param name="cancellationToken">Cancellation token.</param>
 379        /// <returns>A line.</returns>
 380        public async Task<string> ReadLineAsync(int lengthLimit, CancellationToken cancellationToken)
 381        {
 8404382            CheckDisposed();
 8404383            using (var builder = new MemoryStream(200))
 384            {
 16808385                bool foundCR = false, foundCRLF = false;
 386
 184326387                while (!foundCRLF && await EnsureBufferedAsync(cancellationToken).ConfigureAwait(false))
 388                {
 175922389                    if (builder.Length > lengthLimit)
 390                    {
 0391                        throw new InvalidDataException($"Line length limit {lengthLimit} exceeded.");
 392                    }
 393
 175922394                    ProcessLineChar(builder, ref foundCR, ref foundCRLF);
 395                }
 396
 8404397                return DecodeLine(builder, foundCRLF);
 398            }
 8404399        }
 400
 401        private void ProcessLineChar(MemoryStream builder, ref bool foundCR, ref bool foundCRLF)
 402        {
 287792403            var b = _buffer[_bufferOffset];
 287792404            builder.WriteByte(b);
 287792405            _bufferOffset++;
 287792406            _bufferCount--;
 287792407            if (b == LF && foundCR)
 408            {
 11628409                foundCRLF = true;
 11628410                return;
 411            }
 276164412            foundCR = b == CR;
 276164413        }
 414
 415        private string DecodeLine(MemoryStream builder, bool foundCRLF)
 416        {
 417            // Drop the final CRLF, if any
 12908418            var length = foundCRLF ? builder.Length - 2 : builder.Length;
 12908419            return Encoding.UTF8.GetString(builder.ToArray(), 0, (int)length);
 420        }
 421
 422        private void CheckDisposed()
 423        {
 12908424            if (_disposed)
 425            {
 0426                throw new ObjectDisposedException(nameof(BufferedReadStream));
 427            }
 12908428        }
 429
 430        private void ValidateBuffer(byte[] buffer, int offset, int count)
 431        {
 432            // Delegate most of our validation.
 4100433            var ignored = new ArraySegment<byte>(buffer, offset, count);
 4100434            if (count == 0)
 435            {
 0436                throw new ArgumentOutOfRangeException(nameof(count), "The value must be greater than zero.");
 437            }
 4100438        }
 439    }
 440}