|   |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|   |  | 2 |  | // Licensed under the MIT License. | 
|   |  | 3 |  |  | 
|   |  | 4 |  | using System; | 
|   |  | 5 |  | using System.Collections.Generic; | 
|   |  | 6 |  | using System.IO; | 
|   |  | 7 |  | using System.Linq; | 
|   |  | 8 |  | using System.Threading.Tasks; | 
|   |  | 9 |  | using System.Text.Json; | 
|   |  | 10 |  | using System.Threading; | 
|   |  | 11 |  |  | 
|   |  | 12 |  | namespace Azure.Storage.Internal.Avro | 
|   |  | 13 |  | { | 
|   |  | 14 |  |     internal class AvroReader : IDisposable | 
|   |  | 15 |  |     { | 
|   |  | 16 |  |         /// <summary> | 
|   |  | 17 |  |         /// Stream containing the body of the Avro file (wrapped in a StreamWithPosition when the source is not seekable). | 
|   |  | 18 |  |         /// </summary> | 
|   |  | 19 |  |         private readonly Stream _dataStream; | 
|   |  | 20 |  |  | 
|   |  | 21 |  |         /// <summary> | 
|   |  | 22 |  |         /// Stream containing the header of the Avro file; the single-stream constructor reuses the data stream here. | 
|   |  | 23 |  |         /// </summary> | 
|   |  | 24 |  |         private readonly Stream _headerStream; | 
|   |  | 25 |  |  | 
|   |  | 26 |  |         /// <summary> | 
|   |  | 27 |  |         /// Sync marker read from the header; the marker read at the end of each block is compared against it. | 
|   |  | 28 |  |         /// </summary> | 
|   |  | 29 |  |         private byte[] _syncMarker; | 
|   |  | 30 |  |  | 
|   |  | 31 |  |         /// <summary> | 
|   |  | 32 |  |         /// Avro metadata map read from the header stream during initialization. | 
|   |  | 33 |  |         /// </summary> | 
|   |  | 34 |  |         private Dictionary<string, string> _metadata; | 
|   |  | 35 |  |  | 
|   |  | 36 |  |         /// <summary> | 
|   |  | 37 |  |         /// Avro item type parsed from the schema entry of the metadata. | 
|   |  | 38 |  |         /// </summary> | 
|   |  | 39 |  |         private AvroType _itemType; | 
|   |  | 40 |  |  | 
|   |  | 41 |  |         /// <summary> | 
|   |  | 42 |  |         /// The number of items remaining in the current block. | 
|   |  | 43 |  |         /// </summary> | 
|   |  | 44 |  |         private long _itemsRemainingInBlock; | 
|   |  | 45 |  |  | 
|   |  | 46 |  |         /// <summary> | 
|   |  | 47 |  |         /// The byte offset within the Avro file (both header and data) | 
|   |  | 48 |  |         /// of the start of the current block. | 
|   |  | 49 |  |         /// </summary> | 
|   | 144 | 50 |  |         public virtual long BlockOffset { get; private set; } | 
|   |  | 51 |  |  | 
|   |  | 52 |  |         /// <summary> | 
|   |  | 53 |  |         /// The index of the current object within the current block. | 
|   |  | 54 |  |         /// </summary> | 
|   |  | 55 |  |         /// <remarks>Reset to 0 whenever the reader advances to a new block.</remarks> | 
|   | 144 | 56 |  |         public virtual long ObjectIndex { get; private set; } | 
|   |  | 57 |  |  | 
|   |  | 58 |  |         /// <summary> | 
|   |  | 59 |  |         /// Whether this Avro reader has been initialized (set by Initalize once the header is parsed). | 
|   |  | 60 |  |         /// </summary> | 
|   |  | 61 |  |         private bool _initalized; | 
|   |  | 62 |  |  | 
|   |  | 63 |  |         /// <summary> | 
|   |  | 64 |  |         /// Whether Dispose has already run; used to detect redundant calls. | 
|   |  | 65 |  |         /// </summary> | 
|   |  | 66 |  |         private bool _disposed = false; | 
|   |  | 67 |  |  | 
|   |  | 68 |  |         /// <summary> | 
|   |  | 69 |  |         /// Remembers where we started if a partial data stream was provided. | 
|   |  | 70 |  |         /// </summary> | 
|   |  | 71 |  |         private readonly long _initialBlockOffset; | 
|   |  | 72 |  |  | 
|   |  | 73 |  |         /// <summary> | 
|   |  | 74 |  |         /// Constructor for an AvroReader that will read from the | 
|   |  | 75 |  |         /// beginning of an Avro file; header and data share one stream. | 
|   |  | 76 |  |         /// </summary> | 
|   | 48 | 77 |  |         public AvroReader(Stream dataStream) | 
|   |  | 78 |  |         { | 
|   | 48 | 79 |  |             if (dataStream.CanSeek) | 
|   |  | 80 |  |             { | 
|   | 24 | 81 |  |                 _dataStream = dataStream; | 
|   | 24 | 82 |  |                 _headerStream = dataStream; | 
|   |  | 83 |  |             } | 
|   |  | 84 |  |             else | 
|   |  | 85 |  |             { | 
|   | 24 | 86 |  |                 _dataStream = new StreamWithPosition(dataStream); | 
|   | 24 | 87 |  |                 _headerStream = _dataStream; | 
|   |  | 88 |  |             } | 
|   |  | 89 |  |  | 
|   | 48 | 90 |  |             _metadata = new Dictionary<string, string>(); | 
|   | 48 | 91 |  |             _initalized = false; | 
|   | 48 | 92 |  |         } | 
|   |  | 93 |  |  | 
|   |  | 94 |  |         /// <summary> | 
|   |  | 95 |  |         /// Constructor for an Avro Reader that will begin reading | 
|   |  | 96 |  |         /// in the middle of an Avro file, using a separate header stream. | 
|   |  | 97 |  |         /// </summary> | 
|   | 0 | 98 |  |         public AvroReader( | 
|   | 0 | 99 |  |             Stream dataStream, | 
|   | 0 | 100 |  |             Stream headerStream, | 
|   | 0 | 101 |  |             long currentBlockOffset, | 
|   | 0 | 102 |  |             long indexWithinCurrentBlock) | 
|   |  | 103 |  |         { | 
|   | 0 | 104 |  |             if (dataStream.CanSeek) | 
|   |  | 105 |  |             { | 
|   | 0 | 106 |  |                 _dataStream = dataStream; | 
|   |  | 107 |  |             } | 
|   |  | 108 |  |             else | 
|   |  | 109 |  |             { | 
|   | 0 | 110 |  |                 _dataStream = new StreamWithPosition(dataStream); | 
|   |  | 111 |  |             } | 
|   |  | 112 |  |  | 
|   | 0 | 113 |  |             if (headerStream.CanSeek) | 
|   |  | 114 |  |             { | 
|   | 0 | 115 |  |                 _headerStream = headerStream; | 
|   |  | 116 |  |             } | 
|   |  | 117 |  |             else | 
|   |  | 118 |  |             { | 
|   | 0 | 119 |  |                 _headerStream = new StreamWithPosition(headerStream); | 
|   |  | 120 |  |             } | 
|   |  | 121 |  |  | 
|   | 0 | 122 |  |             _metadata = new Dictionary<string, string>(); | 
|   | 0 | 123 |  |             _initalized = false; | 
|   | 0 | 124 |  |             _initialBlockOffset = currentBlockOffset; | 
|   | 0 | 125 |  |             BlockOffset = currentBlockOffset; | 
|   | 0 | 126 |  |             ObjectIndex = indexWithinCurrentBlock; | 
|   | 0 | 127 |  |             _initalized = false; | 
|   | 0 | 128 |  |         } | 
|   |  | 129 |  |  | 
|   |  | 130 |  |         /// <summary> | 
|   |  | 131 |  |         /// Parameterless constructor for mocking; it leaves both streams unset.  Do not use. | 
|   |  | 132 |  |         /// </summary> | 
|   | 0 | 133 |  |         public AvroReader() { } | 
|   |  | 134 |  |  | 
|   |  | 135 |  |         public virtual async Task Initalize(bool async, CancellationToken cancellationToken = default) | 
|   |  | 136 |  |         { | 
|   |  | 137 |  |             // Four bytes, ASCII 'O', 'b', 'j', followed by 1. | 
|   | 48 | 138 |  |             byte[] header = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.InitBytes.Length, async, c | 
|   | 48 | 139 |  |             if (!header.SequenceEqual(AvroConstants.InitBytes)) | 
|   |  | 140 |  |             { | 
|   | 0 | 141 |  |                 throw new ArgumentException("Stream is not an Avro file."); | 
|   |  | 142 |  |             } | 
|   |  | 143 |  |  | 
|   |  | 144 |  |             // File metadata is written as if defined by the following map schema: | 
|   |  | 145 |  |             // { "type": "map", "values": "bytes"} | 
|   | 48 | 146 |  |             _metadata = await AvroParser.ReadMapAsync(_headerStream, AvroParser.ReadStringAsync, async, cancellationToke | 
|   |  | 147 |  |  | 
|   |  | 148 |  |             // Validate codec: only a missing codec or "null" is accepted. | 
|   | 48 | 149 |  |             _metadata.TryGetValue(AvroConstants.CodecKey, out string codec); | 
|   | 48 | 150 |  |             if (!(codec == null || codec == "null")) | 
|   |  | 151 |  |             { | 
|   | 0 | 152 |  |                 throw new ArgumentException("Codecs are not supported"); | 
|   |  | 153 |  |             } | 
|   |  | 154 |  |  | 
|   |  | 155 |  |             // The 16-byte, randomly-generated sync marker for this file. | 
|   | 48 | 156 |  |             _syncMarker = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.SyncMarkerSize, async, cance | 
|   |  | 157 |  |  | 
|   |  | 158 |  |             // Parse the schema stored in the metadata into the item type | 
|   | 48 | 159 |  |             using JsonDocument schema = JsonDocument.Parse(_metadata[AvroConstants.SchemaKey]); | 
|   | 48 | 160 |  |             _itemType = AvroType.FromSchema(schema.RootElement); | 
|   |  | 161 |  |  | 
|   | 48 | 162 |  |             if (BlockOffset == 0) | 
|   |  | 163 |  |             { | 
|   | 48 | 164 |  |                 BlockOffset = _initialBlockOffset + _dataStream.Position; | 
|   |  | 165 |  |             } | 
|   |  | 166 |  |  | 
|   |  | 167 |  |             // Populate _itemsRemainingInBlock | 
|   | 48 | 168 |  |             _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwai | 
|   |  | 169 |  |  | 
|   |  | 170 |  |             // Skip block length | 
|   | 48 | 171 |  |             await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); | 
|   |  | 172 |  |  | 
|   | 48 | 173 |  |             _initalized = true; | 
|   |  | 174 |  |  | 
|   | 48 | 175 |  |             if (ObjectIndex > 0) | 
|   |  | 176 |  |             { | 
|   | 0 | 177 |  |                 for (int i = 0; i < ObjectIndex; i++) | 
|   |  | 178 |  |                 { | 
|   | 0 | 179 |  |                     await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); | 
|   | 0 | 180 |  |                     _itemsRemainingInBlock--; | 
|   |  | 181 |  |                 } | 
|   |  | 182 |  |             } | 
|   | 48 | 183 |  |         } | 
|   |  | 184 |  |  | 
|   | 48 | 185 |  |         public virtual bool HasNext() => !_initalized || _itemsRemainingInBlock > 0; | 
|   |  | 186 |  |  | 
|   |  | 187 |  |         public virtual async Task<object> Next(bool async, CancellationToken cancellationToken = default) | 
|   |  | 188 |  |         { | 
|   |  | 189 |  |             // Initialize AvroReader, if necessary. | 
|   | 48 | 190 |  |             if (!_initalized) | 
|   |  | 191 |  |             { | 
|   | 48 | 192 |  |                 await Initalize(async, cancellationToken).ConfigureAwait(false); | 
|   |  | 193 |  |             } | 
|   |  | 194 |  |  | 
|   | 48 | 195 |  |             if (!HasNext()) | 
|   |  | 196 |  |             { | 
|   | 0 | 197 |  |                 throw new ArgumentException("There are no more items in the stream"); | 
|   |  | 198 |  |             } | 
|   |  | 199 |  |  | 
|   |  | 200 |  |  | 
|   | 48 | 201 |  |             object result = await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); | 
|   |  | 202 |  |  | 
|   | 48 | 203 |  |             _itemsRemainingInBlock--; | 
|   | 48 | 204 |  |             ObjectIndex++; | 
|   |  | 205 |  |  | 
|   | 48 | 206 |  |             if (_itemsRemainingInBlock == 0) | 
|   |  | 207 |  |             { | 
|   | 0 | 208 |  |                 byte[] marker = await AvroParser.ReadFixedBytesAsync(_dataStream, 16, async, cancellationToken).Configur | 
|   |  | 209 |  |  | 
|   | 0 | 210 |  |                 BlockOffset = _initialBlockOffset + _dataStream.Position; | 
|   | 0 | 211 |  |                 ObjectIndex = 0; | 
|   |  | 212 |  |  | 
|   | 0 | 213 |  |                 if (!_syncMarker.SequenceEqual(marker)) | 
|   |  | 214 |  |                 { | 
|   | 0 | 215 |  |                     throw new ArgumentException("Stream is not a valid Avro file."); | 
|   |  | 216 |  |                 } | 
|   |  | 217 |  |  | 
|   |  | 218 |  |                 try | 
|   |  | 219 |  |                 { | 
|   | 0 | 220 |  |                     _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).Confi | 
|   | 0 | 221 |  |                 } | 
|   | 0 | 222 |  |                 catch (InvalidOperationException) | 
|   |  | 223 |  |                 { | 
|   |  | 224 |  |                     // Treated as having hit the end of the stream: _itemsRemainingInBlock stays 0, so HasNext() reports false. | 
|   | 0 | 225 |  |                 } | 
|   |  | 226 |  |  | 
|   | 0 | 227 |  |                 if (_itemsRemainingInBlock > 0) | 
|   |  | 228 |  |                 { | 
|   |  | 229 |  |                     // Ignore block size | 
|   | 0 | 230 |  |                     await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); | 
|   |  | 231 |  |                 } | 
|   |  | 232 |  |             } | 
|   |  | 233 |  |  | 
|   | 48 | 234 |  |             return result; | 
|   | 48 | 235 |  |         } | 
|   |  | 236 |  |  | 
|   | 0 | 237 |  |         public void Dispose() => Dispose(true); | 
|   |  | 238 |  |  | 
|   |  | 239 |  |         protected virtual void Dispose(bool disposing) | 
|   |  | 240 |  |         { | 
|   | 0 | 241 |  |             if (_disposed) | 
|   |  | 242 |  |             { | 
|   | 0 | 243 |  |                 return; | 
|   |  | 244 |  |             } | 
|   |  | 245 |  |  | 
|   | 0 | 246 |  |             if (disposing) | 
|   |  | 247 |  |             { | 
|   | 0 | 248 |  |                 _dataStream.Dispose(); | 
|   | 0 | 249 |  |                 _headerStream.Dispose(); | 
|   |  | 250 |  |             } | 
|   |  | 251 |  |  | 
|   | 0 | 252 |  |             _disposed = true; | 
|   | 0 | 253 |  |         } | 
|   |  | 254 |  |     } | 
|   |  | 255 |  | } |