< Summary

Class:Azure.Storage.Internal.Avro.AvroType
Assembly:Azure.Storage.Blobs
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroParser.cs
Covered lines:25
Uncovered lines:20
Coverable lines:45
Total lines:465
Line coverage:55.5% (25 of 45)
Covered branches:26
Total branches:114
Branch coverage:22.8% (26 of 114)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
FromSchema(...)-90%75%
FromStringSchema(...)-61.54%39.47%
FromArraySchema(...)-100%100%
FromObjectSchema(...)-33.33%11.11%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Internal.Avro\src\AvroParser.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.IO;
 7using System.Text;
 8using System.Text.Json;
 9using System.Threading;
 10using System.Threading.Tasks;
 11
 12#pragma warning disable SA1402 // File may only contain a single type
 13
 14namespace Azure.Storage.Internal.Avro
 15{
 16    internal static class AvroParser
 17    {
 18        /// <summary>
 19        /// Reads a fixed number of bytes from the stream.
 20        /// The number of bytes to return is the first int read from the stream.
 21        /// </summary>
 22        /// <remarks>
 23        /// Note that in the Avro spec, byte array length is specified as a long.
 24        /// This is fine for Quick Query and Change Feed, but could become a problem
 25        /// in the future.
 26        /// </remarks>
 27        public static async Task<byte[]> ReadFixedBytesAsync(
 28            Stream stream,
 29            int length,
 30            bool async,
 31            CancellationToken cancellationToken)
 32        {
 33            byte[] data = new byte[length];
 34            int start = 0;
 35            while (length > 0)
 36            {
 37                int n = async ?
 38                    await stream.ReadAsync(data, start, length, cancellationToken).ConfigureAwait(false) :
 39                    stream.Read(data, start, length);
 40                start += n;
 41                length -= n;
 42
 43                // We hit the end of the stream
 44                if (n <= 0)
 45                    return data;
 46            }
 47            return data;
 48        }
 49
 50        /// <summary>
 51        /// Reads a single byte from the stream.
 52        /// </summary>
 53        private static async Task<byte> ReadByteAsync(
 54            Stream stream,
 55            bool async,
 56            CancellationToken cancellationToken)
 57        {
 58            byte[] bytes = await ReadFixedBytesAsync(stream, 1, async, cancellationToken).ConfigureAwait(false);
 59            return bytes[0];
 60        }
 61
 62        /// <summary>
 63        /// Internal implementation of ReadLongAsync().
 64        /// </summary>
 65        private static async Task<long> ReadZigZagLongAsync(
 66            Stream stream,
 67            bool async,
 68            CancellationToken cancellationToken)
 69        {
 70            byte b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false);
 71            ulong next = b & 0x7FUL;
 72            int shift = 7;
 73            while ((b & 0x80) != 0)
 74            {
 75                b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false);
 76                next |= (b & 0x7FUL) << shift;
 77                shift += 7;
 78            }
 79            long value = (long)next;
 80            return (-(value & 0x01L)) ^ ((value >> 1) & 0x7fffffffffffffffL);
 81        }
 82
 83        /// <summary>
 84        /// Returns null.
 85        /// </summary>
 86        public static Task<object> ReadNullAsync() => Task.FromResult<object>(null);
 87
 88        /// <summary>
 89        /// Reads a bool from the stream.
 90        /// </summary>
 91        public static async Task<bool> ReadBoolAsync(
 92            Stream stream,
 93            bool async,
 94            CancellationToken cancellationToken)
 95        {
 96            byte b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false);
 97
 98            if (b != 0)
 99                return true;
 100            return false;
 101        }
 102
 103        /// <summary>
 104        /// Reads a long from the stream.
 105        /// </summary>
 106        public static async Task<long> ReadLongAsync(
 107            Stream stream,
 108            bool async,
 109            CancellationToken cancellationToken) =>
 110            await ReadZigZagLongAsync(stream, async, cancellationToken).ConfigureAwait(false);
 111
 112        /// <summary>
 113        /// Reads an int from the stream.
 114        /// </summary>
 115        public static async Task<int> ReadIntAsync(
 116            Stream stream,
 117            bool async,
 118            CancellationToken cancellationToken) =>
 119            (int)(await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false));
 120
 121        /// <summary>
 122        /// Reads a float from the stream.
 123        /// </summary>
 124        public static async Task<float> ReadFloatAsync(
 125            Stream stream,
 126            bool async,
 127            CancellationToken cancellationToken)
 128        {
 129            byte[] bytes = await ReadFixedBytesAsync(stream, 4, async, cancellationToken).ConfigureAwait(false);
 130            return BitConverter.ToSingle(bytes, 0);
 131        }
 132
 133        /// <summary>
 134        /// Reads a double from the stream.
 135        /// </summary>
 136        public static async Task<double> ReadDoubleAsync(
 137            Stream stream,
 138            bool async,
 139            CancellationToken cancellationToken)
 140        {
 141            byte[] bytes = await ReadFixedBytesAsync(stream, 8, async, cancellationToken).ConfigureAwait(false);
 142            return BitConverter.ToDouble(bytes, 0);
 143        }
 144
 145        /// <summary>
 146        /// Reads a fixed number of bytes from the stream.
 147        /// </summary>
 148        public static async Task<byte[]> ReadBytesAsync(
 149            Stream stream,
 150            bool async,
 151            CancellationToken cancellationToken)
 152        {
 153            // Note that byte array length is actually defined as a long in the Avro spec.
 154            // This is fine for now, but may need to be changed in the future.
 155            int size = await ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false);
 156            return await ReadFixedBytesAsync(stream, size, async, cancellationToken).ConfigureAwait(false);
 157        }
 158
 159        /// <summary>
 160        /// Reads a string from the stream.
 161        /// </summary>
 162        public static async Task<string> ReadStringAsync(
 163            Stream stream,
 164            bool async,
 165            CancellationToken cancellationToken)
 166        {
 167            byte[] bytes = await ReadBytesAsync(stream, async, cancellationToken).ConfigureAwait(false);
 168            return Encoding.UTF8.GetString(bytes);
 169        }
 170
 171        /// <summary>
 172        /// Reads a KeyValuePair from the stream.
 173        /// Used in ReadMapAsync().
 174        /// </summary>
 175        private static async Task<KeyValuePair<string, T>> ReadMapPairAsync<T>(
 176            Stream stream,
 177            Func<Stream, bool, CancellationToken, Task<T>> parseItemAsync,
 178            bool async,
 179            CancellationToken cancellationToken)
 180        {
 181            string key = await ReadStringAsync(stream, async, cancellationToken).ConfigureAwait(false);
 182            #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 183            T value = await parseItemAsync(stream, async, cancellationToken).ConfigureAwait(false);
 184            #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 185            return new KeyValuePair<string, T>(key, value);
 186        }
 187
 188        /// <summary>
 189        /// Reads a map from the stream.
 190        /// </summary>
 191        public static async Task<Dictionary<string, T>> ReadMapAsync<T>(
 192            Stream stream,
 193            Func<Stream, bool, CancellationToken, Task<T>> parseItemAsync,
 194            bool async,
 195            CancellationToken cancellationToken)
 196        {
 197            Func<Stream, bool, CancellationToken, Task<KeyValuePair<string, T>>> parsePair =
 198                async (s, async, cancellationToken) => await ReadMapPairAsync(s, parseItemAsync, async, cancellationToke
 199            IEnumerable<KeyValuePair<string, T>> entries =
 200                await ReadArrayAsync(stream, parsePair, async, cancellationToken).ConfigureAwait(false);
 201            return entries.ToDictionary();
 202        }
 203
 204        /// <summary>
 205        /// Reads an array of objects from the stream.
 206        /// </summary>
 207        private static async Task<IEnumerable<T>> ReadArrayAsync<T>(
 208            Stream stream,
 209            Func<Stream, bool, CancellationToken, Task<T>> parseItemAsync,
 210            bool async,
 211            CancellationToken cancellationToken)
 212        {
 213            // TODO: This is unpleasant, but I don't want to switch everything to IAsyncEnumerable for every array
 214            List<T> items = new List<T>();
 215            for (long length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false);
 216                 length != 0;
 217                 length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false))
 218            {
 219                // Ignore block sizes because we're not skipping anything
 220                if (length < 0)
 221                {
 222                    await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false);
 223                    length = -length;
 224                }
 225                while (length-- > 0)
 226                {
 227                    #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 228                    T item = await parseItemAsync(stream, async, cancellationToken).ConfigureAwait(false);
 229                    #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 230                    items.Add(item);
 231                };
 232            }
 233            return items;
 234        }
 235
 236        /// <summary>
 237        /// Adds the select to each element in the array.
 238        /// </summary>
 239        internal static List<T> Map<T>(this JsonElement array, Func<JsonElement, T> selector)
 240        {
 241            var values = new List<T>();
 242            foreach (JsonElement element in array.EnumerateArray())
 243            {
 244                values.Add(selector(element));
 245            }
 246            return values;
 247        }
 248
 249        /// <summary>
 250        /// Converts an IEnumerable of KeyValuePair into a Dictionary.
 251        /// </summary>
 252        internal static Dictionary<string, T> ToDictionary<T>(this IEnumerable<KeyValuePair<string, T>> values)
 253        {
 254            Dictionary<string, T> dict = new Dictionary<string, T>();
 255            foreach (KeyValuePair<string, T> pair in values)
 256            {
 257                dict[pair.Key] = pair.Value;
 258            }
 259            return dict;
 260        }
 261    }
 262
 263    /// <summary>
 264    /// Parent class of AvroTypes.
 265    /// </summary>
 266    internal abstract class AvroType
 267    {
 268        /// <summary>
 269        /// Reads an object from the stream.
 270        /// </summary>
 271        public abstract Task<object> ReadAsync(
 272            Stream stream,
 273            bool async,
 274            CancellationToken cancellationToken);
 275
 276        /// <summary>
 277        /// Determinds the AvroType from the Avro Schema.
 278        /// </summary>
 279        public static AvroType FromSchema(JsonElement schema)
 280        {
 520281            return schema.ValueKind switch
 520282            {
 520283                // Primitives
 840284                JsonValueKind.String => FromStringSchema(schema),
 520285                // Union types
 560286                JsonValueKind.Array => FromArraySchema(schema),
 520287                // Everything else
 680288                JsonValueKind.Object => FromObjectSchema(schema),
 0289                _ => throw new InvalidOperationException($"Unexpected JSON Element: {schema}"),
 520290            };
 291        }
 292
 293        private static AvroType FromStringSchema(JsonElement schema)
 294        {
 320295            string type = schema.GetString();
 320296            return type switch
 320297            {
 0298                AvroConstants.Null => new AvroPrimitiveType { Primitive = AvroPrimitive.Null },
 360299                AvroConstants.Boolean => new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean },
 0300                AvroConstants.Int => new AvroPrimitiveType { Primitive = AvroPrimitive.Int },
 480301                AvroConstants.Long => new AvroPrimitiveType { Primitive = AvroPrimitive.Long },
 0302                AvroConstants.Float => new AvroPrimitiveType { Primitive = AvroPrimitive.Float },
 0303                AvroConstants.Double => new AvroPrimitiveType { Primitive = AvroPrimitive.Double },
 360304                AvroConstants.Bytes => new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes },
 400305                AvroConstants.String => new AvroPrimitiveType { Primitive = AvroPrimitive.String },
 0306                _ => throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"),
 320307            };
 308        }
 309
 310        private static AvroType FromArraySchema(JsonElement schema)
 311        {
 40312            return new AvroUnionType { Types = schema.Map(FromSchema) };
 313        }
 314
 315        private static AvroType FromObjectSchema(JsonElement schema)
 316        {
 160317            string type = schema.GetProperty("type").GetString();
 318            switch (type)
 319            {
 320                // Primitives can be defined as strings or objects
 321                case AvroConstants.Null:
 0322                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Null };
 323                case AvroConstants.Boolean:
 0324                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean };
 325                case AvroConstants.Int:
 0326                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Int };
 327                case AvroConstants.Long:
 0328                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Long };
 329                case AvroConstants.Float:
 0330                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Float };
 331                case AvroConstants.Double:
 0332                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Double };
 333                case AvroConstants.Bytes:
 0334                    return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes };
 335                case AvroConstants.String:
 0336                    return new AvroPrimitiveType { Primitive = AvroPrimitive.String };
 337                case AvroConstants.Record:
 160338                    if (schema.TryGetProperty(AvroConstants.Aliases, out var _))
 0339                        throw new InvalidOperationException($"Unexpected aliases on {schema}");
 160340                    string name = schema.GetProperty(AvroConstants.Name).GetString();
 160341                    Dictionary<string, AvroType> fields = new Dictionary<string, AvroType>();
 960342                    foreach (JsonElement field in schema.GetProperty(AvroConstants.Fields).EnumerateArray())
 343                    {
 320344                        fields[field.GetProperty(AvroConstants.Name).GetString()] = FromSchema(field.GetProperty(AvroCon
 345                    }
 160346                    return new AvroRecordType { Schema = name, Fields = fields };
 347                case AvroConstants.Enum:
 0348                    if (schema.TryGetProperty(AvroConstants.Aliases, out var _))
 0349                        throw new InvalidOperationException($"Unexpected aliases on {schema}");
 0350                    return new AvroEnumType { Symbols = schema.GetProperty(AvroConstants.Symbols).Map(s => s.GetString()
 351                case AvroConstants.Map:
 0352                    return new AvroMapType { ItemType = FromSchema(schema.GetProperty(AvroConstants.Values)) };
 353                case AvroConstants.Array: // Unused today
 354                case AvroConstants.Union: // Unused today
 355                case AvroConstants.Fixed: // Unused today
 356                default:
 0357                    throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}");
 358            }
 359        }
 360    }
 361
 362    internal enum AvroPrimitive { Null, Boolean, Int, Long, Float, Double, Bytes, String };
 363
 364    /// <summary>
 365    /// AvroPrimativeType.
 366    /// </summary>
 367    internal class AvroPrimitiveType : AvroType
 368    {
 369        public AvroPrimitive Primitive { get; set; }
 370
 371        public override async Task<object> ReadAsync(
 372            Stream stream,
 373            bool async,
 374            CancellationToken cancellationToken) =>
 375            Primitive switch
 376            {
 377                #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 378                AvroPrimitive.Null => await AvroParser.ReadNullAsync().ConfigureAwait(false),
 379                #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
 380                AvroPrimitive.Boolean => await AvroParser.ReadBoolAsync(stream, async, cancellationToken).ConfigureAwait
 381                AvroPrimitive.Int => await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(fals
 382                AvroPrimitive.Long => await AvroParser.ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(fa
 383                AvroPrimitive.Float => await AvroParser.ReadFloatAsync(stream, async, cancellationToken).ConfigureAwait(
 384                AvroPrimitive.Double => await AvroParser.ReadDoubleAsync(stream, async, cancellationToken).ConfigureAwai
 385                AvroPrimitive.Bytes => await AvroParser.ReadBytesAsync(stream, async, cancellationToken).ConfigureAwait(
 386                AvroPrimitive.String => await AvroParser.ReadStringAsync(stream, async, cancellationToken).ConfigureAwai
 387                _ => throw new InvalidOperationException("Unknown Avro Primitive!")
 388            };
 389    }
 390
 391    /// <summary>
 392    /// AvroEnumType.
 393    /// </summary>
 394    internal class AvroEnumType : AvroType
 395    {
 396        public IReadOnlyList<string> Symbols { get; set; }
 397
 398        public override async Task<object> ReadAsync(
 399            Stream stream,
 400            bool async,
 401            CancellationToken cancellationToken)
 402        {
 403            int value = await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false);
 404            return Symbols[value];
 405        }
 406    }
 407
 408    /// <summary>
 409    /// AvroUnionType.
 410    /// </summary>
 411    internal class AvroUnionType : AvroType
 412    {
 413        public IReadOnlyList<AvroType> Types { get; set; }
 414
 415        public override async Task<object> ReadAsync(
 416            Stream stream,
 417            bool async,
 418            CancellationToken cancellationToken)
 419        {
 420            int option = await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false);
 421            return await Types[option].ReadAsync(stream, async, cancellationToken).ConfigureAwait(false);
 422        }
 423    }
 424
 425    /// <summary>
 426    /// AvroMapType.
 427    /// </summary>
 428    internal class AvroMapType : AvroType
 429    {
 430        public AvroType ItemType { get; set; }
 431
 432        public override async Task<object> ReadAsync(
 433            Stream stream,
 434            bool async,
 435            CancellationToken cancellationToken)
 436        {
 437            Func<Stream, bool, CancellationToken, Task<object>> parseItemAsync =
 438                async (s, a, c) => await ItemType.ReadAsync(s, a, c).ConfigureAwait(false);
 439            return await AvroParser.ReadMapAsync(stream, parseItemAsync, async, cancellationToken).ConfigureAwait(false)
 440        }
 441    }
 442
 443    /// <summary>
 444    /// AvroRecordType.
 445    /// </summary>
 446    internal class AvroRecordType : AvroType
 447    {
 448        public string Schema { get; set; }
 449        public IReadOnlyDictionary<string, AvroType> Fields { get; set; }
 450
 451        public override async Task<object> ReadAsync(
 452            Stream stream,
 453            bool async,
 454            CancellationToken cancellationToken)
 455        {
 456            Dictionary<string, object> record = new Dictionary<string, object>();
 457            record["$schema"] = Schema;
 458            foreach (KeyValuePair<string, AvroType> field in Fields)
 459            {
 460                record[field.Key] = await field.Value.ReadAsync(stream, async, cancellationToken).ConfigureAwait(false);
 461            }
 462            return record;
 463        }
 464    }
 465}