< Summary

Class:Azure.Storage.Shared.PooledMemoryStream
Assembly:Azure.Storage.Files.DataLake
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PooledMemoryStream.cs
Covered lines:97
Uncovered lines:15
Coverable lines:112
Total lines:358
Line coverage:86.6% (97 of 112)
Covered branches:39
Total branches:50
Branch coverage:78% (39 of 50)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Buffer()-100%100%
get_DataLength()-100%100%
get_MaxArraySize()-100%100%
get_ArrayPool()-100%100%
get_AbsolutePosition()-100%100%
get_BufferSet()-100%100%
.ctor(...)-100%100%
BufferStreamPartitionInternal()-100%93.75%
ReadLoopInternal()-84.62%80%
get_CanRead()-100%100%
get_CanSeek()-100%100%
get_CanWrite()-0%100%
get_Length()-100%100%
get_Position()-100%100%
Flush()-0%100%
Read(...)-100%100%
GetBufferFromPosition()-75%50%
Seek(...)-0%0%
SetLength(...)-0%100%
Write(...)-0%100%
Dispose(...)-100%100%
AssertPositionInBounds()-66.67%50%
GetLatestBufferWithAvailableSpaceOrDefault()-100%100%
Min(...)-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Common\src\Shared\PooledMemoryStream.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Buffers;
 6using System.Collections.Generic;
 7using System.IO;
 8using System.Linq;
 9using System.Threading;
 10using System.Threading.Tasks;
 11
 12namespace Azure.Storage.Shared
 13{
 14    /// <summary>
 15    /// Functions like a readable <see cref="MemoryStream"/> but uses an ArrayPool to supply the backing memory.
 16    /// This stream support buffering long sizes.
 17    /// </summary>
 18    internal class PooledMemoryStream : SlicedStream
 19    {
 20        private const int DefaultMaxArrayPoolRentalSize = 128 * Constants.MB;
 21
 22        private class BufferPartition
 23        {
 24            /// <summary>
 25            /// The buffer for this partition.
 26            /// </summary>
 58027            public byte[] Buffer { get; set; }
 28
 29            /// <summary>
 30            /// Offset at which known data stops and undefined state begins.
 31            /// </summary>
 276032            public int DataLength { get; set; }
 33        }
 34
 35        /// <summary>
 36        /// Boundary at which point we start requesting multiple arrays for our buffer.
 37        /// </summary>
 21038        public int MaxArraySize { get; }
 39
 40        /// <summary>
 41        /// The backing array pool.
 42        /// </summary>
 18843        public ArrayPool<byte> ArrayPool { get; }
 44
 45        /// <summary>
 46        /// Absolute position of this stream in the larger stream it was chunked from.
 47        /// </summary>
 37648        public override long AbsolutePosition { get; }
 49
 50        /// <summary>
 51        /// List of arrays making up the overall buffer. Since ArrayPool may give us a larger array than needed,
 52        /// each array is paired with a count of the space actually used in the array. This <b>should</b> only
 53        /// be important for the final buffer.
 54        /// </summary>
 348055        private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>();
 56
 21057        private PooledMemoryStream(ArrayPool<byte> arrayPool, long absolutePosition, int maxArraySize)
 58        {
 21059            AbsolutePosition = absolutePosition;
 21060            ArrayPool = arrayPool;
 21061            MaxArraySize = maxArraySize;
 21062        }
 63
 64        /// <summary>
 65        /// Buffers a portion of the given stream, returning the buffered stream partition.
 66        /// </summary>
 67        /// <param name="stream">
 68        /// Stream to buffer from.
 69        /// </param>
 70        /// <param name="minCount">
 71        /// Minimum number of bytes to buffer. This method will not return until at least this many bytes have been read
 72        /// </param>
 73        /// <param name="maxCount">
 74        /// Maximum number of bytes to buffer.
 75        /// </param>
 76        /// <param name="absolutePosition">
 77        /// Current position of the stream, since <see cref="Stream.Position"/> throws if not seekable.
 78        /// </param>
 79        /// <param name="arrayPool">
 80        /// Pool to rent buffer space from.
 81        /// </param>
 82        /// <param name="maxArrayPoolRentalSize">
 83        /// Max size we can request from the array pool.
 84        /// </param>
 85        /// <param name="async">
 86        /// Whether to perform this operation asynchronously.
 87        /// </param>
 88        /// <param name="cancellationToken">
 89        /// Cancellation token.
 90        /// </param>
 91        /// <returns>
 92        /// The buffered stream partition with memory backed by an array pool.
 93        /// </returns>
 94        internal static async Task<PooledMemoryStream> BufferStreamPartitionInternal(
 95            Stream stream,
 96            long minCount,
 97            long maxCount,
 98            long absolutePosition,
 99            ArrayPool<byte> arrayPool,
 100            int? maxArrayPoolRentalSize,
 101            bool async,
 102            CancellationToken cancellationToken)
 103        {
 210104            long totalRead = 0;
 210105            var streamPartition = new PooledMemoryStream(arrayPool, absolutePosition, maxArrayPoolRentalSize ?? DefaultM
 106
 107            // max count to write into a single array
 108            int maxCountIndividualBuffer;
 109            // min count to write into a single array
 110            int minCountIndividualBuffer;
 111            // the amount that was written into the current array
 112            int readIndividualBuffer;
 113            do
 114            {
 115                // buffer to write to
 116                byte[] buffer;
 117                // offset to start writing at
 118                int offset;
 218119                BufferPartition latestBuffer = streamPartition.GetLatestBufferWithAvailableSpaceOrDefault();
 120                // whether we got a brand new buffer to write into
 121                bool newbuffer;
 218122                if (latestBuffer != default)
 123                {
 8124                    buffer = latestBuffer.Buffer;
 8125                    offset = latestBuffer.DataLength;
 8126                    newbuffer = false;
 127                }
 128                else
 129                {
 210130                    buffer = arrayPool.Rent((int)Math.Min(maxCount - totalRead, streamPartition.MaxArraySize));
 210131                    offset = 0;
 210132                    newbuffer = true;
 133                }
 134
 135                // limit max and min count for this buffer by buffer length
 218136                maxCountIndividualBuffer = (int)Math.Min(maxCount - totalRead, buffer.Length - offset);
 137                // definitionally limited by max; we won't ever have a swapped min/max range
 218138                minCountIndividualBuffer = (int)Math.Min(minCount - totalRead, maxCountIndividualBuffer);
 139
 218140                readIndividualBuffer = await ReadLoopInternal(
 218141                    stream,
 218142                    buffer,
 218143                    offset: offset,
 218144                    minCountIndividualBuffer,
 218145                    maxCountIndividualBuffer,
 218146                    async,
 218147                    cancellationToken).ConfigureAwait(false);
 148                // if nothing was placed in a brand new array
 218149                if (readIndividualBuffer == 0 && newbuffer)
 150                {
 22151                    arrayPool.Return(buffer);
 152                }
 153                // if brand new array and we did place data in it
 196154                else if (newbuffer)
 155                {
 188156                    streamPartition.BufferSet.Add(new BufferPartition
 188157                    {
 188158                        Buffer = buffer,
 188159                        DataLength = readIndividualBuffer
 188160                    });
 161                }
 162                // added to an existing array that was not entirely filled
 163                else
 164                {
 8165                    latestBuffer.DataLength += readIndividualBuffer;
 166                }
 167
 218168                totalRead += readIndividualBuffer;
 169
 170            /* If we filled the buffer this loop, then quitting on min count is pointless. The point of quitting
 171             * on min count is when the source stream doesn't have available bytes and we've reached an amount worth
 172             * sending instead of blocking on. If we filled the available array, we don't actually know whether more
 173             * data is available yet, as we limited our read for reasons outside the stream state. We should therefore
 174             * try another read regardless of whether we hit min count.
 175             */
 436176            } while (
 218177                // stream is done if this value is zero; no other check matters
 218178                readIndividualBuffer != 0 &&
 218179                // stop filling the partition if we've hit the max size of the partition
 218180                totalRead < maxCount &&
 218181                // stop filling the partition if we've reached min count and we know we've hit at least a pause in the s
 218182                (totalRead < minCount || readIndividualBuffer == maxCountIndividualBuffer));
 183
 210184            return streamPartition;
 210185        }
 186
 187        /// <summary>
 188        /// Loops Read() calls into buffer until minCount is reached or stream returns 0.
 189        /// </summary>
 190        /// <returns>Bytes read.</returns>
 191        /// <remarks>
 192        /// This method may have read bytes even if it has reached the confirmed end of stream. You will have to call
 193        /// this method again and read zero bytes to get that confirmation.
 194        /// </remarks>
 195        private static async Task<int> ReadLoopInternal(Stream stream, byte[] buffer, int offset, int minCount, int maxC
 196        {
 218197            if (minCount > maxCount)
 198            {
 0199                throw new ArgumentException($"{nameof(minCount)} cannot be greater than {nameof(maxCount)}.");
 200            }
 218201            if (maxCount <= 0)
 202            {
 0203                throw new ArgumentException("Cannot read a non-positive number of bytes.");
 204            }
 205
 218206            int totalRead = 0;
 207            do
 208            {
 5066209                int read = async
 5066210                    ? await stream.ReadAsync(buffer, offset + totalRead, maxCount - totalRead, cancellationToken).Config
 5066211                    : stream.Read(buffer, offset + totalRead, maxCount - totalRead);
 212                // either we have read maxCount in total or the stream has ended
 5066213                if (read == 0)
 214                {
 215                    break;
 216                }
 5028217                totalRead += read;
 218            // we always request the number that will bring our total read to maxCount
 219            // if the stream can only give us so much at the moment and we've at least hit minCount, we can exit
 5028220            } while (totalRead < minCount);
 218221            return totalRead;
 218222        }
 223
 188224        public override bool CanRead => true;
 225
 188226        public override bool CanSeek => true;
 227
 0228        public override bool CanWrite => false;
 229
 4464230        public override long Length => BufferSet.Sum(tuple => (long)tuple.DataLength);
 231
 2444232        public override long Position { get; set; }
 233
 234        public override void Flush()
 235        {
 236            // no-op, just like MemoryStream
 0237        }
 238
 239        public override int Read(byte[] buffer, int offset, int count)
 240        {
 376241            if (Position >= Length)
 242            {
 188243                return 0;
 244            }
 245
 188246            int read = 0;
 376247            while (read < count && Position < Length)
 248            {
 188249                (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) = GetBufferFromPosition();
 250
 188251                int toCopy = (int)Min(
 188252                    Length - Position,
 188253                    bufferCount - (Position - offsetOfBuffer),
 188254                    count - read);
 188255                Array.Copy(currentBuffer, Position - offsetOfBuffer, buffer, read, toCopy);
 188256                read += toCopy;
 188257                Position += toCopy;
 258            }
 259
 188260            return read;
 261        }
 262
 263        /// <summary>
 264        /// According the the current <see cref="Position"/> of the stream, gets the correct buffer containing the byte
 265        /// at that position, as well as the stream position represented by the start of the array.
 266        /// Position - offsetOfBuffer is the index in the returned array of the current byte.
 267        /// </summary>
 268        /// <returns></returns>
 269        private (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) GetBufferFromPosition()
 270        {
 188271            AssertPositionInBounds();
 272
 188273            long countingPosition = 0;
 564274            foreach (var tuple in BufferSet)
 275            {
 188276                if (countingPosition + tuple.DataLength <= Position)
 277                {
 0278                    countingPosition += tuple.DataLength;
 279                }
 280                else
 281                {
 188282                    return (tuple.Buffer, tuple.DataLength, countingPosition);
 283                }
 284            }
 285
 286            /* this.Length is defined as the sum of all counts.
 287             * We already throw if this.Position >= this.Length.
 288             * We can only get here if this.Position is >= the sum of all counts.
 289             * We will never get here. */
 0290            throw new InvalidOperationException("Incorrect stream partition length.");
 188291        }
 292
 293        public override long Seek(long offset, SeekOrigin origin)
 294        {
 295            switch (origin)
 296            {
 297                case SeekOrigin.Begin:
 0298                    Position = offset;
 0299                    break;
 300                case SeekOrigin.Current:
 0301                    Position += offset;
 0302                    break;
 303                case SeekOrigin.End:
 0304                    Position = Length + offset;
 305                    break;
 306            }
 307
 0308            return Position;
 309        }
 310
 311        public override void SetLength(long value)
 312        {
 0313            throw new NotSupportedException();
 314        }
 315
 316        public override void Write(byte[] buffer, int offset, int count)
 317        {
 0318            throw new NotSupportedException();
 319        }
 320
 321        protected override void Dispose(bool disposing)
 322        {
 752323            foreach (var buffer in BufferSet)
 324            {
 188325                ArrayPool.Return(buffer.Buffer);
 326            }
 188327            BufferSet.Clear();
 188328        }
 329
 330        private void AssertPositionInBounds()
 331        {
 188332            if (Position >= Length || Position < 0)
 333            {
 0334                throw new InvalidOperationException("Cannot read outside the bounds of this stream.");
 335            }
 188336        }
 337
 338        private BufferPartition GetLatestBufferWithAvailableSpaceOrDefault()
 339        {
 218340            var latestBuffer = BufferSet.LastOrDefault();
 341
 218342            if (latestBuffer == default || latestBuffer.DataLength >= latestBuffer.Buffer.Length)
 343            {
 210344                return default;
 345            }
 346
 8347            return latestBuffer;
 348        }
 349
 350        private static long Min(long val1, long val2, long val3)
 351        {
 188352            long result = Math.Min(val1, val2);
 188353            result = Math.Min(result, val3);
 354
 188355            return result;
 356        }
 357    }
 358}