< Summary

Class:Azure.Storage.Blobs.BlobQuickQueryStream
Assembly:Azure.Storage.Blobs
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs\src\BlobQuickQueryStream.cs
Covered lines:94
Uncovered lines:37
Coverable lines:131
Total lines:327
Line coverage:71.7% (94 of 131)
Covered branches:44
Total branches:58
Branch coverage:75.8% (44 of 58)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-100%100%
Read(...)-100%100%
ReadAsync()-0%100%
ReadInternal()-69.57%75%
ValidateReadParameters(...)-100%100%
ProcessErrorRecord(...)-81.82%60%
get_CanRead()-100%100%
get_CanSeek()-0%100%
get_CanWrite()-0%100%
get_Length()-0%100%
get_Position()-0%100%
set_Position(...)-0%100%
Flush()-0%100%
Seek(...)-0%100%
SetLength(...)-0%100%
Write(...)-0%100%
Dispose(...)-75%75%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs\src\BlobQuickQueryStream.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.IO;
 7using System.Threading.Tasks;
 8using Azure.Core.Pipeline;
 9using Azure.Storage.Internal.Avro;
 10using Azure.Storage.Blobs.Models;
 11using System.Buffers;
 12
 13namespace Azure.Storage.Blobs
 14{
 15    /// <summary>
 16    /// QuickQueryStream.
 17    /// </summary>
 18    internal class BlobQuickQueryStream : Stream
 19    {
 20        /// <summary>
 21        /// Underlying stream.
 22        /// </summary>
 23        internal Stream _avroStream;
 24
 25        /// <summary>
 26        /// Avro Reader.
 27        /// </summary>
 28        internal AvroReader _avroReader;
 29
 30        /// <summary>
 31        /// Buffer to hold bytes we haven't processed yet.
 32        /// </summary>
 33        internal byte[] _buffer;
 34
 35        /// <summary>
 36        /// Current buffer offset.
 37        /// </summary>
 38        internal int _bufferOffset;
 39
 40        /// <summary>
 41        /// The current length of the buffer.
 42        /// </summary>
 43        internal int _bufferLength;
 44
 45        /// <summary>
 46        /// Progress handler.
 47        /// </summary>
 48        internal IProgress<long> _progressHandler;
 49
 50        /// <summary>
 51        /// Error handler.
 52        /// </summary>
 53        internal Action<BlobQueryError> _errorHandler;
 54
 9055        public BlobQuickQueryStream(
 9056            Stream avroStream,
 9057            IProgress<long> progressHandler = default,
 9058            Action<BlobQueryError> errorHandler = default)
 59        {
 9060            _avroStream = avroStream;
 9061            _avroReader = new AvroReader(_avroStream);
 9062            _bufferOffset = 0;
 9063            _bufferLength = 0;
 9064            _progressHandler = progressHandler;
 9065            _errorHandler = errorHandler;
 9066        }
 67
 68        /// <inheritdoc/>
 69        public override int Read(byte[] buffer, int offset, int count)
 8070            => ReadInternal(async: false, buffer, offset, count).EnsureCompleted();
 71
 72        /// <inheritdoc/>
 73        public new async Task<int> ReadAsync(byte[] buffer, int offset, int count)
 074            => await ReadInternal(async: true, buffer, offset, count).ConfigureAwait(false);
 75
 76
 77        // Note - offset is with respect to buffer.
 78        private async Task<int> ReadInternal(bool async, byte[] buffer, int offset, int count)
 79        {
 8080            ValidateReadParameters(buffer, offset, count);
 81
 8082            int remainingBytes = _bufferLength - _bufferOffset;
 83
 84            // We have enough bytes in the buffer and don't need to read the next Record.
 8085            if (count <= remainingBytes)
 86            {
 087                Array.Copy(
 088                    sourceArray: _buffer,
 089                    sourceIndex: _bufferOffset,
 090                    destinationArray: buffer,
 091                    destinationIndex: offset,
 092                    length: count);
 093                _bufferOffset += count;
 094                return count;
 95            }
 96
 97            // Copy remaining buffer
 8098            if (remainingBytes > 0)
 99            {
 0100                Array.Copy(
 0101                    sourceArray: _buffer,
 0102                    sourceIndex: _bufferOffset,
 0103                    destinationArray: buffer,
 0104                    destinationIndex: offset,
 0105                    length: remainingBytes);
 0106                _bufferOffset += remainingBytes;
 0107                return remainingBytes;
 108            }
 109
 110            // Reset _bufferOffset, _bufferLength, and remainingBytes
 80111            _bufferOffset = 0;
 80112            _bufferLength = 0;
 80113            remainingBytes = 0;
 114
 115            // We've caught up to the end of the _avroStream, but it isn't necessarly the end of the stream.
 80116            if (!_avroReader.HasNext())
 117            {
 0118                return 0;
 119            }
 120
 121            // We need to keep getting the next record until we get a data record.
 176122            while (remainingBytes == 0)
 123            {
 124                // Get next Record.
 136125                Dictionary<string, object> record = (Dictionary<string, object>)await _avroReader.Next(async).ConfigureA
 126
 136127                switch (record["$schema"])
 128                {
 129                    // Data Record
 130                    case Constants.QuickQuery.DataRecordName:
 40131                        record.TryGetValue(Constants.QuickQuery.Data, out object byteObject);
 132
 40133                        if (byteObject == null)
 134                        {
 0135                            throw new InvalidOperationException($"Avro data record is missing {Constants.QuickQuery.Data
 136                        }
 137
 40138                        byte[] bytes = (byte[])byteObject;
 139
 140                        // Return the buffer if it is not null and not big enough.
 40141                        if (_buffer != null && _buffer.Length < bytes.Length)
 142                        {
 0143                            ArrayPool<byte>.Shared.Return(_buffer, clearArray: true);
 144                        }
 145
 146                        // Rent a new buffer if it is null or not big enough.
 40147                        if (_buffer == null || _buffer.Length < bytes.Length)
 148                        {
 40149                            _buffer = ArrayPool<byte>.Shared.Rent(Math.Max(4 * Constants.MB, bytes.Length));
 150                        }
 151
 40152                        Array.Copy(
 40153                            sourceArray: bytes,
 40154                            sourceIndex: 0,
 40155                            destinationArray: _buffer,
 40156                            destinationIndex: 0,
 40157                            length: bytes.Length);
 158
 40159                        _bufferLength = bytes.Length;
 160
 161                        // Don't remove this reset, it is used in the final array copy below.
 40162                        remainingBytes = bytes.Length;
 40163                        break;
 164
 165                    // Progress Record
 166                    case Constants.QuickQuery.ProgressRecordName:
 40167                        if (_progressHandler != default)
 168                        {
 8169                            record.TryGetValue(Constants.QuickQuery.BytesScanned, out object progress);
 170
 8171                            if (progress == null)
 172                            {
 0173                                throw new InvalidOperationException($"Avro progress record is mssing {Constants.QuickQue
 174                            }
 175
 8176                            _progressHandler.Report((long)progress);
 177                        }
 8178                        break;
 179
 180                    // Error Record
 181                    case Constants.QuickQuery.ErrorRecordName:
 16182                        ProcessErrorRecord(record);
 16183                        break;
 184
 185                    // End Record
 186                    case Constants.QuickQuery.EndRecordName:
 40187                        if (_progressHandler != default)
 188                        {
 8189                            record.TryGetValue(Constants.QuickQuery.TotalBytes, out object progress);
 190
 8191                            if (progress == null)
 192                            {
 0193                                throw new InvalidOperationException($"Avro end record is missing {Constants.QuickQuery.T
 194                            }
 195
 8196                            _progressHandler.Report((long)progress);
 197                        }
 40198                        return 0;
 199                }
 200            }
 201
 40202            int length = Math.Min(count, remainingBytes);
 40203            Array.Copy(
 40204                sourceArray: _buffer,
 40205                sourceIndex: _bufferOffset,
 40206                destinationArray: buffer,
 40207                destinationIndex: offset,
 40208                length: length);
 209
 40210            _bufferOffset += length;
 40211            return length;
 80212        }
 213
 214
 215        internal static void ValidateReadParameters(byte[] buffer, int offset, int count)
 216        {
 88217            if (buffer == null)
 218            {
 2219                throw new ArgumentNullException($"{nameof(buffer)}", "Parameter cannot be null.");
 220            }
 221
 86222            if (offset < 0)
 223            {
 2224                throw new ArgumentOutOfRangeException($"{nameof(offset)}", "Parameter cannot be negative.");
 225            }
 226
 84227            if (count < 0)
 228            {
 2229                throw new ArgumentOutOfRangeException($"{nameof(count)}", "Parameter cannot be negative.");
 230            }
 231
 82232            if (offset + count > buffer.Length)
 233            {
 2234                throw new ArgumentException($"The sum of {nameof(offset)} and {nameof(count)} cannot be greater than {na
 235            }
 80236        }
 237
 238        internal void ProcessErrorRecord(Dictionary<string, object> record)
 239        {
 18240            record.TryGetValue(Constants.QuickQuery.Fatal, out object fatal);
 18241            record.TryGetValue(Constants.QuickQuery.Name, out object name);
 18242            record.TryGetValue(Constants.QuickQuery.Description, out object description);
 18243            record.TryGetValue(Constants.QuickQuery.Position, out object position);
 244
 18245            if (fatal == null)
 246            {
 0247                throw new InvalidOperationException($"Avro error record is missing {nameof(fatal)} property");
 248            }
 249
 18250            if (name == null)
 251            {
 0252                throw new InvalidOperationException($"Avro error record is missing {nameof(name)} property");
 253            }
 254
 18255            if (description == null)
 256            {
 0257                throw new InvalidOperationException($"Avro error record is missing {nameof(description)} property");
 258            }
 259
 18260            if (position == null)
 261            {
 0262                throw new InvalidOperationException($"Avro error record is missing {nameof(position)} property");
 263            }
 264
 18265            if (_errorHandler != null)
 266            {
 10267                BlobQueryError blobQueryError = new BlobQueryError
 10268                {
 10269                    IsFatal = (bool)fatal,
 10270                    Name = (string)name,
 10271                    Description = (string)description,
 10272                    Position = (long)position
 10273                };
 10274                _errorHandler(blobQueryError);
 275            }
 18276        }
 277
 278        /// <inheritdoc/>
 120279        public override bool CanRead => true;
 280
 281        /// <inheritdoc/>
 0282        public override bool CanSeek => false;
 283
 284        /// <inheritdoc/>
 0285        public override bool CanWrite => false;
 286
 287        /// <inheritdoc/>
 0288        public override long Length => throw new NotSupportedException();
 289
 290        /// <inheritdoc/>
 291        public override long Position
 292        {
 0293            get => throw new NotSupportedException();
 0294            set => throw new NotSupportedException();
 295        }
 296
 297        /// <inheritdoc/>
 0298        public override void Flush() => throw new NotSupportedException();
 299
 300        /// <inheritdoc/>
 0301        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
 302
 303        /// <inheritdoc/>
 0304        public override void SetLength(long value) => throw new NotSupportedException();
 305
 306        /// <inheritdoc/>
 0307        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
 308
 309        /// <inheritdoc/>
 310        protected override void Dispose(bool disposing)
 311        {
 312            // Return the buffer to the pool if we're called from Dispose or a finalizer
 40313            if (_buffer != null)
 314            {
 40315                ArrayPool<byte>.Shared.Return(_buffer, clearArray: true);
 40316                _buffer = null;
 317            }
 318
 40319            _avroStream.Dispose();
 40320            if (_buffer != null)
 321            {
 0322                ArrayPool<byte>.Shared.Return(_buffer, clearArray: true);
 0323                _buffer = null;
 324            }
 40325        }
 326    }
 327}