Summary

Class: Azure.Storage.Internal.Avro.AvroReader
Assembly: Azure.Storage.Blobs.ChangeFeed
File(s): C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroReader.cs
Covered lines: 61
Uncovered lines: 19
Coverable lines: 80
Total lines: 255
Line coverage: 76.2% (61 of 80)
Covered branches: 22
Total branches: 38
Branch coverage: 57.8% (22 of 38)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_BlockOffset() | - | 100% | 100%
get_ObjectIndex() | - | 100% | 100%
.ctor(...) | - | 77.78% | 50%
.ctor(...) | - | 88.89% | 50%
.ctor() | - | 100% | 100%
Initalize() | - | 90% | 68.75%
HasNext() | - | 100% | 50%
Next() | - | 76.19% | 70%
Dispose() | - | 0% | 100%
Dispose(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroReader.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.IO;
 7using System.Linq;
 8using System.Threading.Tasks;
 9using System.Text.Json;
 10using System.Threading;
 11
 12namespace Azure.Storage.Internal.Avro
 13{
 14    internal class AvroReader : IDisposable
 15    {
 16        /// <summary>
 17        /// Stream containing the body of the Avro file.
 18        /// </summary>
 19        private readonly Stream _dataStream;
 20
 21        /// <summary>
 22        /// Stream containing the header of the Avro file.
 23        /// </summary>
 24        private readonly Stream _headerStream;
 25
 26        /// <summary>
 27        /// Sync marker.
 28        /// </summary>
 29        private byte[] _syncMarker;
 30
 31        /// <summary>
 32        /// Avro metadata.
 33        /// </summary>
 34        private Dictionary<string, string> _metadata;
 35
 36        /// <summary>
 37        /// Avro schema.
 38        /// </summary>
 39        private AvroType _itemType;
 40
 41        /// <summary>
 42        /// The number of items remaining in the current block.
 43        /// </summary>
 44        private long _itemsRemainingInBlock;
 45
 46        /// <summary>
 47        /// The byte offset within the Avro file (both header and data)
 48        /// of the start of the current block.
 49        /// </summary>
 3336250        public virtual long BlockOffset { get; private set; }
 51
 52        /// <summary>
 53        /// The index of the current object within the current block.
 54        /// </summary>
 55        /// <returns></returns>
 9250656        public virtual long ObjectIndex { get; private set; }
 57
 58        /// <summary>
 59        /// If this Avro Reader has been initialized.
 60        /// </summary>
 61        private bool _initalized;
 62
 63        /// <summary>
 64        /// Whether this reader has already been disposed; used to detect redundant Dispose calls.
 65        /// </summary>
 66        private bool _disposed = false;
 67
 68        /// <summary>
 69        /// Remembers where we started if partial data stream was provided.
 70        /// </summary>
 71        private readonly long _initialBlockOffset;
 72
 73        /// <summary>
 74        /// Constructor for an AvroReader that will read from the
 75        /// beginning of an Avro file.
 76        /// </summary>
 16877        public AvroReader(Stream dataStream)
 78        {
 16879            if (dataStream.CanSeek)
 80            {
 081                _dataStream = dataStream;
 082                _headerStream = dataStream;
 83            }
 84            else
 85            {
 16886                _dataStream = new StreamWithPosition(dataStream);
 16887                _headerStream = _dataStream;
 88            }
 89
 16890            _metadata = new Dictionary<string, string>();
 16891            _initalized = false;
 16892        }
 93
 94        /// <summary>
 95        /// Constructor for an Avro Reader that will begin reading
 96        /// in the middle of an Avro file.
 97        /// </summary>
 4898        public AvroReader(
 4899            Stream dataStream,
 48100            Stream headerStream,
 48101            long currentBlockOffset,
 48102            long indexWithinCurrentBlock)
 103        {
 48104            if (dataStream.CanSeek)
 105            {
 0106                _dataStream = dataStream;
 107            }
 108            else
 109            {
 48110                _dataStream = new StreamWithPosition(dataStream);
 111            }
 112
 48113            if (headerStream.CanSeek)
 114            {
 0115                _headerStream = headerStream;
 116            }
 117            else
 118            {
 48119                _headerStream = new StreamWithPosition(headerStream);
 120            }
 121
 48122            _metadata = new Dictionary<string, string>();
 48123            _initalized = false;
 48124            _initialBlockOffset = currentBlockOffset;
 48125            BlockOffset = currentBlockOffset;
 48126            ObjectIndex = indexWithinCurrentBlock;
 48127            _initalized = false;
 48128        }
 129
 130        /// <summary>
 131        /// Constructor for mocking.  Do not use.
 132        /// </summary>
 24133        public AvroReader() { }
 134
 135        public virtual async Task Initalize(bool async, CancellationToken cancellationToken = default)
 136        {
 137            // Four bytes, ASCII 'O', 'b', 'j', followed by 1.
 216138            byte[] header = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.InitBytes.Length, async, c
 216139            if (!header.SequenceEqual(AvroConstants.InitBytes))
 140            {
 0141                throw new ArgumentException("Stream is not an Avro file.");
 142            }
 143
 144            // File metadata is written as if defined by the following map schema:
 145            // { "type": "map", "values": "bytes"}
 216146            _metadata = await AvroParser.ReadMapAsync(_headerStream, AvroParser.ReadStringAsync, async, cancellationToke
 147
 148            // Validate codec
 216149            _metadata.TryGetValue(AvroConstants.CodecKey, out string codec);
 216150            if (!(codec == null || codec == "null"))
 151            {
 0152                throw new ArgumentException("Codecs are not supported");
 153            }
 154
 155            // The 16-byte, randomly-generated sync marker for this file.
 216156            _syncMarker = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.SyncMarkerSize, async, cance
 157
 158            // Parse the schema
 216159            using JsonDocument schema = JsonDocument.Parse(_metadata[AvroConstants.SchemaKey]);
 216160            _itemType = AvroType.FromSchema(schema.RootElement);
 161
 216162            if (BlockOffset == 0)
 163            {
 168164                BlockOffset = _initialBlockOffset + _dataStream.Position;
 165            }
 166
 167            // Populate _itemsRemainingInBlock
 216168            _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwai
 169
 170            // skip block length
 216171            await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 172
 216173            _initalized = true;
 174
 216175            if (ObjectIndex > 0)
 176            {
 304177                for (int i = 0; i < ObjectIndex; i++)
 178                {
 120179                    await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 120180                    _itemsRemainingInBlock--;
 181                }
 182            }
 216183        }
 184
 148116185        public virtual bool HasNext() => !_initalized || _itemsRemainingInBlock > 0;
 186
 187        public virtual async Task<object> Next(bool async, CancellationToken cancellationToken = default)
 188        {
 189            // Initialize AvroReader, if necessary.
 29580190            if (!_initalized)
 191            {
 0192                await Initalize(async, cancellationToken).ConfigureAwait(false);
 193            }
 194
 29580195            if (!HasNext())
 196            {
 0197                throw new ArgumentException("There are no more items in the stream");
 198            }
 199
 200
 29580201            object result = await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 202
 29580203            _itemsRemainingInBlock--;
 29580204            ObjectIndex++;
 205
 29580206            if (_itemsRemainingInBlock == 0)
 207            {
 3350208                byte[] marker = await AvroParser.ReadFixedBytesAsync(_dataStream, 16, async, cancellationToken).Configur
 209
 3350210                BlockOffset = _initialBlockOffset + _dataStream.Position;
 3350211                ObjectIndex = 0;
 212
 3350213                if (!_syncMarker.SequenceEqual(marker))
 214                {
 0215                    throw new ArgumentException("Stream is not a valid Avro file.");
 216                }
 217
 218                try
 219                {
 3350220                    _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).Confi
 3350221                }
 0222                catch (InvalidOperationException)
 223                {
 224                    // We hit the end of the stream.
 0225                }
 226
 3350227                if (_itemsRemainingInBlock > 0)
 228                {
 229                    // Ignore block size
 3274230                    await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 231                }
 232            }
 233
 29580234            return result;
 29580235        }
 236
 0237        public void Dispose() => Dispose(true);
 238
 239        protected virtual void Dispose(bool disposing)
 240        {
 0241            if (_disposed)
 242            {
 0243                return;
 244            }
 245
 0246            if (disposing)
 247            {
 0248                _dataStream.Dispose();
 0249                _headerStream.Dispose();
 250            }
 251
 0252            _disposed = true;
 0253        }
 254    }
 255}