< Summary

Class: Azure.Messaging.EventGrid.EventGridConsumer
Assembly: Azure.Messaging.EventGrid
File(s): C:\Git\azure-sdk-for-net\sdk\eventgrid\Azure.Messaging.EventGrid\src\Customization\EventGridConsumer.cs
Covered lines: 88
Uncovered lines: 17
Coverable lines: 105
Total lines: 295
Line coverage: 83.8% (88 of 105)
Covered branches: 39
Total branches: 54
Branch coverage: 72.2% (39 of 54)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
DeserializeEventGridEvents(...) | - | 100% | 100%
DeserializeEventGridEventsAsync() | - | 0% | 100%
DeserializeEventGridEventsInternal() | - | 96.67% | 92.86%
DeserializeCloudEvents(...) | - | 100% | 100%
DeserializeCloudEventsAsync() | - | 0% | 100%
DeserializeCloudEventsInternal() | - | 83.78% | 70%
TryGetPrimitiveFromJsonElement(...) | - | 53.33% | 56.25%
ParseJsonToDocument() | - | 80% | 50%
SerializePayloadToStream(...) | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventgrid\Azure.Messaging.EventGrid\src\Customization\EventGridConsumer.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.IO;
 7using System.Text;
 8using System.Text.Json;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core;
 12using Azure.Core.Pipeline;
 13using Azure.Core.Serialization;
 14using Azure.Messaging.EventGrid.Models;
 15
 16namespace Azure.Messaging.EventGrid
 17{
 18    /// <summary>
 19    /// Class used to decode events from the Event Grid service.
 20    /// </summary>
 21    public class EventGridConsumer
 22    {
 23        private readonly ObjectSerializer _dataSerializer;
 24        private readonly IDictionary<string, Type> _customEventTypeMappings;
 25
 26        /// <summary>
 27        /// Initializes a new instance of the <see cref="EventGridConsumer"/> class.
 28        /// </summary>
 29        public EventGridConsumer() :
 630            this(new EventGridConsumerOptions())
 31        {
 632        }
 33
 34        /// <summary>
 35        /// Initializes a new instance of the <see cref="EventGridConsumer"/> class.
 36        /// </summary>
 37        /// <param name="options"> Options for configuring deserialization. </param>
 1838        public EventGridConsumer(EventGridConsumerOptions options)
 39        {
 1840            Argument.AssertNotNull(options, nameof(options));
 1841            _dataSerializer = options.DataSerializer;
 1842            _customEventTypeMappings = new Dictionary<string, Type>();
 6043            foreach (KeyValuePair<string, Type> kvp in options.CustomEventTypeMappings)
 44            {
 1245                _customEventTypeMappings.Add(kvp);
 46            }
 1847        }
 48
 49        /// <summary>
 50        /// Deserializes JSON encoded events and returns an array of events encoded in the EventGrid event schema.
 51        /// </summary>
 52        /// <param name="requestContent">
 53        /// The JSON encoded representation of either a single event or an array of events, encoded in the EventGrid eve
 54        /// </param>
 55        /// <param name="cancellationToken"> The cancellation token to use. </param>
 56        /// <returns>A list of EventGrid Events.</returns>
 57        public virtual EventGridEvent[] DeserializeEventGridEvents(string requestContent, CancellationToken cancellation
 16858            => DeserializeEventGridEventsInternal(requestContent, false /*async*/, cancellationToken).EnsureCompleted();
 59
 60        /// <summary>
 61        /// Deserializes JSON encoded events and returns an array of events encoded in the EventGrid event schema.
 62        /// </summary>
 63        /// <param name="requestContent">
 64        /// The JSON encoded representation of either a single event or an array of events, encoded in the EventGrid eve
 65        /// </param>
 66        /// <param name="cancellationToken"> The cancellation token to use. </param>
 67        /// <returns>A list of EventGrid Events.</returns>
 68        public virtual async Task<EventGridEvent[]> DeserializeEventGridEventsAsync(string requestContent, CancellationT
 069            => await DeserializeEventGridEventsInternal(requestContent, true /*async*/, cancellationToken).ConfigureAwai
 70
 71        private async Task<EventGridEvent[]> DeserializeEventGridEventsInternal(string requestContent, bool async, Cance
 72        {
 16873            List<EventGridEventInternal> egInternalEvents = new List<EventGridEventInternal>();
 16874            List<EventGridEvent> egEvents = new List<EventGridEvent>();
 75
 76            // Deserialize raw JSON string into separate events, deserialize event envelope properties
 16877            JsonDocument requestDocument = await ParseJsonToDocument(requestContent, async, cancellationToken).Configure
 68078            foreach (JsonElement property in requestDocument.RootElement.EnumerateArray())
 79            {
 17280                egInternalEvents.Add(EventGridEventInternal.DeserializeEventGridEventInternal(property));
 81            }
 82
 83            // Deserialize 'Data' property from JsonElement for each event
 68084            foreach (EventGridEventInternal egEventInternal in egInternalEvents)
 85            {
 17286                JsonElement dataElement = egEventInternal.Data;
 17287                object egEventData = null;
 88
 89                // Reserialize JsonElement to stream
 17290                MemoryStream dataStream = SerializePayloadToStream(dataElement, cancellationToken);
 91
 92                // First, let's attempt to find the mapping for the event type in the custom event mapping.
 17293                if (_customEventTypeMappings.TryGetValue(egEventInternal.EventType, out Type typeOfEventData))
 94                {
 895                    if (!TryGetPrimitiveFromJsonElement(dataElement, out egEventData))
 96                    {
 497                        if (async)
 98                        {
 099                            egEventData = await _dataSerializer.DeserializeAsync(dataStream, typeOfEventData, cancellati
 100                        }
 101                        else
 102                        {
 4103                            egEventData = _dataSerializer.Deserialize(dataStream, typeOfEventData, cancellationToken);
 104                        }
 105                    }
 106                }
 107                // If a custom mapping doesn't exist, let's attempt to find the mapping for the deserialization function
 164108                else if (SystemEventTypeMappings.SystemEventDeserializers.TryGetValue(egEventInternal.EventType, out Fun
 109                {
 160110                    egEventData = systemDeserializationFunction(dataElement);
 111                }
 112                else
 113                {
 114                    // If event data is not a primitive/string, return as BinaryData
 4115                    if (!TryGetPrimitiveFromJsonElement(dataElement, out egEventData))
 116                    {
 2117                        egEventData = BinaryData.FromStream(dataStream);
 118                    }
 119                }
 120
 172121                egEvents.Add(new EventGridEvent(
 172122                    egEventInternal.Subject,
 172123                    egEventData,
 172124                    egEventInternal.EventType,
 172125                    egEventInternal.DataVersion)
 172126                {
 172127                    Id = egEventInternal.Id,
 172128                    EventTime = egEventInternal.EventTime
 172129                });
 172130            }
 131
 168132            return egEvents.ToArray();
 168133        }
 134
 135        /// <summary>
 136        /// Deserializes JSON encoded events and returns an array of events encoded in the CloudEvent schema.
 137        /// </summary>
 138        /// <param name="requestContent">
 139        /// The JSON encoded representation of either a single event or an array of events, encoded in the CloudEvent sc
 140        /// </param>
 141        /// <param name="cancellationToken"> The cancellation token to use. </param>
 142        /// <returns>A list of CloudEvents.</returns>
 143        public virtual CloudEvent[] DeserializeCloudEvents(string requestContent, CancellationToken cancellationToken = 
 10144            => DeserializeCloudEventsInternal(requestContent, false /*async*/, cancellationToken).EnsureCompleted();
 145
 146        /// <summary>
 147        /// Deserializes JSON encoded events and returns an array of events encoded in the CloudEvent schema.
 148        /// </summary>
 149        /// <param name="requestContent">
 150        /// The JSON encoded representation of either a single event or an array of events, encoded in the CloudEvent sc
 151        /// </param>
 152        /// <param name="cancellationToken"> The cancellation token to use. </param>
 153        /// <returns>A list of CloudEvents.</returns>
 154        public virtual async Task<CloudEvent[]> DeserializeCloudEventsAsync(string requestContent, CancellationToken can
 0155            => await DeserializeCloudEventsInternal(requestContent, true /*async*/, cancellationToken).ConfigureAwait(fa
 156
 157        private async Task<CloudEvent[]> DeserializeCloudEventsInternal(string requestContent, bool async, CancellationT
 158        {
 10159            List<CloudEventInternal> cloudEventsInternal = new List<CloudEventInternal>();
 10160            List<CloudEvent> cloudEvents = new List<CloudEvent>();
 161
 162            // Deserialize raw JSON string into separate events, deserialize event envelope properties
 10163            JsonDocument requestDocument = await ParseJsonToDocument(requestContent, async, cancellationToken).Configure
 48164            foreach (JsonElement property in requestDocument.RootElement.EnumerateArray())
 165            {
 14166                cloudEventsInternal.Add(CloudEventInternal.DeserializeCloudEventInternal(property));
 167            }
 168
 169            // Deserialize 'Data' property from JsonElement for each event
 48170            foreach (CloudEventInternal cloudEventInternal in cloudEventsInternal)
 171            {
 14172                object cloudEventData = null;
 14173                if (cloudEventInternal.DataBase64 != null)
 174                {
 2175                    cloudEventData = Convert.FromBase64String(cloudEventInternal.DataBase64);
 176                }
 177                else
 178                {
 12179                    JsonElement? dataElement = cloudEventInternal.Data;
 180
 12181                    if (dataElement.HasValue && dataElement.Value.ValueKind != JsonValueKind.Null)
 182                    {
 183                        // Reserialize JsonElement to stream
 8184                        MemoryStream dataStream = SerializePayloadToStream(dataElement, cancellationToken);
 185
 186                        // First, let's attempt to find the mapping for the event type in the custom event mapping.
 8187                        if (_customEventTypeMappings.TryGetValue(cloudEventInternal.Type, out Type typeOfEventData))
 188                        {
 0189                            if (!TryGetPrimitiveFromJsonElement(dataElement.Value, out cloudEventData))
 190                            {
 0191                                if (async)
 192                                {
 0193                                    cloudEventData = await _dataSerializer.DeserializeAsync(dataStream, typeOfEventData,
 194                                }
 195                                else
 196                                {
 0197                                    cloudEventData = _dataSerializer.Deserialize(dataStream, typeOfEventData, cancellati
 198                                }
 199                            }
 200                        }
 201                        // If a custom mapping doesn't exist, let's attempt to find the mapping for the deserialization 
 8202                        else if (SystemEventTypeMappings.SystemEventDeserializers.TryGetValue(cloudEventInternal.Type, o
 203                        {
 8204                            cloudEventData = systemDeserializationFunction(dataElement.Value);
 205                        }
 206                        // If no custom mapping was added, either return a primitive/string, or an object wrapped as Bin
 207                        else
 208                        {
 209                            // If event data is not a primitive/string, return as BinaryData
 0210                            if (!TryGetPrimitiveFromJsonElement(dataElement.Value, out cloudEventData))
 211                            {
 0212                                cloudEventData = BinaryData.FromStream(dataStream);
 213                            }
 214                        }
 215                    }
 216                    else // Event has null data
 217                    {
 4218                        cloudEventData = null;
 4219                        cloudEventInternal.Type = "";
 220                    }
 221                }
 222
 14223                cloudEvents.Add(new CloudEvent(
 14224                    cloudEventInternal.Source,
 14225                    cloudEventInternal.Type)
 14226                {
 14227                    Id = cloudEventInternal.Id,
 14228                    Data = cloudEventData,
 14229                    Time = cloudEventInternal.Time,
 14230                    DataSchema = cloudEventInternal.Dataschema,
 14231                    DataContentType = cloudEventInternal.Datacontenttype,
 14232                    Subject = cloudEventInternal.Subject
 14233                });
 14234            }
 235
 10236            return cloudEvents.ToArray();
 10237        }
 238
 239        private static bool TryGetPrimitiveFromJsonElement(JsonElement jsonElement, out object elementValue)
 240        {
 12241            elementValue = null;
 12242            if (jsonElement.ValueKind == JsonValueKind.True || jsonElement.ValueKind == JsonValueKind.False)
 243            {
 2244                elementValue = jsonElement.GetBoolean();
 245            }
 10246            else if (jsonElement.ValueKind == JsonValueKind.Number)
 247            {
 0248                if (jsonElement.TryGetInt32(out var vali))
 249                {
 0250                    elementValue = vali;
 251                }
 0252                if (jsonElement.TryGetInt64(out var vall))
 253                {
 0254                    elementValue = vall;
 255                }
 0256                if (jsonElement.TryGetDouble(out var val))
 257                {
 0258                    elementValue = val;
 259                }
 260            }
 10261            else if (jsonElement.ValueKind == JsonValueKind.String)
 262            {
 4263                elementValue = jsonElement.GetString();
 264            }
 6265            else if (jsonElement.ValueKind == JsonValueKind.Undefined)
 266            {
 0267                elementValue = "";
 268            }
 269
 12270            return elementValue != null;
 271        }
 272
 273        private static async Task<JsonDocument> ParseJsonToDocument(string requestContent, bool async, CancellationToken
 274        {
 178275            MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(requestContent));
 178276            if (async)
 277            {
 0278                return await JsonDocument.ParseAsync(stream, default, cancellationToken).ConfigureAwait(false);
 279            }
 280            else
 281            {
 178282                return JsonDocument.Parse(stream, default);
 283            }
 178284        }
 285
 286        private static MemoryStream SerializePayloadToStream(JsonElement? dataElement, CancellationToken cancellationTok
 287        {
 180288            MemoryStream dataStream = new MemoryStream();
 180289            JsonObjectSerializer serializer = new JsonObjectSerializer();
 180290            serializer.Serialize(dataStream, dataElement, dataElement.GetType(), cancellationToken);
 180291            dataStream.Position = 0;
 180292            return dataStream;
 293        }
 294    }
 295}