Summary

Class: Azure.Messaging.EventHubs.Amqp.AmqpMessageConverter
Assembly: Azure.Messaging.EventHubs
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpMessageConverter.cs
Covered lines: 224
Uncovered lines: 15
Coverable lines: 239
Total lines: 799
Line coverage: 93.7% (224 of 239)
Covered branches: 152
Total branches: 174
Branch coverage: 87.3% (152 of 174)

Metrics

| Method | Cyclomatic complexity | Line coverage | Branch coverage |
| --- | --- | --- | --- |
| CreateMessageFromEvent(...) | - | 100% | 100% |
| CreateBatchFromEvents(...) | - | 100% | 100% |
| CreateBatchFromMessages(...) | - | 100% | 100% |
| CreateEventFromMessage(...) | - | 100% | 100% |
| CreateEventHubPropertiesRequest(...) | - | 100% | 100% |
| CreateEventHubPropertiesFromResponse(...) | - | 85.71% | 50% |
| CreatePartitionPropertiesRequest(...) | - | 100% | 100% |
| CreatePartitionPropertiesFromResponse(...) | - | 90.91% | 50% |
| BuildAmqpBatchFromEvents(...) | - | 100% | 100% |
| BuildAmqpBatchFromMessages(...) | - | 100% | 100% |
| BuildAmqpMessageFromEvent(...) | - | 100% | 85.71% |
| BuildEventFromAmqpMessage(...) | - | 100% | 100% |
| ParseSystemAnnotations(...) | - | 96.15% | 95.16% |
| TryCreateAmqpPropertyValueForEventProperty(...) | - | 94.74% | 96.15% |
| TryCreateEventPropertyForAmqpProperty(...) | - | 73.08% | 75% |
| GetTypeIdentifier(...) | - | 100% | 50% |
| TranslateSymbol(...) | - | 100% | 100% |
| ReadStreamToArraySegment(...) | - | 83.33% | 50% |
| ReadStreamToMemory(...) | - | 83.33% | 50% |

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpMessageConverter.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.IO;
 8using System.Linq;
 9using System.Runtime.Serialization;
 10using Azure.Core;
 11using Azure.Messaging.EventHubs.Diagnostics;
 12using Microsoft.Azure.Amqp;
 13using Microsoft.Azure.Amqp.Encoding;
 14using Microsoft.Azure.Amqp.Framing;
 15
 16namespace Azure.Messaging.EventHubs.Amqp
 17{
 18    /// <summary>
 19    ///   Serves as a translator between client types and AMQP messages for
 20    ///   communication with the Event Hubs service.
 21    /// </summary>
 22    ///
 23    internal class AmqpMessageConverter
 24    {
 25        /// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
 26        private const int StreamBufferSizeInBytes = 512;
 27
 28        /// <summary>
 29        ///   Converts a given <see cref="EventData" /> source into its corresponding
 30        ///   AMQP representation.
 31        /// </summary>
 32        ///
 33        /// <param name="source">The source event to convert.</param>
 34        /// <param name="partitionKey">The partition key to associate with the batch, if any.</param>
 35        ///
 36        /// <returns>The AMQP message that represents the converted event data.</returns>
 37        ///
 38        /// <remarks>
 39        ///   The caller is assumed to hold ownership over the message once it has been created, including
 40        ///   ensuring proper disposal.
 41        /// </remarks>
 42        ///
 43        public virtual AmqpMessage CreateMessageFromEvent(EventData source,
 44                                                          string partitionKey = null)
 45        {
 8246            Argument.AssertNotNull(source, nameof(source));
 8047            return BuildAmqpMessageFromEvent(source, partitionKey);
 48        }
 49
 50        /// <summary>
 51        ///   Converts a given set of <see cref="EventData" /> instances into a batch AMQP representation.
 52        /// </summary>
 53        ///
 54        /// <param name="source">The set of source events to convert.</param>
 55        /// <param name="partitionKey">The partition key to associate with the batch, if any.</param>
 56        ///
 57        /// <returns>The AMQP message that represents a batch containing the converted set of event data.</returns>
 58        ///
 59        /// <remarks>
 60        ///   The caller is assumed to hold ownership over the message once it has been created, including
 61        ///   ensuring proper disposal.
 62        /// </remarks>
 63        ///
 64        public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
 65                                                         string partitionKey = null)
 66        {
 14267            Argument.AssertNotNull(source, nameof(source));
 14068            return BuildAmqpBatchFromEvents(source, partitionKey);
 69        }
 70
 71        /// <summary>
 72        ///   Converts a given set of <see cref="AmqpMessage" /> instances into a batch AMQP representation.
 73        /// </summary>
 74        ///
 75        /// <param name="source">The set of source messages to convert.</param>
 76        /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci
 77        ///
 78        /// <returns>The AMQP message that represents a batch containing the converted set of event data.</returns>
 79        ///
 80        /// <remarks>
 81        ///   The caller is assumed to hold ownership over the message once it has been created, including
 82        ///   ensuring proper disposal.
 83        /// </remarks>
 84        ///
 85        public virtual AmqpMessage CreateBatchFromMessages(IEnumerable<AmqpMessage> source,
 86                                                           string partitionKey = null)
 87        {
 2488            Argument.AssertNotNull(source, nameof(source));
 2289            return BuildAmqpBatchFromMessages(source, partitionKey);
 90        }
 91
 92        /// <summary>
 93        ///   Converts a given <see cref="AmqpMessage" /> source into its corresponding
 94        ///   <see cref="EventData" /> representation.
 95        /// </summary>
 96        ///
 97        /// <param name="source">The source message to convert.</param>
 98        ///
 99        /// <returns>The event that represents the converted AMQP message.</returns>
 100        ///
 101        /// <remarks>
 102        ///   The caller is assumed to hold ownership over the specified message, including
 103        ///   ensuring proper disposal.
 104        /// </remarks>
 105        ///
 106        public virtual EventData CreateEventFromMessage(AmqpMessage source)
 107        {
 40108            Argument.AssertNotNull(source, nameof(source));
 38109            return BuildEventFromAmqpMessage(source);
 110        }
 111
 112        /// <summary>
 113        ///   Creates an <see cref="AmqpMessage" /> for use in requesting the properties of an Event Hub.
 114        /// </summary>
 115        ///
 116        /// <param name="eventHubName">The name of the Event Hub to query the properties of.</param>
 117        /// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operation
 118        ///
 119        /// <returns>The AMQP message for issuing the request.</returns>
 120        ///
 121        /// <remarks>
 122        ///   The caller is assumed to hold ownership over the message once it has been created, including
 123        ///   ensuring proper disposal.
 124        /// </remarks>
 125        ///
 126        public virtual AmqpMessage CreateEventHubPropertiesRequest(string eventHubName,
 127                                                                   string managementAuthorizationToken)
 128        {
 10129            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 6130            Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken));
 131
 2132            var request = AmqpMessage.Create();
 2133            request.ApplicationProperties = new ApplicationProperties();
 2134            request.ApplicationProperties.Map[AmqpManagement.ResourceNameKey] = eventHubName;
 2135            request.ApplicationProperties.Map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue;
 2136            request.ApplicationProperties.Map[AmqpManagement.ResourceTypeKey] = AmqpManagement.EventHubResourceTypeValue
 2137            request.ApplicationProperties.Map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken;
 138
 2139            return request;
 140        }
 141
 142        /// <summary>
 143        ///   Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub properties into the
 144        ///   corresponding <see cref="EventHubProperties" /> representation.
 145        /// </summary>
 146        ///
 147        /// <param name="response">The response message to convert.</param>
 148        ///
 149        /// <returns>The set of properties represented by the response.</returns>
 150        ///
 151        /// <remarks>
 152        ///   The caller is assumed to hold ownership over the specified message, including
 153        ///   ensuring proper disposal.
 154        /// </remarks>
 155        ///
 156        public virtual EventHubProperties CreateEventHubPropertiesFromResponse(AmqpMessage response)
 157        {
 8158            Argument.AssertNotNull(response, nameof(response));
 159
 6160            if (!(response.ValueBody?.Value is AmqpMap responseData))
 161            {
 0162                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo
 163            }
 164
 2165            return new EventHubProperties(
 2166                (string)responseData[AmqpManagement.ResponseMap.Name],
 2167                new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.CreatedAt], TimeSpan.Zero),
 2168                (string[])responseData[AmqpManagement.ResponseMap.PartitionIdentifiers]);
 169        }
 170
 171        /// <summary>
 172        ///   Creates an <see cref="AmqpMessage" /> for use in requesting the properties of an Event Hub partition.
 173        /// </summary>
 174        ///
 175        /// <param name="eventHubName">The name of the Event Hub to query the properties of.</param>
 176        /// <param name="partitionIdentifier">The identifier of the partition to query the properties of.</param>
 177        /// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operation
 178        ///
 179        /// <returns>The AMQP message for issuing the request.</returns>
 180        ///
 181        /// <remarks>
 182        ///   The caller is assumed to hold ownership over the message once it has been created, including
 183        ///   ensuring proper disposal.
 184        /// </remarks>
 185        ///
 186        public virtual AmqpMessage CreatePartitionPropertiesRequest(string eventHubName,
 187                                                                    string partitionIdentifier,
 188                                                                    string managementAuthorizationToken)
 189        {
 14190            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 10191            Argument.AssertNotNullOrEmpty(partitionIdentifier, nameof(partitionIdentifier));
 6192            Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken));
 193
 2194            var request = AmqpMessage.Create();
 2195            request.ApplicationProperties = new ApplicationProperties();
 2196            request.ApplicationProperties.Map[AmqpManagement.ResourceNameKey] = eventHubName;
 2197            request.ApplicationProperties.Map[AmqpManagement.PartitionNameKey] = partitionIdentifier;
 2198            request.ApplicationProperties.Map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue;
 2199            request.ApplicationProperties.Map[AmqpManagement.ResourceTypeKey] = AmqpManagement.PartitionResourceTypeValu
 2200            request.ApplicationProperties.Map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken;
 201
 2202            return request;
 203        }
 204
 205        /// <summary>
 206        ///   Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub partition properties into
 207        ///   the corresponding <see cref="PartitionProperties" /> representation.
 208        /// </summary>
 209        ///
 210        /// <param name="response">The response message to convert.</param>
 211        ///
 212        /// <returns>The set of properties represented by the response.</returns>
 213        ///
 214        /// <remarks>
 215        ///   The caller is assumed to hold ownership over the specified message, including
 216        ///   ensuring proper disposal.
 217        /// </remarks>
 218        ///
 219        public virtual PartitionProperties CreatePartitionPropertiesFromResponse(AmqpMessage response)
 220        {
 8221            Argument.AssertNotNull(response, nameof(response));
 222
 6223            if (!(response.ValueBody?.Value is AmqpMap responseData))
 224            {
 0225                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo
 226            }
 227
 2228            return new PartitionProperties(
 2229                (string)responseData[AmqpManagement.ResponseMap.Name],
 2230                (string)responseData[AmqpManagement.ResponseMap.PartitionIdentifier],
 2231                (bool)responseData[AmqpManagement.ResponseMap.PartitionRuntimeInfoPartitionIsEmpty],
 2232                (long)responseData[AmqpManagement.ResponseMap.PartitionBeginSequenceNumber],
 2233                (long)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedSequenceNumber],
 2234                long.Parse((string)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedOffset], NumberStyles.In
 2235                new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedTimeUtc], Time
 236        }
 237
 238        /// <summary>
 239        ///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="EventData" />
 240        ///   optionally propagating the custom properties.
 241        /// </summary>
 242        ///
 243        /// <param name="source">The set of events to use as the body of the batch message.</param>
 244        /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci
 245        ///
 246        /// <returns>The batch <see cref="AmqpMessage" /> containing the source events.</returns>
 247        ///
 248        private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> source,
 249                                                            string partitionKey) =>
 140250            BuildAmqpBatchFromMessages(
 230251                source.Select(eventData => BuildAmqpMessageFromEvent(eventData, partitionKey)),
 140252                partitionKey);
 253
 254        /// <summary>
 255        ///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />.
 256        /// </summary>
 257        ///
 258        /// <param name="source">The set of messages to use as the body of the batch message.</param>
 259        /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci
 260        ///
 261        /// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
 262        ///
 263        private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable<AmqpMessage> source,
 264                                                              string partitionKey)
 265        {
 266            AmqpMessage batchEnvelope;
 267
 162268            var batchMessages = source.ToList();
 269
 162270            if (batchMessages.Count == 1)
 271            {
 82272                batchEnvelope = batchMessages[0];
 273            }
 274            else
 275            {
 80276                batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
 80277                {
 120278                    message.Batchable = true;
 120279                    using var messageStream = message.ToStream();
 120280                    return new Data { Value = ReadStreamToArraySegment(messageStream) };
 120281                }));
 282
 80283                batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
 284            }
 285
 162286            if (!string.IsNullOrEmpty(partitionKey))
 287            {
 62288                batchEnvelope.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
 289            }
 290
 162291            batchEnvelope.Batchable = true;
 162292            return batchEnvelope;
 293        }
 294
 295        /// <summary>
 296        ///   Builds an <see cref="AmqpMessage" /> from an <see cref="EventData" />.
 297        /// </summary>
 298        ///
 299        /// <param name="source">The event to use as the source of the message.</param>
 300        /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci
 301        ///
 302        /// <returns>The <see cref="AmqpMessage" /> constructed from the source event.</returns>
 303        ///
 304        private static AmqpMessage BuildAmqpMessageFromEvent(EventData source,
 305                                                             string partitionKey)
 306        {
 170307            var body = new ArraySegment<byte>((source.Body.IsEmpty) ? Array.Empty<byte>() : source.Body.ToArray());
 170308            var message = AmqpMessage.Create(new Data { Value = body });
 309
 170310            if (source.Properties?.Count > 0)
 311            {
 34312                message.ApplicationProperties ??= new ApplicationProperties();
 313
 194314                foreach (KeyValuePair<string, object> pair in source.Properties)
 315                {
 64316                    if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue))
 317                    {
 62318                        message.ApplicationProperties.Map[pair.Key] = amqpValue;
 319                    }
 320                }
 321            }
 322
 168323            if (!string.IsNullOrEmpty(partitionKey))
 324            {
 30325                message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
 326            }
 327
 168328            return message;
 329        }
 330
 331        /// <summary>
 332        ///   Builds an <see cref="EventData" /> from an <see cref="AmqpMessage" />.
 333        /// </summary>
 334        ///
 335        /// <param name="source">The message to use as the source of the event.</param>
 336        ///
 337        /// <returns>The <see cref="EventData" /> constructed from the source message.</returns>
 338        ///
 339        private static EventData BuildEventFromAmqpMessage(AmqpMessage source)
 340        {
 38341            ReadOnlyMemory<byte> body = (source.BodyType.HasFlag(SectionFlag.Data))
 38342                ? ReadStreamToMemory(source.BodyStream)
 38343                : ReadOnlyMemory<byte>.Empty;
 344
 38345            ParsedAnnotations systemAnnotations = ParseSystemAnnotations(source);
 346
 347            // If there were application properties associated with the message, translate them
 348            // to the event.
 349
 38350            var properties = new Dictionary<string, object>();
 351
 38352            if (source.Sections.HasFlag(SectionFlag.ApplicationProperties))
 353            {
 176354                foreach (KeyValuePair<MapKey, object> pair in source.ApplicationProperties.Map)
 355                {
 62356                    if (TryCreateEventPropertyForAmqpProperty(pair.Value, out object propertyValue))
 357                    {
 60358                        properties[pair.Key.ToString()] = propertyValue;
 359                    }
 360                }
 361            }
 362
 38363            return new EventData(
 38364                eventBody: body,
 38365                properties: properties,
 38366                systemProperties: systemAnnotations.ServiceAnnotations,
 38367                sequenceNumber: systemAnnotations.SequenceNumber ?? long.MinValue,
 38368                offset: systemAnnotations.Offset ?? long.MinValue,
 38369                enqueuedTime: systemAnnotations.EnqueuedTime ?? default,
 38370                partitionKey: systemAnnotations.PartitionKey,
 38371                lastPartitionSequenceNumber: systemAnnotations.LastSequenceNumber,
 38372                lastPartitionOffset: systemAnnotations.LastOffset,
 38373                lastPartitionEnqueuedTime: systemAnnotations.LastEnqueuedTime,
 38374                lastPartitionPropertiesRetrievalTime: systemAnnotations.LastReceivedTime);
 375        }
 376
 377        /// <summary>
 378        ///   Parses the annotations set by the Event Hubs service on the <see cref="AmqpMessage"/>
 379        ///   associated with an event, extracting them into a consumable form.
 380        /// </summary>
 381        ///
 382        /// <param name="source">The message to use as the source of the event.</param>
 383        ///
 384        /// <returns>The <see cref="ParsedAnnotations" /> parsed from the source message.</returns>
 385        ///
 386        private static ParsedAnnotations ParseSystemAnnotations(AmqpMessage source)
 387        {
 38388            var systemProperties = new ParsedAnnotations
 38389            {
 38390                ServiceAnnotations = new Dictionary<string, object>()
 38391            };
 392
 393            object amqpValue;
 394            object propertyValue;
 395
 396            // Process the message annotations.
 397
 38398            if (source.Sections.HasFlag(SectionFlag.MessageAnnotations))
 399            {
 12400                Annotations annotations = source.MessageAnnotations.Map;
 12401                var processed = new HashSet<string>();
 402
 12403                if ((annotations.TryGetValue(AmqpProperty.EnqueuedTime, out amqpValue))
 12404                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 405                {
 8406                    systemProperties.EnqueuedTime = propertyValue switch
 8407                    {
 10408                        DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
 14409                        long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
 0410                        _ => (DateTimeOffset)propertyValue
 8411                    };
 412
 8413                    processed.Add(AmqpProperty.EnqueuedTime.ToString());
 414                }
 415
 12416                if ((annotations.TryGetValue(AmqpProperty.SequenceNumber, out amqpValue))
 12417                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 418                {
 4419                    systemProperties.SequenceNumber = (long)propertyValue;
 4420                    processed.Add(AmqpProperty.SequenceNumber.ToString());
 421                }
 422
 12423                if ((annotations.TryGetValue(AmqpProperty.Offset, out amqpValue))
 12424                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))
 12425                    && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var
 426                {
 6427                    systemProperties.Offset = offset;
 6428                    processed.Add(AmqpProperty.Offset.ToString());
 429                }
 430
 12431                if ((annotations.TryGetValue(AmqpProperty.PartitionKey, out amqpValue))
 12432                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 433                {
 4434                    systemProperties.PartitionKey = (string)propertyValue;
 4435                    processed.Add(AmqpProperty.PartitionKey.ToString());
 436                }
 437
 438                string key;
 439
 76440                foreach (KeyValuePair<MapKey, object> pair in annotations)
 441                {
 26442                    key = pair.Key.ToString();
 443
 26444                    if ((!processed.Contains(key))
 26445                        && (TryCreateEventPropertyForAmqpProperty(pair.Value, out propertyValue)))
 446                    {
 4447                        systemProperties.ServiceAnnotations.Add(key, propertyValue);
 4448                        processed.Add(key);
 449                    }
 450                }
 451            }
 452
 453            // Process the delivery annotations.
 454
 38455            if (source.Sections.HasFlag(SectionFlag.DeliveryAnnotations))
 456            {
 10457                if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedTimeUtc, out amqpValue
 10458                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 459                {
 6460                    systemProperties.LastEnqueuedTime = propertyValue switch
 6461                    {
 8462                        DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
 10463                        long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
 0464                        _ => (DateTimeOffset)propertyValue
 6465                    };
 466                }
 467
 10468                if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedSequenceNumber, out am
 10469                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 470                {
 2471                    systemProperties.LastSequenceNumber = (long)propertyValue;
 472                }
 473
 10474                if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedOffset, out amqpValue)
 10475                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))
 10476                    && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var
 477                {
 2478                    systemProperties.LastOffset = offset;
 479                }
 480
 10481                if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.LastPartitionPropertiesRetrievalTimeUtc, ou
 10482                    && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
 483                {
 6484                    systemProperties.LastReceivedTime = propertyValue switch
 6485                    {
 8486                        DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
 10487                        long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
 0488                        _ => (DateTimeOffset)propertyValue
 6489                    };
 490                }
 491            }
 492
 493            // Process the properties annotations
 494
 38495            if (source.Sections.HasFlag(SectionFlag.Properties))
 496            {
 2497                Properties properties = source.Properties;
 498
 499                void conditionalAdd(string name, object value, bool condition)
 500                {
 26501                    if (condition)
 502                    {
 2503                        systemProperties.ServiceAnnotations.Add(name, value);
 504                    }
 26505                }
 506
 2507                conditionalAdd(Properties.MessageIdName, properties.MessageId, properties.MessageId != null);
 2508                conditionalAdd(Properties.UserIdName, properties.UserId, properties.UserId.Array != null);
 2509                conditionalAdd(Properties.ToName, properties.To, properties.To != null);
 2510                conditionalAdd(Properties.SubjectName, properties.Subject, properties.Subject != null);
 2511                conditionalAdd(Properties.ReplyToName, properties.ReplyTo, properties.ReplyTo != null);
 2512                conditionalAdd(Properties.CorrelationIdName, properties.CorrelationId, properties.CorrelationId != null)
 2513                conditionalAdd(Properties.ContentTypeName, properties.ContentType, properties.ContentType.Value != null)
 2514                conditionalAdd(Properties.ContentEncodingName, properties.ContentEncoding, properties.ContentEncoding.Va
 2515                conditionalAdd(Properties.AbsoluteExpiryTimeName, properties.AbsoluteExpiryTime, properties.AbsoluteExpi
 2516                conditionalAdd(Properties.CreationTimeName, properties.CreationTime, properties.CreationTime != null);
 2517                conditionalAdd(Properties.GroupIdName, properties.GroupId, properties.GroupId != null);
 2518                conditionalAdd(Properties.GroupSequenceName, properties.GroupSequence, properties.GroupSequence != null)
 2519                conditionalAdd(Properties.ReplyToGroupIdName, properties.ReplyToGroupId, properties.ReplyToGroupId != nu
 520            }
 521
 38522            return systemProperties;
 523        }
 524
 525        /// <summary>
 526        ///   Attempts to create an AMQP property value for a given event property.
 527        /// </summary>
 528        ///
 529        /// <param name="eventPropertyValue">The value of the event property to create an AMQP property value for.</para
 530        /// <param name="amqpPropertyValue">The AMQP property value that was created.</param>
 531        ///
 532        /// <returns><c>true</c> if an AMQP property value was able to be created; otherwise, <c>false</c>.</returns>
 533        ///
 534        private static bool TryCreateAmqpPropertyValueForEventProperty(object eventPropertyValue,
 535                                                                       out object amqpPropertyValue)
 536        {
 64537            amqpPropertyValue = null;
 538
 64539            if (eventPropertyValue == null)
 540            {
 0541                return true;
 542            }
 543
 64544            switch (GetTypeIdentifier(eventPropertyValue))
 545            {
 546                case AmqpProperty.Type.Byte:
 547                case AmqpProperty.Type.SByte:
 548                case AmqpProperty.Type.Int16:
 549                case AmqpProperty.Type.Int32:
 550                case AmqpProperty.Type.Int64:
 551                case AmqpProperty.Type.UInt16:
 552                case AmqpProperty.Type.UInt32:
 553                case AmqpProperty.Type.UInt64:
 554                case AmqpProperty.Type.Single:
 555                case AmqpProperty.Type.Double:
 556                case AmqpProperty.Type.Boolean:
 557                case AmqpProperty.Type.Decimal:
 558                case AmqpProperty.Type.Char:
 559                case AmqpProperty.Type.Guid:
 560                case AmqpProperty.Type.DateTime:
 561                case AmqpProperty.Type.String:
 40562                    amqpPropertyValue = eventPropertyValue;
 40563                    break;
 564
 565                case AmqpProperty.Type.Stream:
 18566                case AmqpProperty.Type.Unknown when eventPropertyValue is Stream:
 16567                    amqpPropertyValue = ReadStreamToArraySegment((Stream)eventPropertyValue);
 16568                    break;
 569
 570                case AmqpProperty.Type.Uri:
 2571                    amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.Uri, ((Uri)eventPropertyValue).Absolut
 2572                    break;
 573
 574                case AmqpProperty.Type.DateTimeOffset:
 2575                    amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.DateTimeOffset, ((DateTimeOffset)event
 2576                    break;
 577
 578                case AmqpProperty.Type.TimeSpan:
 2579                    amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.TimeSpan, ((TimeSpan)eventPropertyValu
 2580                    break;
 581
 582                case AmqpProperty.Type.Unknown:
 2583                    var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile
 2584                    EventHubsEventSource.Log.UnexpectedException(exception.Message);
 2585                    throw exception;
 586            }
 587
 62588            return (amqpPropertyValue != null);
 589        }
 590
 591        /// <summary>
 592        ///   Attempts to create an event property value for a given AMQP property.
            ///   Simple values are used as-is; symbols, binary data, and described types
            ///   are converted by the additional scenarios handled in the method body.
 593        /// </summary>
 594        ///
 595        /// <param name="amqpPropertyValue">The value of the AMQP property to create an event property value for.</param>
 596        /// <param name="eventPropertyValue">The event property value that was created; <c>null</c> if no value was created.</param>
 597        ///
 598        /// <returns><c>true</c> if an event property value was able to be created or if <paramref name="amqpPropertyValue"/> is <c>null</c>; otherwise, <c>false</c>.</returns>
 599        ///
            /// <exception cref="SerializationException">Occurs when the value is identified as an unknown type and matches none of the conversions attempted for unknown types.</exception>
            ///
 600        private static bool TryCreateEventPropertyForAmqpProperty(object amqpPropertyValue,
 601                                                                  out object eventPropertyValue)
 602        {
 104603            eventPropertyValue = null;
 604
                // A null AMQP value is reported as a success; the event property value remains null.

 104605            if (amqpPropertyValue == null)
 606            {
 0607                return true;
 608            }
 609
 610            // If the property is a simple type, then use it directly.
 611
 104612            switch (GetTypeIdentifier(amqpPropertyValue))
 613            {
 614                case AmqpProperty.Type.Byte:
 615                case AmqpProperty.Type.SByte:
 616                case AmqpProperty.Type.Int16:
 617                case AmqpProperty.Type.Int32:
 618                case AmqpProperty.Type.Int64:
 619                case AmqpProperty.Type.UInt16:
 620                case AmqpProperty.Type.UInt32:
 621                case AmqpProperty.Type.UInt64:
 622                case AmqpProperty.Type.Single:
 623                case AmqpProperty.Type.Double:
 624                case AmqpProperty.Type.Boolean:
 625                case AmqpProperty.Type.Decimal:
 626                case AmqpProperty.Type.Char:
 627                case AmqpProperty.Type.Guid:
 628                case AmqpProperty.Type.DateTime:
 629                case AmqpProperty.Type.String:
 90630                    eventPropertyValue = amqpPropertyValue;
 90631                    return true;
 632
 633                case AmqpProperty.Type.Unknown:
 634                    // An explicitly unknown type will be considered for additional
 635                    // scenarios below.
 636                    break;
 637
 638                default:
                        // Any other identified type is reported as not convertible, without throwing.
 0639                    return false;
 640            }
 641
 642            // Attempt to parse the value against other well-known value scenarios.
 643
 14644            switch (amqpPropertyValue)
 645            {
 646                case AmqpSymbol symbol:
 0647                    eventPropertyValue = symbol.Value;
 0648                    break;
 649
 650                case byte[] array:
 2651                    eventPropertyValue = array;
 2652                    break;
 653
                    // When the segment spans its entire backing array, the array is handed back
                    // directly rather than copied.
                    // NOTE(review): a default ArraySegment has a null Array, which this guard
                    // dereferences - confirm that such a segment cannot reach this point.
 4654                case ArraySegment<byte> segment when segment.Count == segment.Array.Length:
 2655                    eventPropertyValue = segment.Array;
 2656                    break;
 657
                    // A segment covering only part of its backing array is copied into a
                    // new array of exactly the segment's length.
 658                case ArraySegment<byte> segment:
 2659                    var buffer = new byte[segment.Count];
 2660                    Buffer.BlockCopy(segment.Array, segment.Offset, buffer, 0, segment.Count);
 2661                    eventPropertyValue = buffer;
 2662                    break;
 663
                    // TranslateSymbol returns null for a descriptor outside the known set, which
                    // causes this method to return false rather than throw.
 8664                case DescribedType described when (described.Descriptor is AmqpSymbol):
 8665                    eventPropertyValue = TranslateSymbol((AmqpSymbol)described.Descriptor, described.Value);
 8666                    break;
 667
 668                default:
                        // Nothing matched; the failure is written to the event source log and then thrown.
 0669                    var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile
 0670                    EventHubsEventSource.Log.UnexpectedException(exception.Message);
 0671                    throw exception;
 672            }
 673
                // A conversion that yielded null is reported as a failure.

 14674            return (eventPropertyValue != null);
 675        }
 676
 677        /// <summary>
 678        ///   Gets the AMQP property type identifier for a given
 679        ///   value.
 680        /// </summary>
 681        ///
 682        /// <param name="value">The value to determine the type identifier for.</param>
 683        ///
 684        /// <returns>The <see cref="AmqpProperty.Type"/> that was identified for the given <paramref name="value"/>.</re
 685        ///
 686        private static AmqpProperty.Type GetTypeIdentifier(object value) =>
 168687            (value == null)
 168688                ? AmqpProperty.Type.Null
 168689                : value.GetType().ToAmqpPropertyType();
 690
 691        /// <summary>
 692        ///   Translates the AMQP symbol into its corresponding typed value, if it belongs to the
 693        ///   set of known types.
 694        /// </summary>
 695        ///
 696        /// <param name="symbol">The symbol to consider.</param>
 697        /// <param name="value">The value of the symbol to translate.</param>
 698        ///
 699        /// <returns>The typed value of the symbol, if it belongs to the well-known set; otherwise, <c>null</c>.</return
 700        ///
 701        private static object TranslateSymbol(AmqpSymbol symbol,
 702                                              object value)
 703        {
 8704            if (symbol.Equals(AmqpProperty.Descriptor.Uri))
 705            {
 2706                return new Uri((string)value);
 707            }
 708
 6709            if (symbol.Equals(AmqpProperty.Descriptor.TimeSpan))
 710            {
 2711                return new TimeSpan((long)value);
 712            }
 713
 4714            if (symbol.Equals(AmqpProperty.Descriptor.DateTimeOffset))
 715            {
 2716                return new DateTimeOffset((long)value, TimeSpan.Zero);
 717            }
 718
 2719            return null;
 720        }
 721
 722        /// <summary>
 723        ///   Converts a stream to an <see cref="ArraySegment{T}" /> representation.
 724        /// </summary>
 725        ///
 726        /// <param name="stream">The stream to read and capture in memory.</param>
 727        ///
 728        /// <returns>The <see cref="ArraySegment{T}" /> containing the stream data.</returns>
 729        ///
 730        private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)
 731        {
 56732            if (stream == null)
 733            {
 0734                return new ArraySegment<byte>();
 735            }
 736
 56737            using var memStream = new MemoryStream(StreamBufferSizeInBytes);
 56738            stream.CopyTo(memStream, StreamBufferSizeInBytes);
 739
 56740            return new ArraySegment<byte>(memStream.ToArray());
 56741        }
 742
 743        /// <summary>
 744        ///   Converts a stream to a set of memory bytes.
 745        /// </summary>
 746        ///
 747        /// <param name="stream">The stream to read and capture in memory.</param>
 748        ///
 749        /// <returns>The set of memory bytes containing the stream data.</returns>
 750        ///
 751        private static ReadOnlyMemory<byte> ReadStreamToMemory(Stream stream)
 752        {
 34753            if (stream == null)
 754            {
 0755                return ReadOnlyMemory<byte>.Empty;
 756            }
 757
 34758            using var memStream = new MemoryStream(StreamBufferSizeInBytes);
 34759            stream.CopyTo(memStream, StreamBufferSizeInBytes);
 760
 34761            return new ReadOnlyMemory<byte>(memStream.ToArray());
 34762        }
 763
 764        /// <summary>
 765        ///   The set of system annotations read from a message received from the
 766        ///   Event Hubs service.
 767        /// </summary>
 768        ///
            /// <remarks>
            ///   NOTE(review): the nullable members presumably remain <c>null</c> when the
            ///   corresponding annotation is absent from the message - confirm against the parser.
            /// </remarks>
            ///
 769        private struct ParsedAnnotations
 770        {
 771            /// <summary>The set of weakly typed annotations associated with the message, keyed by name.</summary>
 772            public Dictionary<string, object> ServiceAnnotations;
 773
 774            /// <summary>The sequence number of the event associated with the message.</summary>
 775            public long? SequenceNumber;
 776
 777            /// <summary>The offset of the event associated with the message.</summary>
 778            public long? Offset;
 779
 780            /// <summary>The date and time, in UTC, that the event associated with the message was enqueued.</summary>
 781            public DateTimeOffset? EnqueuedTime;
 782
 783            /// <summary>The partition key that the event associated with the message was published with.</summary>
 784            public string PartitionKey;
 785
 786            /// <summary>The sequence number of the event that was last enqueued in the partition.</summary>
 787            public long? LastSequenceNumber;
 788
 789            /// <summary>The offset of the event that was last enqueued in the partition.</summary>
 790            public long? LastOffset;
 791
 792            /// <summary>The date and time, in UTC, that an event was last enqueued in the partition.</summary>
 793            public DateTimeOffset? LastEnqueuedTime;
 794
 795            /// <summary>The date and time, in UTC, that the last enqueued event information was retrieved from the service.</summary>
 796            public DateTimeOffset? LastReceivedTime;
 797        }
 798    }
 799}