Summary

Class: Azure.Storage.Internal.Avro.AvroReader
Assembly: Azure.Storage.Internal.Avro
File(s): C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroReader.cs
Covered lines: 36
Uncovered lines: 44
Coverable lines: 80
Total lines: 255
Line coverage: 45% (36 of 80)
Covered branches: 19
Total branches: 38
Branch coverage: 50% (19 of 38)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_BlockOffset() | - | 100% | 100%
get_ObjectIndex() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 0% | 0%
.ctor() | - | 0% | 100%
Initalize() | - | 75% | 68.75%
HasNext() | - | 100% | 50%
Next() | - | 42.86% | 50%
Dispose() | - | 0% | 100%
Dispose(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroReader.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.IO;
 7using System.Linq;
 8using System.Threading.Tasks;
 9using System.Text.Json;
 10using System.Threading;
 11
 12namespace Azure.Storage.Internal.Avro
 13{
 14    internal class AvroReader : IDisposable
 15    {
 16        /// <summary>
 17        /// Stream the data blocks (the body of the Avro file) are read from.
 18        /// </summary>
 19        private readonly Stream _dataStream;
 20
 21        /// <summary>
 22        /// Stream the header of the Avro file is read from.
 23        /// </summary>
 24        private readonly Stream _headerStream;
 25
 26        /// <summary>
 27        /// Sync marker read from the header; compared against the marker read after each block.
 28        /// </summary>
 29        private byte[] _syncMarker;
 30
 31        /// <summary>
 32        /// Avro metadata map read from the header.
 33        /// </summary>
 34        private Dictionary<string, string> _metadata;
 35
 36        /// <summary>
 37        /// Item type built from the Avro schema stored in the metadata.
 38        /// </summary>
 39        private AvroType _itemType;
 40
 41        /// <summary>
 42        /// The number of items remaining in the current block.
 43        /// </summary>
 44        private long _itemsRemainingInBlock;
 45
 46        /// <summary>
 47        /// The byte offset within the Avro file (both header and data)
 48        /// of the start of the current block.
 49        /// </summary>
 14450        public virtual long BlockOffset { get; private set; }
 51
 52        /// <summary>
 53        /// The index of the current object within the current block.
 54        /// </summary>
 55        /// <value>Reset to 0 when a new block starts; incremented after each item is read.</value>
 14456        public virtual long ObjectIndex { get; private set; }
 57
 58        /// <summary>
 59        /// Whether this Avro reader has been initialized (header parsed and first block opened).
 60        /// </summary>
 61        private bool _initalized;
 62
 63        /// <summary>
 64        /// Set once Dispose(bool) has run, so that redundant calls can return early.
 65        /// </summary>
 66        private bool _disposed = false;
 67
 68        /// <summary>
 69        /// Starting block offset supplied when a partial data stream was provided;
 70        /// added to the data stream position when computing BlockOffset.
 71        /// </summary>
 71        private readonly long _initialBlockOffset;
 72
 73        /// <summary>
 74        /// Constructor for an AvroReader that will read from the
 75        /// beginning of an Avro file.
 76        /// </summary>
 4877        public AvroReader(Stream dataStream)
 78        {
 4879            if (dataStream.CanSeek)
 80            {
 2481                _dataStream = dataStream;
 2482                _headerStream = dataStream;
 83            }
 84            else
 85            {
 2486                _dataStream = new StreamWithPosition(dataStream);
 2487                _headerStream = _dataStream;
 88            }
 89
 4890            _metadata = new Dictionary<string, string>();
 4891            _initalized = false;
 4892        }
 93
 94        /// <summary>
 95        /// Constructor for an Avro Reader that will read beginning
 96        /// in the middle of an Avro file.
 97        /// </summary>
 098        public AvroReader(
 099            Stream dataStream,
 0100            Stream headerStream,
 0101            long currentBlockOffset,
 0102            long indexWithinCurrentBlock)
 103        {
 0104            if (dataStream.CanSeek)
 105            {
 0106                _dataStream = dataStream;
 107            }
 108            else
 109            {
 0110                _dataStream = new StreamWithPosition(dataStream);
 111            }
 112
 0113            if (headerStream.CanSeek)
 114            {
 0115                _headerStream = headerStream;
 116            }
 117            else
 118            {
 0119                _headerStream = new StreamWithPosition(headerStream);
 120            }
 121
 0122            _metadata = new Dictionary<string, string>();
 0123            _initalized = false;
 0124            _initialBlockOffset = currentBlockOffset;
 0125            BlockOffset = currentBlockOffset;
 0126            ObjectIndex = indexWithinCurrentBlock;
 0127            _initalized = false;
 0128        }
 129
 130        /// <summary>
 131        /// Constructor for mocking.  Do not use.
 132        /// </summary>
 0133        public AvroReader() { }
 134
 /// <summary>
 /// Reads and validates the Avro header from the header stream, builds the item
 /// type from the schema in the metadata, then reads the first block's item count
 /// from the data stream and skips any items before <see cref="ObjectIndex"/>.
 /// </summary>
 /// <param name="async">Passed through to the AvroParser and item-type read calls.</param>
 /// <param name="cancellationToken">Passed through to the underlying reads.</param>
 /// <exception cref="ArgumentException">
 /// The leading bytes do not equal AvroConstants.InitBytes, or the metadata names
 /// a codec other than "null".
 /// </exception>
 135        public virtual async Task Initalize(bool async, CancellationToken cancellationToken = default)
 136        {
 137            // Four bytes, ASCII 'O', 'b', 'j', followed by 1.
 48138            byte[] header = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.InitBytes.Length, async, c
 48139            if (!header.SequenceEqual(AvroConstants.InitBytes))
 140            {
 0141                throw new ArgumentException("Stream is not an Avro file.");
 142            }
 143
 144            // File metadata is written as if defined by the following map schema:
 145            // { "type": "map", "values": "bytes"}
 48146            _metadata = await AvroParser.ReadMapAsync(_headerStream, AvroParser.ReadStringAsync, async, cancellationToke
 147
 148            // Validate codec: only an absent codec entry or the literal "null" is accepted.
 48149            _metadata.TryGetValue(AvroConstants.CodecKey, out string codec);
 48150            if (!(codec == null || codec == "null"))
 151            {
 0152                throw new ArgumentException("Codecs are not supported");
 153            }
 154
 155            // The 16-byte, randomly-generated sync marker for this file.
 48156            _syncMarker = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.SyncMarkerSize, async, cance
 157
 158            // Parse the schema
 48159            using JsonDocument schema = JsonDocument.Parse(_metadata[AvroConstants.SchemaKey]);
 48160            _itemType = AvroType.FromSchema(schema.RootElement);
 161
                 // Only derive the block offset from the stream when none was supplied (still 0).
 48162            if (BlockOffset == 0)
 163            {
 48164                BlockOffset = _initialBlockOffset + _dataStream.Position;
 165            }
 166
 167            // Populate _itemsRemainingInBlock with the item count of the first block
 48168            _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwai
 169
 170            // skip block length
 48171            await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 172
 48173            _initalized = true;
 174
                 // Resuming mid-block: read and discard the items that precede ObjectIndex.
 48175            if (ObjectIndex > 0)
 176            {
 0177                for (int i = 0; i < ObjectIndex; i++)
 178                {
 0179                    await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 0180                    _itemsRemainingInBlock--;
 181                }
 182            }
 48183        }
 184
 48185        public virtual bool HasNext() => !_initalized || _itemsRemainingInBlock > 0;
 186
 /// <summary>
 /// Reads and returns the next item, initializing the reader first if needed.
 /// When the item read was the last one in its block, also reads the sync marker
 /// that follows the block and the item count of the next block.
 /// </summary>
 /// <param name="async">Passed through to the AvroParser and item-type read calls.</param>
 /// <param name="cancellationToken">Passed through to the underlying reads.</param>
 /// <returns>The object returned by the item type's ReadAsync.</returns>
 /// <exception cref="ArgumentException">
 /// No items remain, or the marker read after a block does not equal the sync
 /// marker read from the header.
 /// </exception>
 187        public virtual async Task<object> Next(bool async, CancellationToken cancellationToken = default)
 188        {
 189            // Initialize AvroReader, if necessary.
 48190            if (!_initalized)
 191            {
 48192                await Initalize(async, cancellationToken).ConfigureAwait(false);
 193            }
 194
 48195            if (!HasNext())
 196            {
 0197                throw new ArgumentException("There are no more items in the stream");
 198            }
 199
 200
 48201            object result = await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 202
 48203            _itemsRemainingInBlock--;
 48204            ObjectIndex++;
 205
                 // Block exhausted: consume the trailing sync marker and open the next block.
 48206            if (_itemsRemainingInBlock == 0)
 207            {
                     // NOTE(review): literal 16 here, while Initalize uses AvroConstants.SyncMarkerSize - presumably the same value; confirm.
 0208                byte[] marker = await AvroParser.ReadFixedBytesAsync(_dataStream, 16, async, cancellationToken).Configur
 209
 0210                BlockOffset = _initialBlockOffset + _dataStream.Position;
 0211                ObjectIndex = 0;
 212
 0213                if (!_syncMarker.SequenceEqual(marker))
 214                {
 0215                    throw new ArgumentException("Stream is not a valid Avro file.");
 216                }
 217
 218                try
 219                {
 0220                    _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).Confi
 0221                }
 0222                catch (InvalidOperationException)
 223                {
 224                    // We hit the end of the stream; _itemsRemainingInBlock stays 0, so HasNext() reports false.
 0225                }
 226
 0227                if (_itemsRemainingInBlock > 0)
 228                {
 229                    // Ignore block size
 0230                    await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false);
 231                }
 232            }
 233
 48234            return result;
 48235        }
 236
 0237        public void Dispose() => Dispose(true);
 238
 239        protected virtual void Dispose(bool disposing)
 240        {
 0241            if (_disposed)
 242            {
 0243                return;
 244            }
 245
 0246            if (disposing)
 247            {
 0248                _dataStream.Dispose();
 0249                _headerStream.Dispose();
 250            }
 251
 0252            _disposed = true;
 0253        }
 254    }
 255}