| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Buffers; |
| | 6 | | using System.Collections.Generic; |
| | 7 | | using System.IO; |
| | 8 | | using System.Linq; |
| | 9 | | using System.Threading; |
| | 10 | | using System.Threading.Tasks; |
| | 11 | |
|
| | 12 | | namespace Azure.Storage.Shared |
| | 13 | | { |
| | 14 | | /// <summary> |
| | 15 | | /// Functions like a readable <see cref="MemoryStream"/> but uses an ArrayPool to supply the backing memory. |
| | 16 | | /// This stream support buffering long sizes. |
| | 17 | | /// </summary> |
| | 18 | | internal class PooledMemoryStream : SlicedStream |
| | 19 | | { |
| | 20 | | private const int DefaultMaxArrayPoolRentalSize = 128 * Constants.MB; |
| | 21 | |
|
| | 22 | | private class BufferPartition |
| | 23 | | { |
| | 24 | | /// <summary> |
| | 25 | | /// The buffer for this partition. |
| | 26 | | /// </summary> |
| 580 | 27 | | public byte[] Buffer { get; set; } |
| | 28 | |
|
| | 29 | | /// <summary> |
| | 30 | | /// Offset at which known data stops and undefined state begins. |
| | 31 | | /// </summary> |
| 2760 | 32 | | public int DataLength { get; set; } |
| | 33 | | } |
| | 34 | |
|
| | 35 | | /// <summary> |
| | 36 | | /// Boundary at which point we start requesting multiple arrays for our buffer. |
| | 37 | | /// </summary> |
| 210 | 38 | | public int MaxArraySize { get; } |
| | 39 | |
|
| | 40 | | /// <summary> |
| | 41 | | /// The backing array pool. |
| | 42 | | /// </summary> |
| 188 | 43 | | public ArrayPool<byte> ArrayPool { get; } |
| | 44 | |
|
| | 45 | | /// <summary> |
| | 46 | | /// Absolute position of this stream in the larger stream it was chunked from. |
| | 47 | | /// </summary> |
| 376 | 48 | | public override long AbsolutePosition { get; } |
| | 49 | |
|
| | 50 | | /// <summary> |
| | 51 | | /// List of arrays making up the overall buffer. Since ArrayPool may give us a larger array than needed, |
| | 52 | | /// each array is paired with a count of the space actually used in the array. This <b>should</b> only |
| | 53 | | /// be important for the final buffer. |
| | 54 | | /// </summary> |
| 3480 | 55 | | private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>(); |
| | 56 | |
|
| 210 | 57 | | private PooledMemoryStream(ArrayPool<byte> arrayPool, long absolutePosition, int maxArraySize) |
| | 58 | | { |
| 210 | 59 | | AbsolutePosition = absolutePosition; |
| 210 | 60 | | ArrayPool = arrayPool; |
| 210 | 61 | | MaxArraySize = maxArraySize; |
| 210 | 62 | | } |
| | 63 | |
|
| | 64 | | /// <summary> |
| | 65 | | /// Buffers a portion of the given stream, returning the buffered stream partition. |
| | 66 | | /// </summary> |
| | 67 | | /// <param name="stream"> |
| | 68 | | /// Stream to buffer from. |
| | 69 | | /// </param> |
| | 70 | | /// <param name="minCount"> |
| | 71 | | /// Minimum number of bytes to buffer. This method will not return until at least this many bytes have been read |
| | 72 | | /// </param> |
| | 73 | | /// <param name="maxCount"> |
| | 74 | | /// Maximum number of bytes to buffer. |
| | 75 | | /// </param> |
| | 76 | | /// <param name="absolutePosition"> |
| | 77 | | /// Current position of the stream, since <see cref="Stream.Position"/> throws if not seekable. |
| | 78 | | /// </param> |
| | 79 | | /// <param name="arrayPool"> |
| | 80 | | /// Pool to rent buffer space from. |
| | 81 | | /// </param> |
| | 82 | | /// <param name="maxArrayPoolRentalSize"> |
| | 83 | | /// Max size we can request from the array pool. |
| | 84 | | /// </param> |
| | 85 | | /// <param name="async"> |
| | 86 | | /// Whether to perform this operation asynchronously. |
| | 87 | | /// </param> |
| | 88 | | /// <param name="cancellationToken"> |
| | 89 | | /// Cancellation token. |
| | 90 | | /// </param> |
| | 91 | | /// <returns> |
| | 92 | | /// The buffered stream partition with memory backed by an array pool. |
| | 93 | | /// </returns> |
| | 94 | | internal static async Task<PooledMemoryStream> BufferStreamPartitionInternal( |
| | 95 | | Stream stream, |
| | 96 | | long minCount, |
| | 97 | | long maxCount, |
| | 98 | | long absolutePosition, |
| | 99 | | ArrayPool<byte> arrayPool, |
| | 100 | | int? maxArrayPoolRentalSize, |
| | 101 | | bool async, |
| | 102 | | CancellationToken cancellationToken) |
| | 103 | | { |
| 210 | 104 | | long totalRead = 0; |
| 210 | 105 | | var streamPartition = new PooledMemoryStream(arrayPool, absolutePosition, maxArrayPoolRentalSize ?? DefaultM |
| | 106 | |
|
| | 107 | | // max count to write into a single array |
| | 108 | | int maxCountIndividualBuffer; |
| | 109 | | // min count to write into a single array |
| | 110 | | int minCountIndividualBuffer; |
| | 111 | | // the amount that was written into the current array |
| | 112 | | int readIndividualBuffer; |
| | 113 | | do |
| | 114 | | { |
| | 115 | | // buffer to write to |
| | 116 | | byte[] buffer; |
| | 117 | | // offset to start writing at |
| | 118 | | int offset; |
| 218 | 119 | | BufferPartition latestBuffer = streamPartition.GetLatestBufferWithAvailableSpaceOrDefault(); |
| | 120 | | // whether we got a brand new buffer to write into |
| | 121 | | bool newbuffer; |
| 218 | 122 | | if (latestBuffer != default) |
| | 123 | | { |
| 8 | 124 | | buffer = latestBuffer.Buffer; |
| 8 | 125 | | offset = latestBuffer.DataLength; |
| 8 | 126 | | newbuffer = false; |
| | 127 | | } |
| | 128 | | else |
| | 129 | | { |
| 210 | 130 | | buffer = arrayPool.Rent((int)Math.Min(maxCount - totalRead, streamPartition.MaxArraySize)); |
| 210 | 131 | | offset = 0; |
| 210 | 132 | | newbuffer = true; |
| | 133 | | } |
| | 134 | |
|
| | 135 | | // limit max and min count for this buffer by buffer length |
| 218 | 136 | | maxCountIndividualBuffer = (int)Math.Min(maxCount - totalRead, buffer.Length - offset); |
| | 137 | | // definitionally limited by max; we won't ever have a swapped min/max range |
| 218 | 138 | | minCountIndividualBuffer = (int)Math.Min(minCount - totalRead, maxCountIndividualBuffer); |
| | 139 | |
|
| 218 | 140 | | readIndividualBuffer = await ReadLoopInternal( |
| 218 | 141 | | stream, |
| 218 | 142 | | buffer, |
| 218 | 143 | | offset: offset, |
| 218 | 144 | | minCountIndividualBuffer, |
| 218 | 145 | | maxCountIndividualBuffer, |
| 218 | 146 | | async, |
| 218 | 147 | | cancellationToken).ConfigureAwait(false); |
| | 148 | | // if nothing was placed in a brand new array |
| 218 | 149 | | if (readIndividualBuffer == 0 && newbuffer) |
| | 150 | | { |
| 22 | 151 | | arrayPool.Return(buffer); |
| | 152 | | } |
| | 153 | | // if brand new array and we did place data in it |
| 196 | 154 | | else if (newbuffer) |
| | 155 | | { |
| 188 | 156 | | streamPartition.BufferSet.Add(new BufferPartition |
| 188 | 157 | | { |
| 188 | 158 | | Buffer = buffer, |
| 188 | 159 | | DataLength = readIndividualBuffer |
| 188 | 160 | | }); |
| | 161 | | } |
| | 162 | | // added to an existing array that was not entirely filled |
| | 163 | | else |
| | 164 | | { |
| 8 | 165 | | latestBuffer.DataLength += readIndividualBuffer; |
| | 166 | | } |
| | 167 | |
|
| 218 | 168 | | totalRead += readIndividualBuffer; |
| | 169 | |
|
| | 170 | | /* If we filled the buffer this loop, then quitting on min count is pointless. The point of quitting |
| | 171 | | * on min count is when the source stream doesn't have available bytes and we've reached an amount worth |
| | 172 | | * sending instead of blocking on. If we filled the available array, we don't actually know whether more |
| | 173 | | * data is available yet, as we limited our read for reasons outside the stream state. We should therefore |
| | 174 | | * try another read regardless of whether we hit min count. |
| | 175 | | */ |
| 436 | 176 | | } while ( |
| 218 | 177 | | // stream is done if this value is zero; no other check matters |
| 218 | 178 | | readIndividualBuffer != 0 && |
| 218 | 179 | | // stop filling the partition if we've hit the max size of the partition |
| 218 | 180 | | totalRead < maxCount && |
| 218 | 181 | | // stop filling the partition if we've reached min count and we know we've hit at least a pause in the s |
| 218 | 182 | | (totalRead < minCount || readIndividualBuffer == maxCountIndividualBuffer)); |
| | 183 | |
|
| 210 | 184 | | return streamPartition; |
| 210 | 185 | | } |
| | 186 | |
|
| | 187 | | /// <summary> |
| | 188 | | /// Loops Read() calls into buffer until minCount is reached or stream returns 0. |
| | 189 | | /// </summary> |
| | 190 | | /// <returns>Bytes read.</returns> |
| | 191 | | /// <remarks> |
| | 192 | | /// This method may have read bytes even if it has reached the confirmed end of stream. You will have to call |
| | 193 | | /// this method again and read zero bytes to get that confirmation. |
| | 194 | | /// </remarks> |
| | 195 | | private static async Task<int> ReadLoopInternal(Stream stream, byte[] buffer, int offset, int minCount, int maxC |
| | 196 | | { |
| 218 | 197 | | if (minCount > maxCount) |
| | 198 | | { |
| 0 | 199 | | throw new ArgumentException($"{nameof(minCount)} cannot be greater than {nameof(maxCount)}."); |
| | 200 | | } |
| 218 | 201 | | if (maxCount <= 0) |
| | 202 | | { |
| 0 | 203 | | throw new ArgumentException("Cannot read a non-positive number of bytes."); |
| | 204 | | } |
| | 205 | |
|
| 218 | 206 | | int totalRead = 0; |
| | 207 | | do |
| | 208 | | { |
| 5066 | 209 | | int read = async |
| 5066 | 210 | | ? await stream.ReadAsync(buffer, offset + totalRead, maxCount - totalRead, cancellationToken).Config |
| 5066 | 211 | | : stream.Read(buffer, offset + totalRead, maxCount - totalRead); |
| | 212 | | // either we have read maxCount in total or the stream has ended |
| 5066 | 213 | | if (read == 0) |
| | 214 | | { |
| | 215 | | break; |
| | 216 | | } |
| 5028 | 217 | | totalRead += read; |
| | 218 | | // we always request the number that will bring our total read to maxCount |
| | 219 | | // if the stream can only give us so much at the moment and we've at least hit minCount, we can exit |
| 5028 | 220 | | } while (totalRead < minCount); |
| 218 | 221 | | return totalRead; |
| 218 | 222 | | } |
| | 223 | |
|
| 188 | 224 | | public override bool CanRead => true; |
| | 225 | |
|
| 188 | 226 | | public override bool CanSeek => true; |
| | 227 | |
|
| 0 | 228 | | public override bool CanWrite => false; |
| | 229 | |
|
| 4464 | 230 | | public override long Length => BufferSet.Sum(tuple => (long)tuple.DataLength); |
| | 231 | |
|
| 2444 | 232 | | public override long Position { get; set; } |
| | 233 | |
|
| | 234 | | public override void Flush() |
| | 235 | | { |
| | 236 | | // no-op, just like MemoryStream |
| 0 | 237 | | } |
| | 238 | |
|
| | 239 | | public override int Read(byte[] buffer, int offset, int count) |
| | 240 | | { |
| 376 | 241 | | if (Position >= Length) |
| | 242 | | { |
| 188 | 243 | | return 0; |
| | 244 | | } |
| | 245 | |
|
| 188 | 246 | | int read = 0; |
| 376 | 247 | | while (read < count && Position < Length) |
| | 248 | | { |
| 188 | 249 | | (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) = GetBufferFromPosition(); |
| | 250 | |
|
| 188 | 251 | | int toCopy = (int)Min( |
| 188 | 252 | | Length - Position, |
| 188 | 253 | | bufferCount - (Position - offsetOfBuffer), |
| 188 | 254 | | count - read); |
| 188 | 255 | | Array.Copy(currentBuffer, Position - offsetOfBuffer, buffer, read, toCopy); |
| 188 | 256 | | read += toCopy; |
| 188 | 257 | | Position += toCopy; |
| | 258 | | } |
| | 259 | |
|
| 188 | 260 | | return read; |
| | 261 | | } |
| | 262 | |
|
| | 263 | | /// <summary> |
| | 264 | | /// According the the current <see cref="Position"/> of the stream, gets the correct buffer containing the byte |
| | 265 | | /// at that position, as well as the stream position represented by the start of the array. |
| | 266 | | /// Position - offsetOfBuffer is the index in the returned array of the current byte. |
| | 267 | | /// </summary> |
| | 268 | | /// <returns></returns> |
| | 269 | | private (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) GetBufferFromPosition() |
| | 270 | | { |
| 188 | 271 | | AssertPositionInBounds(); |
| | 272 | |
|
| 188 | 273 | | long countingPosition = 0; |
| 564 | 274 | | foreach (var tuple in BufferSet) |
| | 275 | | { |
| 188 | 276 | | if (countingPosition + tuple.DataLength <= Position) |
| | 277 | | { |
| 0 | 278 | | countingPosition += tuple.DataLength; |
| | 279 | | } |
| | 280 | | else |
| | 281 | | { |
| 188 | 282 | | return (tuple.Buffer, tuple.DataLength, countingPosition); |
| | 283 | | } |
| | 284 | | } |
| | 285 | |
|
| | 286 | | /* this.Length is defined as the sum of all counts. |
| | 287 | | * We already throw if this.Position >= this.Length. |
| | 288 | | * We can only get here if this.Position is >= the sum of all counts. |
| | 289 | | * We will never get here. */ |
| 0 | 290 | | throw new InvalidOperationException("Incorrect stream partition length."); |
| 188 | 291 | | } |
| | 292 | |
|
| | 293 | | public override long Seek(long offset, SeekOrigin origin) |
| | 294 | | { |
| | 295 | | switch (origin) |
| | 296 | | { |
| | 297 | | case SeekOrigin.Begin: |
| 0 | 298 | | Position = offset; |
| 0 | 299 | | break; |
| | 300 | | case SeekOrigin.Current: |
| 0 | 301 | | Position += offset; |
| 0 | 302 | | break; |
| | 303 | | case SeekOrigin.End: |
| 0 | 304 | | Position = Length + offset; |
| | 305 | | break; |
| | 306 | | } |
| | 307 | |
|
| 0 | 308 | | return Position; |
| | 309 | | } |
| | 310 | |
|
| | 311 | | public override void SetLength(long value) |
| | 312 | | { |
| 0 | 313 | | throw new NotSupportedException(); |
| | 314 | | } |
| | 315 | |
|
| | 316 | | public override void Write(byte[] buffer, int offset, int count) |
| | 317 | | { |
| 0 | 318 | | throw new NotSupportedException(); |
| | 319 | | } |
| | 320 | |
|
| | 321 | | protected override void Dispose(bool disposing) |
| | 322 | | { |
| 752 | 323 | | foreach (var buffer in BufferSet) |
| | 324 | | { |
| 188 | 325 | | ArrayPool.Return(buffer.Buffer); |
| | 326 | | } |
| 188 | 327 | | BufferSet.Clear(); |
| 188 | 328 | | } |
| | 329 | |
|
| | 330 | | private void AssertPositionInBounds() |
| | 331 | | { |
| 188 | 332 | | if (Position >= Length || Position < 0) |
| | 333 | | { |
| 0 | 334 | | throw new InvalidOperationException("Cannot read outside the bounds of this stream."); |
| | 335 | | } |
| 188 | 336 | | } |
| | 337 | |
|
| | 338 | | private BufferPartition GetLatestBufferWithAvailableSpaceOrDefault() |
| | 339 | | { |
| 218 | 340 | | var latestBuffer = BufferSet.LastOrDefault(); |
| | 341 | |
|
| 218 | 342 | | if (latestBuffer == default || latestBuffer.DataLength >= latestBuffer.Buffer.Length) |
| | 343 | | { |
| 210 | 344 | | return default; |
| | 345 | | } |
| | 346 | |
|
| 8 | 347 | | return latestBuffer; |
| | 348 | | } |
| | 349 | |
|
| | 350 | | private static long Min(long val1, long val2, long val3) |
| | 351 | | { |
| 188 | 352 | | long result = Math.Min(val1, val2); |
| 188 | 353 | | result = Math.Min(result, val3); |
| | 354 | |
|
| 188 | 355 | | return result; |
| | 356 | | } |
| | 357 | | } |
| | 358 | | } |