| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.IO; |
| | 6 | | using System.Threading; |
| | 7 | | using System.Threading.Tasks; |
| | 8 | | using Azure.Core.Pipeline; |
| | 9 | |
|
| | 10 | | #pragma warning disable SA1402 // File may only contain a single type |
| | 11 | | // branching logic on wrapping seekable vs unseekable streams has been handled via inheritance |
| | 12 | |
|
| | 13 | | namespace Azure.Storage.Shared |
| | 14 | | { |
| | 15 | | /// <summary> |
| | 16 | | /// Exposes a predetermined slice of a larger stream using the same Stream interface. |
| | 17 | | /// There should not be access to the base stream while this facade is in use. |
| | 18 | | /// </summary> |
| | 19 | | internal abstract class WindowStream : SlicedStream |
| | 20 | | { |
| 64 | 21 | | private Stream InnerStream { get; } |
| | 22 | |
|
| 80 | 23 | | public override long AbsolutePosition { get; } |
| | 24 | |
|
| 16 | 25 | | public override bool CanRead => true; |
| | 26 | |
|
| 0 | 27 | | public override bool CanWrite => false; |
| | 28 | |
|
| | 29 | | /// <summary> |
| | 30 | | /// Constructs a window of an underlying stream. |
| | 31 | | /// </summary> |
| | 32 | | /// <param name="stream"> |
| | 33 | | /// Potentialy unseekable stream to expose a window of. |
| | 34 | | /// </param> |
| | 35 | | /// <param name="absolutePosition"> |
| | 36 | | /// The offset of this stream from the start of the wrapped stream. |
| | 37 | | /// </param> |
| 24 | 38 | | private WindowStream(Stream stream, long absolutePosition) |
| | 39 | | { |
| 24 | 40 | | InnerStream = stream; |
| 24 | 41 | | AbsolutePosition = absolutePosition; |
| 24 | 42 | | } |
| | 43 | |
|
| | 44 | | public static WindowStream GetWindow(Stream stream, long maxWindowLength, long absolutePosition = default) |
| | 45 | | { |
| 24 | 46 | | if (stream.CanSeek) |
| | 47 | | { |
| 24 | 48 | | return new SeekableWindowStream(stream, maxWindowLength); |
| | 49 | | } |
| | 50 | | else |
| | 51 | | { |
| 0 | 52 | | return new UnseekableWindowStream(stream, maxWindowLength, absolutePosition); |
| | 53 | | } |
| | 54 | | } |
| | 55 | |
|
| | 56 | | public override void Flush() |
| | 57 | | { |
| 0 | 58 | | throw new NotSupportedException(); |
| | 59 | | } |
| | 60 | |
|
| 0 | 61 | | public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); |
| | 62 | |
|
| 0 | 63 | | public override void WriteByte(byte value) => throw new NotSupportedException(); |
| | 64 | |
|
| | 65 | | /// <inheritdoc/> |
| | 66 | | /// <remarks> |
| | 67 | | /// Implementing this method takes advantage of any optimizations in the wrapped stream's implementation. |
| | 68 | | /// </remarks> |
| | 69 | | public override int ReadByte() |
| | 70 | | { |
| 0 | 71 | | if (AdjustCount(1) <= 0) |
| | 72 | | { |
| 0 | 73 | | return -1; |
| | 74 | | } |
| | 75 | |
|
| 0 | 76 | | int val = InnerStream.ReadByte(); |
| 0 | 77 | | if (val != -1) |
| | 78 | | { |
| 0 | 79 | | ReportInnerStreamRead(1); |
| | 80 | | } |
| | 81 | |
|
| 0 | 82 | | return val; |
| | 83 | | } |
| | 84 | |
|
| | 85 | | public override int Read(byte[] buffer, int offset, int count) |
| 16 | 86 | | => ReadInternal(buffer, offset, count, async: false, cancellationToken: default).EnsureCompleted(); |
| | 87 | |
|
| | 88 | | public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationTo |
| 16 | 89 | | => await ReadInternal(buffer, offset, count, async: true, cancellationToken).ConfigureAwait(false); |
| | 90 | |
|
| | 91 | | private async Task<int> ReadInternal(byte[] buffer, int offset, int count, bool async, CancellationToken cancell |
| | 92 | | { |
| 32 | 93 | | count = AdjustCount(count); |
| 32 | 94 | | if (count <= 0) |
| | 95 | | { |
| 16 | 96 | | return 0; |
| | 97 | | } |
| | 98 | |
|
| 16 | 99 | | int result = async |
| 16 | 100 | | ? await InnerStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false) |
| 16 | 101 | | : InnerStream.Read(buffer, offset, count); |
| | 102 | |
|
| 16 | 103 | | ReportInnerStreamRead(result); |
| 16 | 104 | | return result; |
| 32 | 105 | | } |
| | 106 | |
|
| | 107 | | protected abstract int AdjustCount(int count); |
| | 108 | |
|
| | 109 | | protected abstract void ReportInnerStreamRead(int resultRead); |
| | 110 | |
|
| | 111 | | /// <summary> |
| | 112 | | /// Exposes a predetermined slice of a larger, unseekable stream using the same Stream |
| | 113 | | /// interface. There should not be access to the base stream while this facade is in use. |
| | 114 | | /// This stream wrapper is unseekable. To wrap a partition of an unseekable stream where |
| | 115 | | /// the partition is seekable, see <see cref="PooledMemoryStream"/>. |
| | 116 | | /// </summary> |
| | 117 | | private class UnseekableWindowStream : WindowStream |
| | 118 | | { |
| | 119 | | private long _position = 0; |
| | 120 | |
|
| 0 | 121 | | private long MaxLength { get; } |
| | 122 | |
|
| 0 | 123 | | public UnseekableWindowStream(Stream stream, long maxWindowLength, long absolutePosition) : base(stream, abs |
| | 124 | | { |
| 0 | 125 | | MaxLength = maxWindowLength; |
| 0 | 126 | | } |
| | 127 | |
|
| 0 | 128 | | public override bool CanSeek => false; |
| | 129 | |
|
| 0 | 130 | | public override long Length => throw new NotSupportedException(); |
| | 131 | |
|
| 0 | 132 | | public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedExcep |
| | 133 | |
|
| 0 | 134 | | public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); |
| | 135 | |
|
| 0 | 136 | | public override void SetLength(long value) => throw new NotSupportedException(); |
| | 137 | |
|
| | 138 | | protected override int AdjustCount(int count) |
| 0 | 139 | | => (int)Math.Min(count, MaxLength - _position); |
| | 140 | |
|
| | 141 | | protected override void ReportInnerStreamRead(int resultRead) |
| 0 | 142 | | => _position += resultRead; |
| | 143 | | } |
| | 144 | |
|
| | 145 | | /// <summary> |
| | 146 | | /// Exposes a predetermined slice of a larger, seekable stream using the same Stream |
| | 147 | | /// interface. There should not be access to the base stream while this facade is in use. |
| | 148 | | /// This stream wrapper is sseekable. To wrap a partition of an unseekable stream where |
| | 149 | | /// the partition is seekable, see <see cref="PooledMemoryStream"/>. |
| | 150 | | /// </summary> |
| | 151 | | private class SeekableWindowStream : WindowStream |
| | 152 | | { |
| 24 | 153 | | public SeekableWindowStream(Stream stream, long maxWindowLength) : base(stream, stream.Position) |
| | 154 | | { |
| | 155 | | // accessing the stream's Position in the constructor acts as our validator that we're wrapping a seekab |
| 24 | 156 | | Length = Math.Min( |
| 24 | 157 | | stream.Length - stream.Position, |
| 24 | 158 | | maxWindowLength); |
| 24 | 159 | | } |
| | 160 | |
|
| 16 | 161 | | public override bool CanSeek => true; |
| | 162 | |
|
| 144 | 163 | | public override long Length { get; } |
| | 164 | |
|
| | 165 | | public override long Position |
| | 166 | | { |
| 48 | 167 | | get => InnerStream.Position - AbsolutePosition; |
| 0 | 168 | | set => InnerStream.Position = AbsolutePosition + value; |
| | 169 | | } |
| | 170 | |
|
| | 171 | | public override long Seek(long offset, SeekOrigin origin) |
| | 172 | | { |
| | 173 | | switch (origin) |
| | 174 | | { |
| | 175 | | case SeekOrigin.Begin: |
| 0 | 176 | | InnerStream.Seek(AbsolutePosition + offset, SeekOrigin.Begin); |
| 0 | 177 | | break; |
| | 178 | | case SeekOrigin.Current: |
| 0 | 179 | | InnerStream.Seek(InnerStream.Position + offset, SeekOrigin.Current); |
| 0 | 180 | | break; |
| | 181 | | case SeekOrigin.End: |
| 0 | 182 | | InnerStream.Seek((AbsolutePosition + this.Length) - InnerStream.Length + offset, SeekOrigin.End) |
| | 183 | | break; |
| | 184 | | } |
| 0 | 185 | | return Position; |
| | 186 | | } |
| | 187 | |
|
| 0 | 188 | | public override void SetLength(long value) => throw new NotSupportedException(); |
| | 189 | |
|
| | 190 | | protected override int AdjustCount(int count) |
| 32 | 191 | | => (int)Math.Min(count, Length - Position); |
| | 192 | |
|
| | 193 | | protected override void ReportInnerStreamRead(int resultRead) |
| | 194 | | { |
| | 195 | | // no-op, inner stream took care of position adjustment |
| 16 | 196 | | } |
| | 197 | | } |
| | 198 | | } |
| | 199 | |
|
| | 200 | | internal static partial class StreamExtensions |
| | 201 | | { |
| | 202 | | /// <summary> |
| | 203 | | /// Some streams will throw if you try to access their length so we wrap |
| | 204 | | /// the check in a TryGet helper. |
| | 205 | | /// </summary> |
| | 206 | | public static long? GetPositionOrDefault(this Stream content) |
| | 207 | | { |
| | 208 | | if (content == null) |
| | 209 | | { |
| | 210 | | /* Returning 0 instead of default puts us on the quick and clean one-shot upload, |
| | 211 | | * which produces more consistent fail state with how a 1-1 method on the convenience |
| | 212 | | * layer would fail. |
| | 213 | | */ |
| | 214 | | return 0; |
| | 215 | | } |
| | 216 | | try |
| | 217 | | { |
| | 218 | | if (content.CanSeek) |
| | 219 | | { |
| | 220 | | return content.Position; |
| | 221 | | } |
| | 222 | | } |
| | 223 | | catch (NotSupportedException) |
| | 224 | | { |
| | 225 | | } |
| | 226 | | return default; |
| | 227 | | } |
| | 228 | | } |
| | 229 | | } |