| | | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | | 2 | | // Licensed under the MIT License. |
| | | 3 | | |
| | | 4 | | using System; |
| | | 5 | | using System.Collections.Generic; |
| | | 6 | | using System.Globalization; |
| | | 7 | | using System.IO; |
| | | 8 | | using System.Linq; |
| | | 9 | | using System.Runtime.Serialization; |
| | | 10 | | using Azure.Core; |
| | | 11 | | using Azure.Messaging.EventHubs.Diagnostics; |
| | | 12 | | using Microsoft.Azure.Amqp; |
| | | 13 | | using Microsoft.Azure.Amqp.Encoding; |
| | | 14 | | using Microsoft.Azure.Amqp.Framing; |
| | | 15 | | |
| | | 16 | | namespace Azure.Messaging.EventHubs.Amqp |
| | | 17 | | { |
| | | 18 | | /// <summary> |
| | | 19 | | /// Serves as a translator between client types and AMQP messages for |
| | | 20 | | /// communication with the Event Hubs service. |
| | | 21 | | /// </summary> |
| | | 22 | | /// |
| | | 23 | | internal class AmqpMessageConverter |
| | | 24 | | { |
| | | 25 | | /// <summary>The size, in bytes, to use as a buffer for stream operations.</summary> |
| | | 26 | | private const int StreamBufferSizeInBytes = 512; |
| | | 27 | | |
| | | 28 | | /// <summary> |
| | | 29 | | /// Converts a given <see cref="EventData" /> source into its corresponding |
| | | 30 | | /// AMQP representation. |
| | | 31 | | /// </summary> |
| | | 32 | | /// |
| | | 33 | | /// <param name="source">The source event to convert.</param> |
| | | 34 | | /// <param name="partitionKey">The partition key to associate with the batch, if any.</param> |
| | | 35 | | /// |
| | | 36 | | /// <returns>The AMQP message that represents the converted event data.</returns> |
| | | 37 | | /// |
| | | 38 | | /// <remarks> |
| | | 39 | | /// The caller is assumed to hold ownership over the message once it has been created, including |
| | | 40 | | /// ensuring proper disposal. |
| | | 41 | | /// </remarks> |
| | | 42 | | /// |
| | | 43 | | public virtual AmqpMessage CreateMessageFromEvent(EventData source, |
| | | 44 | | string partitionKey = null) |
| | | 45 | | { |
| | 82 | 46 | | Argument.AssertNotNull(source, nameof(source)); |
| | 80 | 47 | | return BuildAmqpMessageFromEvent(source, partitionKey); |
| | | 48 | | } |
| | | 49 | | |
| | | 50 | | /// <summary> |
| | | 51 | | /// Converts a given set of <see cref="EventData" /> instances into a batch AMQP representation. |
| | | 52 | | /// </summary> |
| | | 53 | | /// |
| | | 54 | | /// <param name="source">The set of source events to convert.</param> |
| | | 55 | | /// <param name="partitionKey">The partition key to associate with the batch, if any.</param> |
| | | 56 | | /// |
| | | 57 | | /// <returns>The AMQP message that represents a batch containing the converted set of event data.</returns> |
| | | 58 | | /// |
| | | 59 | | /// <remarks> |
| | | 60 | | /// The caller is assumed to hold ownership over the message once it has been created, including |
| | | 61 | | /// ensuring proper disposal. |
| | | 62 | | /// </remarks> |
| | | 63 | | /// |
| | | 64 | | public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source, |
| | | 65 | | string partitionKey = null) |
| | | 66 | | { |
| | 142 | 67 | | Argument.AssertNotNull(source, nameof(source)); |
| | 140 | 68 | | return BuildAmqpBatchFromEvents(source, partitionKey); |
| | | 69 | | } |
| | | 70 | | |
| | | 71 | | /// <summary> |
| | | 72 | | /// Converts a given set of <see cref="AmqpMessage" /> instances into a batch AMQP representation. |
| | | 73 | | /// </summary> |
| | | 74 | | /// |
| | | 75 | | /// <param name="source">The set of source messages to convert.</param> |
| | | 76 | | /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci |
| | | 77 | | /// |
| | | 78 | | /// <returns>The AMQP message that represents a batch containing the converted set of event data.</returns> |
| | | 79 | | /// |
| | | 80 | | /// <remarks> |
| | | 81 | | /// The caller is assumed to hold ownership over the message once it has been created, including |
| | | 82 | | /// ensuring proper disposal. |
| | | 83 | | /// </remarks> |
| | | 84 | | /// |
| | | 85 | | public virtual AmqpMessage CreateBatchFromMessages(IEnumerable<AmqpMessage> source, |
| | | 86 | | string partitionKey = null) |
| | | 87 | | { |
| | 24 | 88 | | Argument.AssertNotNull(source, nameof(source)); |
| | 22 | 89 | | return BuildAmqpBatchFromMessages(source, partitionKey); |
| | | 90 | | } |
| | | 91 | | |
| | | 92 | | /// <summary> |
| | | 93 | | /// Converts a given <see cref="AmqpMessage" /> source into its corresponding |
| | | 94 | | /// <see cref="EventData" /> representation. |
| | | 95 | | /// </summary> |
| | | 96 | | /// |
| | | 97 | | /// <param name="source">The source message to convert.</param> |
| | | 98 | | /// |
| | | 99 | | /// <returns>The event that represents the converted AMQP message.</returns> |
| | | 100 | | /// |
| | | 101 | | /// <remarks> |
| | | 102 | | /// The caller is assumed to hold ownership over the specified message, including |
| | | 103 | | /// ensuring proper disposal. |
| | | 104 | | /// </remarks> |
| | | 105 | | /// |
| | | 106 | | public virtual EventData CreateEventFromMessage(AmqpMessage source) |
| | | 107 | | { |
| | 40 | 108 | | Argument.AssertNotNull(source, nameof(source)); |
| | 38 | 109 | | return BuildEventFromAmqpMessage(source); |
| | | 110 | | } |
| | | 111 | | |
| | | 112 | | /// <summary> |
| | | 113 | | /// Creates an <see cref="AmqpMessage" /> for use in requesting the properties of an Event Hub. |
| | | 114 | | /// </summary> |
| | | 115 | | /// |
| | | 116 | | /// <param name="eventHubName">The name of the Event Hub to query the properties of.</param> |
| | | 117 | | /// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operation |
| | | 118 | | /// |
| | | 119 | | /// <returns>The AMQP message for issuing the request.</returns> |
| | | 120 | | /// |
| | | 121 | | /// <remarks> |
| | | 122 | | /// The caller is assumed to hold ownership over the message once it has been created, including |
| | | 123 | | /// ensuring proper disposal. |
| | | 124 | | /// </remarks> |
| | | 125 | | /// |
| | | 126 | | public virtual AmqpMessage CreateEventHubPropertiesRequest(string eventHubName, |
| | | 127 | | string managementAuthorizationToken) |
| | | 128 | | { |
| | 10 | 129 | | Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); |
| | 6 | 130 | | Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken)); |
| | | 131 | | |
| | 2 | 132 | | var request = AmqpMessage.Create(); |
| | 2 | 133 | | request.ApplicationProperties = new ApplicationProperties(); |
| | 2 | 134 | | request.ApplicationProperties.Map[AmqpManagement.ResourceNameKey] = eventHubName; |
| | 2 | 135 | | request.ApplicationProperties.Map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue; |
| | 2 | 136 | | request.ApplicationProperties.Map[AmqpManagement.ResourceTypeKey] = AmqpManagement.EventHubResourceTypeValue |
| | 2 | 137 | | request.ApplicationProperties.Map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken; |
| | | 138 | | |
| | 2 | 139 | | return request; |
| | | 140 | | } |
| | | 141 | | |
| | | 142 | | /// <summary> |
| | | 143 | | /// Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub properties into the |
| | | 144 | | /// corresponding <see cref="EventHubProperties" /> representation. |
| | | 145 | | /// </summary> |
| | | 146 | | /// |
| | | 147 | | /// <param name="response">The response message to convert.</param> |
| | | 148 | | /// |
| | | 149 | | /// <returns>The set of properties represented by the response.</returns> |
| | | 150 | | /// |
| | | 151 | | /// <remarks> |
| | | 152 | | /// The caller is assumed to hold ownership over the specified message, including |
| | | 153 | | /// ensuring proper disposal. |
| | | 154 | | /// </remarks> |
| | | 155 | | /// |
| | | 156 | | public virtual EventHubProperties CreateEventHubPropertiesFromResponse(AmqpMessage response) |
| | | 157 | | { |
| | 8 | 158 | | Argument.AssertNotNull(response, nameof(response)); |
| | | 159 | | |
| | 6 | 160 | | if (!(response.ValueBody?.Value is AmqpMap responseData)) |
| | | 161 | | { |
| | 0 | 162 | | throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo |
| | | 163 | | } |
| | | 164 | | |
| | 2 | 165 | | return new EventHubProperties( |
| | 2 | 166 | | (string)responseData[AmqpManagement.ResponseMap.Name], |
| | 2 | 167 | | new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.CreatedAt], TimeSpan.Zero), |
| | 2 | 168 | | (string[])responseData[AmqpManagement.ResponseMap.PartitionIdentifiers]); |
| | | 169 | | } |
| | | 170 | | |
| | | 171 | | /// <summary> |
| | | 172 | | /// Creates an <see cref="AmqpMessage" /> for use in requesting the properties of an Event Hub partition. |
| | | 173 | | /// </summary> |
| | | 174 | | /// |
| | | 175 | | /// <param name="eventHubName">The name of the Event Hub to query the properties of.</param> |
| | | 176 | | /// <param name="partitionIdentifier">The identifier of the partition to query the properties of.</param> |
| | | 177 | | /// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operation |
| | | 178 | | /// |
| | | 179 | | /// <returns>The AMQP message for issuing the request.</returns> |
| | | 180 | | /// |
| | | 181 | | /// <remarks> |
| | | 182 | | /// The caller is assumed to hold ownership over the message once it has been created, including |
| | | 183 | | /// ensuring proper disposal. |
| | | 184 | | /// </remarks> |
| | | 185 | | /// |
| | | 186 | | public virtual AmqpMessage CreatePartitionPropertiesRequest(string eventHubName, |
| | | 187 | | string partitionIdentifier, |
| | | 188 | | string managementAuthorizationToken) |
| | | 189 | | { |
| | 14 | 190 | | Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); |
| | 10 | 191 | | Argument.AssertNotNullOrEmpty(partitionIdentifier, nameof(partitionIdentifier)); |
| | 6 | 192 | | Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken)); |
| | | 193 | | |
| | 2 | 194 | | var request = AmqpMessage.Create(); |
| | 2 | 195 | | request.ApplicationProperties = new ApplicationProperties(); |
| | 2 | 196 | | request.ApplicationProperties.Map[AmqpManagement.ResourceNameKey] = eventHubName; |
| | 2 | 197 | | request.ApplicationProperties.Map[AmqpManagement.PartitionNameKey] = partitionIdentifier; |
| | 2 | 198 | | request.ApplicationProperties.Map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue; |
| | 2 | 199 | | request.ApplicationProperties.Map[AmqpManagement.ResourceTypeKey] = AmqpManagement.PartitionResourceTypeValu |
| | 2 | 200 | | request.ApplicationProperties.Map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken; |
| | | 201 | | |
| | 2 | 202 | | return request; |
| | | 203 | | } |
| | | 204 | | |
| | | 205 | | /// <summary> |
| | | 206 | | /// Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub partition properties into |
| | | 207 | | /// the corresponding <see cref="PartitionProperties" /> representation. |
| | | 208 | | /// </summary> |
| | | 209 | | /// |
| | | 210 | | /// <param name="response">The response message to convert.</param> |
| | | 211 | | /// |
| | | 212 | | /// <returns>The set of properties represented by the response.</returns> |
| | | 213 | | /// |
| | | 214 | | /// <remarks> |
| | | 215 | | /// The caller is assumed to hold ownership over the specified message, including |
| | | 216 | | /// ensuring proper disposal. |
| | | 217 | | /// </remarks> |
| | | 218 | | /// |
| | | 219 | | public virtual PartitionProperties CreatePartitionPropertiesFromResponse(AmqpMessage response) |
| | | 220 | | { |
| | 8 | 221 | | Argument.AssertNotNull(response, nameof(response)); |
| | | 222 | | |
| | 6 | 223 | | if (!(response.ValueBody?.Value is AmqpMap responseData)) |
| | | 224 | | { |
| | 0 | 225 | | throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo |
| | | 226 | | } |
| | | 227 | | |
| | 2 | 228 | | return new PartitionProperties( |
| | 2 | 229 | | (string)responseData[AmqpManagement.ResponseMap.Name], |
| | 2 | 230 | | (string)responseData[AmqpManagement.ResponseMap.PartitionIdentifier], |
| | 2 | 231 | | (bool)responseData[AmqpManagement.ResponseMap.PartitionRuntimeInfoPartitionIsEmpty], |
| | 2 | 232 | | (long)responseData[AmqpManagement.ResponseMap.PartitionBeginSequenceNumber], |
| | 2 | 233 | | (long)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedSequenceNumber], |
| | 2 | 234 | | long.Parse((string)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedOffset], NumberStyles.In |
| | 2 | 235 | | new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedTimeUtc], Time |
| | | 236 | | } |
| | | 237 | | |
| | | 238 | | /// <summary> |
| | | 239 | | /// Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="EventData" /> |
| | | 240 | | /// optionally propagating the custom properties. |
| | | 241 | | /// </summary> |
| | | 242 | | /// |
| | | 243 | | /// <param name="source">The set of events to use as the body of the batch message.</param> |
| | | 244 | | /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci |
| | | 245 | | /// |
| | | 246 | | /// <returns>The batch <see cref="AmqpMessage" /> containing the source events.</returns> |
| | | 247 | | /// |
| | | 248 | | private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> source, |
| | | 249 | | string partitionKey) => |
| | 140 | 250 | | BuildAmqpBatchFromMessages( |
| | 230 | 251 | | source.Select(eventData => BuildAmqpMessageFromEvent(eventData, partitionKey)), |
| | 140 | 252 | | partitionKey); |
| | | 253 | | |
| | | 254 | | /// <summary> |
| | | 255 | | /// Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />. |
| | | 256 | | /// </summary> |
| | | 257 | | /// |
| | | 258 | | /// <param name="source">The set of messages to use as the body of the batch message.</param> |
| | | 259 | | /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci |
| | | 260 | | /// |
| | | 261 | | /// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns> |
| | | 262 | | /// |
| | | 263 | | private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable<AmqpMessage> source, |
| | | 264 | | string partitionKey) |
| | | 265 | | { |
| | | 266 | | AmqpMessage batchEnvelope; |
| | | 267 | | |
| | 162 | 268 | | var batchMessages = source.ToList(); |
| | | 269 | | |
| | 162 | 270 | | if (batchMessages.Count == 1) |
| | | 271 | | { |
| | 82 | 272 | | batchEnvelope = batchMessages[0]; |
| | | 273 | | } |
| | | 274 | | else |
| | | 275 | | { |
| | 80 | 276 | | batchEnvelope = AmqpMessage.Create(batchMessages.Select(message => |
| | 80 | 277 | | { |
| | 120 | 278 | | message.Batchable = true; |
| | 120 | 279 | | using var messageStream = message.ToStream(); |
| | 120 | 280 | | return new Data { Value = ReadStreamToArraySegment(messageStream) }; |
| | 120 | 281 | | })); |
| | | 282 | | |
| | 80 | 283 | | batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; |
| | | 284 | | } |
| | | 285 | | |
| | 162 | 286 | | if (!string.IsNullOrEmpty(partitionKey)) |
| | | 287 | | { |
| | 62 | 288 | | batchEnvelope.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey; |
| | | 289 | | } |
| | | 290 | | |
| | 162 | 291 | | batchEnvelope.Batchable = true; |
| | 162 | 292 | | return batchEnvelope; |
| | | 293 | | } |
| | | 294 | | |
| | | 295 | | /// <summary> |
| | | 296 | | /// Builds an <see cref="AmqpMessage" /> from an <see cref="EventData" />. |
| | | 297 | | /// </summary> |
| | | 298 | | /// |
| | | 299 | | /// <param name="source">The event to use as the source of the message.</param> |
| | | 300 | | /// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is speci |
| | | 301 | | /// |
| | | 302 | | /// <returns>The <see cref="AmqpMessage" /> constructed from the source event.</returns> |
| | | 303 | | /// |
| | | 304 | | private static AmqpMessage BuildAmqpMessageFromEvent(EventData source, |
| | | 305 | | string partitionKey) |
| | | 306 | | { |
| | 170 | 307 | | var body = new ArraySegment<byte>((source.Body.IsEmpty) ? Array.Empty<byte>() : source.Body.ToArray()); |
| | 170 | 308 | | var message = AmqpMessage.Create(new Data { Value = body }); |
| | | 309 | | |
| | 170 | 310 | | if (source.Properties?.Count > 0) |
| | | 311 | | { |
| | 34 | 312 | | message.ApplicationProperties ??= new ApplicationProperties(); |
| | | 313 | | |
| | 194 | 314 | | foreach (KeyValuePair<string, object> pair in source.Properties) |
| | | 315 | | { |
| | 64 | 316 | | if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue)) |
| | | 317 | | { |
| | 62 | 318 | | message.ApplicationProperties.Map[pair.Key] = amqpValue; |
| | | 319 | | } |
| | | 320 | | } |
| | | 321 | | } |
| | | 322 | | |
| | 168 | 323 | | if (!string.IsNullOrEmpty(partitionKey)) |
| | | 324 | | { |
| | 30 | 325 | | message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey; |
| | | 326 | | } |
| | | 327 | | |
| | 168 | 328 | | return message; |
| | | 329 | | } |
| | | 330 | | |
| | | 331 | | /// <summary> |
| | | 332 | | /// Builds an <see cref="EventData" /> from an <see cref="AmqpMessage" />. |
| | | 333 | | /// </summary> |
| | | 334 | | /// |
| | | 335 | | /// <param name="source">The message to use as the source of the event.</param> |
| | | 336 | | /// |
| | | 337 | | /// <returns>The <see cref="EventData" /> constructed from the source message.</returns> |
| | | 338 | | /// |
| | | 339 | | private static EventData BuildEventFromAmqpMessage(AmqpMessage source) |
| | | 340 | | { |
| | 38 | 341 | | ReadOnlyMemory<byte> body = (source.BodyType.HasFlag(SectionFlag.Data)) |
| | 38 | 342 | | ? ReadStreamToMemory(source.BodyStream) |
| | 38 | 343 | | : ReadOnlyMemory<byte>.Empty; |
| | | 344 | | |
| | 38 | 345 | | ParsedAnnotations systemAnnotations = ParseSystemAnnotations(source); |
| | | 346 | | |
| | | 347 | | // If there were application properties associated with the message, translate them |
| | | 348 | | // to the event. |
| | | 349 | | |
| | 38 | 350 | | var properties = new Dictionary<string, object>(); |
| | | 351 | | |
| | 38 | 352 | | if (source.Sections.HasFlag(SectionFlag.ApplicationProperties)) |
| | | 353 | | { |
| | 176 | 354 | | foreach (KeyValuePair<MapKey, object> pair in source.ApplicationProperties.Map) |
| | | 355 | | { |
| | 62 | 356 | | if (TryCreateEventPropertyForAmqpProperty(pair.Value, out object propertyValue)) |
| | | 357 | | { |
| | 60 | 358 | | properties[pair.Key.ToString()] = propertyValue; |
| | | 359 | | } |
| | | 360 | | } |
| | | 361 | | } |
| | | 362 | | |
| | 38 | 363 | | return new EventData( |
| | 38 | 364 | | eventBody: body, |
| | 38 | 365 | | properties: properties, |
| | 38 | 366 | | systemProperties: systemAnnotations.ServiceAnnotations, |
| | 38 | 367 | | sequenceNumber: systemAnnotations.SequenceNumber ?? long.MinValue, |
| | 38 | 368 | | offset: systemAnnotations.Offset ?? long.MinValue, |
| | 38 | 369 | | enqueuedTime: systemAnnotations.EnqueuedTime ?? default, |
| | 38 | 370 | | partitionKey: systemAnnotations.PartitionKey, |
| | 38 | 371 | | lastPartitionSequenceNumber: systemAnnotations.LastSequenceNumber, |
| | 38 | 372 | | lastPartitionOffset: systemAnnotations.LastOffset, |
| | 38 | 373 | | lastPartitionEnqueuedTime: systemAnnotations.LastEnqueuedTime, |
| | 38 | 374 | | lastPartitionPropertiesRetrievalTime: systemAnnotations.LastReceivedTime); |
| | | 375 | | } |
| | | 376 | | |
| | | 377 | | /// <summary> |
| | | 378 | | /// Parses the annotations set by the Event Hubs service on the <see cref="AmqpMessage"/> |
| | | 379 | | /// associated with an event, extracting them into a consumable form. |
| | | 380 | | /// </summary> |
| | | 381 | | /// |
| | | 382 | | /// <param name="source">The message to use as the source of the event.</param> |
| | | 383 | | /// |
| | | 384 | | /// <returns>The <see cref="ParsedAnnotations" /> parsed from the source message.</returns> |
| | | 385 | | /// |
| | | 386 | | private static ParsedAnnotations ParseSystemAnnotations(AmqpMessage source) |
| | | 387 | | { |
| | 38 | 388 | | var systemProperties = new ParsedAnnotations |
| | 38 | 389 | | { |
| | 38 | 390 | | ServiceAnnotations = new Dictionary<string, object>() |
| | 38 | 391 | | }; |
| | | 392 | | |
| | | 393 | | object amqpValue; |
| | | 394 | | object propertyValue; |
| | | 395 | | |
| | | 396 | | // Process the message annotations. |
| | | 397 | | |
| | 38 | 398 | | if (source.Sections.HasFlag(SectionFlag.MessageAnnotations)) |
| | | 399 | | { |
| | 12 | 400 | | Annotations annotations = source.MessageAnnotations.Map; |
| | 12 | 401 | | var processed = new HashSet<string>(); |
| | | 402 | | |
| | 12 | 403 | | if ((annotations.TryGetValue(AmqpProperty.EnqueuedTime, out amqpValue)) |
| | 12 | 404 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 405 | | { |
| | 8 | 406 | | systemProperties.EnqueuedTime = propertyValue switch |
| | 8 | 407 | | { |
| | 10 | 408 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| | 14 | 409 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| | 0 | 410 | | _ => (DateTimeOffset)propertyValue |
| | 8 | 411 | | }; |
| | | 412 | | |
| | 8 | 413 | | processed.Add(AmqpProperty.EnqueuedTime.ToString()); |
| | | 414 | | } |
| | | 415 | | |
| | 12 | 416 | | if ((annotations.TryGetValue(AmqpProperty.SequenceNumber, out amqpValue)) |
| | 12 | 417 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 418 | | { |
| | 4 | 419 | | systemProperties.SequenceNumber = (long)propertyValue; |
| | 4 | 420 | | processed.Add(AmqpProperty.SequenceNumber.ToString()); |
| | | 421 | | } |
| | | 422 | | |
| | 12 | 423 | | if ((annotations.TryGetValue(AmqpProperty.Offset, out amqpValue)) |
| | 12 | 424 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)) |
| | 12 | 425 | | && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var |
| | | 426 | | { |
| | 6 | 427 | | systemProperties.Offset = offset; |
| | 6 | 428 | | processed.Add(AmqpProperty.Offset.ToString()); |
| | | 429 | | } |
| | | 430 | | |
| | 12 | 431 | | if ((annotations.TryGetValue(AmqpProperty.PartitionKey, out amqpValue)) |
| | 12 | 432 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 433 | | { |
| | 4 | 434 | | systemProperties.PartitionKey = (string)propertyValue; |
| | 4 | 435 | | processed.Add(AmqpProperty.PartitionKey.ToString()); |
| | | 436 | | } |
| | | 437 | | |
| | | 438 | | string key; |
| | | 439 | | |
| | 76 | 440 | | foreach (KeyValuePair<MapKey, object> pair in annotations) |
| | | 441 | | { |
| | 26 | 442 | | key = pair.Key.ToString(); |
| | | 443 | | |
| | 26 | 444 | | if ((!processed.Contains(key)) |
| | 26 | 445 | | && (TryCreateEventPropertyForAmqpProperty(pair.Value, out propertyValue))) |
| | | 446 | | { |
| | 4 | 447 | | systemProperties.ServiceAnnotations.Add(key, propertyValue); |
| | 4 | 448 | | processed.Add(key); |
| | | 449 | | } |
| | | 450 | | } |
| | | 451 | | } |
| | | 452 | | |
| | | 453 | | // Process the delivery annotations. |
| | | 454 | | |
| | 38 | 455 | | if (source.Sections.HasFlag(SectionFlag.DeliveryAnnotations)) |
| | | 456 | | { |
| | 10 | 457 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedTimeUtc, out amqpValue |
| | 10 | 458 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 459 | | { |
| | 6 | 460 | | systemProperties.LastEnqueuedTime = propertyValue switch |
| | 6 | 461 | | { |
| | 8 | 462 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| | 10 | 463 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| | 0 | 464 | | _ => (DateTimeOffset)propertyValue |
| | 6 | 465 | | }; |
| | | 466 | | } |
| | | 467 | | |
| | 10 | 468 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedSequenceNumber, out am |
| | 10 | 469 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 470 | | { |
| | 2 | 471 | | systemProperties.LastSequenceNumber = (long)propertyValue; |
| | | 472 | | } |
| | | 473 | | |
| | 10 | 474 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedOffset, out amqpValue) |
| | 10 | 475 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)) |
| | 10 | 476 | | && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var |
| | | 477 | | { |
| | 2 | 478 | | systemProperties.LastOffset = offset; |
| | | 479 | | } |
| | | 480 | | |
| | 10 | 481 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.LastPartitionPropertiesRetrievalTimeUtc, ou |
| | 10 | 482 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | | 483 | | { |
| | 6 | 484 | | systemProperties.LastReceivedTime = propertyValue switch |
| | 6 | 485 | | { |
| | 8 | 486 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| | 10 | 487 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| | 0 | 488 | | _ => (DateTimeOffset)propertyValue |
| | 6 | 489 | | }; |
| | | 490 | | } |
| | | 491 | | } |
| | | 492 | | |
| | | 493 | | // Process the properties annotations |
| | | 494 | | |
| | 38 | 495 | | if (source.Sections.HasFlag(SectionFlag.Properties)) |
| | | 496 | | { |
| | 2 | 497 | | Properties properties = source.Properties; |
| | | 498 | | |
| | | 499 | | void conditionalAdd(string name, object value, bool condition) |
| | | 500 | | { |
| | 26 | 501 | | if (condition) |
| | | 502 | | { |
| | 2 | 503 | | systemProperties.ServiceAnnotations.Add(name, value); |
| | | 504 | | } |
| | 26 | 505 | | } |
| | | 506 | | |
| | 2 | 507 | | conditionalAdd(Properties.MessageIdName, properties.MessageId, properties.MessageId != null); |
| | 2 | 508 | | conditionalAdd(Properties.UserIdName, properties.UserId, properties.UserId.Array != null); |
| | 2 | 509 | | conditionalAdd(Properties.ToName, properties.To, properties.To != null); |
| | 2 | 510 | | conditionalAdd(Properties.SubjectName, properties.Subject, properties.Subject != null); |
| | 2 | 511 | | conditionalAdd(Properties.ReplyToName, properties.ReplyTo, properties.ReplyTo != null); |
| | 2 | 512 | | conditionalAdd(Properties.CorrelationIdName, properties.CorrelationId, properties.CorrelationId != null) |
| | 2 | 513 | | conditionalAdd(Properties.ContentTypeName, properties.ContentType, properties.ContentType.Value != null) |
| | 2 | 514 | | conditionalAdd(Properties.ContentEncodingName, properties.ContentEncoding, properties.ContentEncoding.Va |
| | 2 | 515 | | conditionalAdd(Properties.AbsoluteExpiryTimeName, properties.AbsoluteExpiryTime, properties.AbsoluteExpi |
| | 2 | 516 | | conditionalAdd(Properties.CreationTimeName, properties.CreationTime, properties.CreationTime != null); |
| | 2 | 517 | | conditionalAdd(Properties.GroupIdName, properties.GroupId, properties.GroupId != null); |
| | 2 | 518 | | conditionalAdd(Properties.GroupSequenceName, properties.GroupSequence, properties.GroupSequence != null) |
| | 2 | 519 | | conditionalAdd(Properties.ReplyToGroupIdName, properties.ReplyToGroupId, properties.ReplyToGroupId != nu |
| | | 520 | | } |
| | | 521 | | |
| | 38 | 522 | | return systemProperties; |
| | | 523 | | } |
| | | 524 | | |
| | | 525 | | /// <summary> |
| | | 526 | | /// Attempts to create an AMQP property value for a given event property. |
| | | 527 | | /// </summary> |
| | | 528 | | /// |
| | | 529 | | /// <param name="eventPropertyValue">The value of the event property to create an AMQP property value for.</para |
| | | 530 | | /// <param name="amqpPropertyValue">The AMQP property value that was created.</param> |
| | | 531 | | /// |
| | | 532 | | /// <returns><c>true</c> if an AMQP property value was able to be created; otherwise, <c>false</c>.</returns> |
| | | 533 | | /// |
| | | 534 | | private static bool TryCreateAmqpPropertyValueForEventProperty(object eventPropertyValue, |
| | | 535 | | out object amqpPropertyValue) |
| | | 536 | | { |
| | 64 | 537 | | amqpPropertyValue = null; |
| | | 538 | | |
| | 64 | 539 | | if (eventPropertyValue == null) |
| | | 540 | | { |
| | 0 | 541 | | return true; |
| | | 542 | | } |
| | | 543 | | |
| | 64 | 544 | | switch (GetTypeIdentifier(eventPropertyValue)) |
| | | 545 | | { |
| | | 546 | | case AmqpProperty.Type.Byte: |
| | | 547 | | case AmqpProperty.Type.SByte: |
| | | 548 | | case AmqpProperty.Type.Int16: |
| | | 549 | | case AmqpProperty.Type.Int32: |
| | | 550 | | case AmqpProperty.Type.Int64: |
| | | 551 | | case AmqpProperty.Type.UInt16: |
| | | 552 | | case AmqpProperty.Type.UInt32: |
| | | 553 | | case AmqpProperty.Type.UInt64: |
| | | 554 | | case AmqpProperty.Type.Single: |
| | | 555 | | case AmqpProperty.Type.Double: |
| | | 556 | | case AmqpProperty.Type.Boolean: |
| | | 557 | | case AmqpProperty.Type.Decimal: |
| | | 558 | | case AmqpProperty.Type.Char: |
| | | 559 | | case AmqpProperty.Type.Guid: |
| | | 560 | | case AmqpProperty.Type.DateTime: |
| | | 561 | | case AmqpProperty.Type.String: |
| | 40 | 562 | | amqpPropertyValue = eventPropertyValue; |
| | 40 | 563 | | break; |
| | | 564 | | |
| | | 565 | | case AmqpProperty.Type.Stream: |
| | 18 | 566 | | case AmqpProperty.Type.Unknown when eventPropertyValue is Stream: |
| | 16 | 567 | | amqpPropertyValue = ReadStreamToArraySegment((Stream)eventPropertyValue); |
| | 16 | 568 | | break; |
| | | 569 | | |
| | | 570 | | case AmqpProperty.Type.Uri: |
| | 2 | 571 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.Uri, ((Uri)eventPropertyValue).Absolut |
| | 2 | 572 | | break; |
| | | 573 | | |
| | | 574 | | case AmqpProperty.Type.DateTimeOffset: |
| | 2 | 575 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.DateTimeOffset, ((DateTimeOffset)event |
| | 2 | 576 | | break; |
| | | 577 | | |
| | | 578 | | case AmqpProperty.Type.TimeSpan: |
| | 2 | 579 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.TimeSpan, ((TimeSpan)eventPropertyValu |
| | 2 | 580 | | break; |
| | | 581 | | |
| | | 582 | | case AmqpProperty.Type.Unknown: |
| | 2 | 583 | | var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile |
| | 2 | 584 | | EventHubsEventSource.Log.UnexpectedException(exception.Message); |
| | 2 | 585 | | throw exception; |
| | | 586 | | } |
| | | 587 | | |
| | 62 | 588 | | return (amqpPropertyValue != null); |
| | | 589 | | } |
| | | 590 | | |
| | | 591 | | /// <summary> |
| | | 592 | | /// Attempts to create an event property value for a given AMQP property. |
| | | 593 | | /// </summary> |
| | | 594 | | /// |
| | | 595 | | /// <param name="amqpPropertyValue">The value of the AMQP property to create an event property value for.</param |
| | | 596 | | /// <param name="eventPropertyValue">The event property value that was created.</param> |
| | | 597 | | /// |
| | | 598 | | /// <returns><c>true</c> if an event property value was able to be created; otherwise, <c>false</c>.</returns> |
| | | 599 | | /// |
| | | 600 | | private static bool TryCreateEventPropertyForAmqpProperty(object amqpPropertyValue, |
| | | 601 | | out object eventPropertyValue) |
| | | 602 | | { |
| | 104 | 603 | | eventPropertyValue = null; |
| | | 604 | | |
| | 104 | 605 | | if (amqpPropertyValue == null) |
| | | 606 | | { |
| | 0 | 607 | | return true; |
| | | 608 | | } |
| | | 609 | | |
| | | 610 | | // If the property is a simple type, then use it directly. |
| | | 611 | | |
| | 104 | 612 | | switch (GetTypeIdentifier(amqpPropertyValue)) |
| | | 613 | | { |
| | | 614 | | case AmqpProperty.Type.Byte: |
| | | 615 | | case AmqpProperty.Type.SByte: |
| | | 616 | | case AmqpProperty.Type.Int16: |
| | | 617 | | case AmqpProperty.Type.Int32: |
| | | 618 | | case AmqpProperty.Type.Int64: |
| | | 619 | | case AmqpProperty.Type.UInt16: |
| | | 620 | | case AmqpProperty.Type.UInt32: |
| | | 621 | | case AmqpProperty.Type.UInt64: |
| | | 622 | | case AmqpProperty.Type.Single: |
| | | 623 | | case AmqpProperty.Type.Double: |
| | | 624 | | case AmqpProperty.Type.Boolean: |
| | | 625 | | case AmqpProperty.Type.Decimal: |
| | | 626 | | case AmqpProperty.Type.Char: |
| | | 627 | | case AmqpProperty.Type.Guid: |
| | | 628 | | case AmqpProperty.Type.DateTime: |
| | | 629 | | case AmqpProperty.Type.String: |
| | 90 | 630 | | eventPropertyValue = amqpPropertyValue; |
| | 90 | 631 | | return true; |
| | | 632 | | |
| | | 633 | | case AmqpProperty.Type.Unknown: |
| | | 634 | | // An explicitly unknown type will be considered for additional |
| | | 635 | | // scenarios below. |
| | | 636 | | break; |
| | | 637 | | |
| | | 638 | | default: |
| | 0 | 639 | | return false; |
| | | 640 | | } |
| | | 641 | | |
| | | 642 | | // Attempt to parse the value against other well-known value scenarios. |
| | | 643 | | |
| | 14 | 644 | | switch (amqpPropertyValue) |
| | | 645 | | { |
| | | 646 | | case AmqpSymbol symbol: |
| | 0 | 647 | | eventPropertyValue = symbol.Value; |
| | 0 | 648 | | break; |
| | | 649 | | |
| | | 650 | | case byte[] array: |
| | 2 | 651 | | eventPropertyValue = array; |
| | 2 | 652 | | break; |
| | | 653 | | |
| | 4 | 654 | | case ArraySegment<byte> segment when segment.Count == segment.Array.Length: |
| | 2 | 655 | | eventPropertyValue = segment.Array; |
| | 2 | 656 | | break; |
| | | 657 | | |
| | | 658 | | case ArraySegment<byte> segment: |
| | 2 | 659 | | var buffer = new byte[segment.Count]; |
| | 2 | 660 | | Buffer.BlockCopy(segment.Array, segment.Offset, buffer, 0, segment.Count); |
| | 2 | 661 | | eventPropertyValue = buffer; |
| | 2 | 662 | | break; |
| | | 663 | | |
| | 8 | 664 | | case DescribedType described when (described.Descriptor is AmqpSymbol): |
| | 8 | 665 | | eventPropertyValue = TranslateSymbol((AmqpSymbol)described.Descriptor, described.Value); |
| | 8 | 666 | | break; |
| | | 667 | | |
| | | 668 | | default: |
| | 0 | 669 | | var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile |
| | 0 | 670 | | EventHubsEventSource.Log.UnexpectedException(exception.Message); |
| | 0 | 671 | | throw exception; |
| | | 672 | | } |
| | | 673 | | |
| | 14 | 674 | | return (eventPropertyValue != null); |
| | | 675 | | } |
| | | 676 | | |
| | | 677 | | /// <summary> |
| | | 678 | | /// Gets the AMQP property type identifier for a given |
| | | 679 | | /// value. |
| | | 680 | | /// </summary> |
| | | 681 | | /// |
| | | 682 | | /// <param name="value">The value to determine the type identifier for.</param> |
| | | 683 | | /// |
| | | 684 | | /// <returns>The <see cref="AmqpProperty.Type"/> that was identified for the given <paramref name="value"/>.</re |
| | | 685 | | /// |
| | | 686 | | private static AmqpProperty.Type GetTypeIdentifier(object value) => |
| | 168 | 687 | | (value == null) |
| | 168 | 688 | | ? AmqpProperty.Type.Null |
| | 168 | 689 | | : value.GetType().ToAmqpPropertyType(); |
| | | 690 | | |
| | | 691 | | /// <summary> |
| | | 692 | | /// Translates the AMQP symbol into its corresponding typed value, if it belongs to the |
| | | 693 | | /// set of known types. |
| | | 694 | | /// </summary> |
| | | 695 | | /// |
| | | 696 | | /// <param name="symbol">The symbol to consider.</param> |
| | | 697 | | /// <param name="value">The value of the symbol to translate.</param> |
| | | 698 | | /// |
| | | 699 | | /// <returns>The typed value of the symbol, if it belongs to the well-known set; otherwise, <c>null</c>.</return |
| | | 700 | | /// |
| | | 701 | | private static object TranslateSymbol(AmqpSymbol symbol, |
| | | 702 | | object value) |
| | | 703 | | { |
| | 8 | 704 | | if (symbol.Equals(AmqpProperty.Descriptor.Uri)) |
| | | 705 | | { |
| | 2 | 706 | | return new Uri((string)value); |
| | | 707 | | } |
| | | 708 | | |
| | 6 | 709 | | if (symbol.Equals(AmqpProperty.Descriptor.TimeSpan)) |
| | | 710 | | { |
| | 2 | 711 | | return new TimeSpan((long)value); |
| | | 712 | | } |
| | | 713 | | |
| | 4 | 714 | | if (symbol.Equals(AmqpProperty.Descriptor.DateTimeOffset)) |
| | | 715 | | { |
| | 2 | 716 | | return new DateTimeOffset((long)value, TimeSpan.Zero); |
| | | 717 | | } |
| | | 718 | | |
| | 2 | 719 | | return null; |
| | | 720 | | } |
| | | 721 | | |
| | | 722 | | /// <summary> |
| | | 723 | | /// Converts a stream to an <see cref="ArraySegment{T}" /> representation. |
| | | 724 | | /// </summary> |
| | | 725 | | /// |
| | | 726 | | /// <param name="stream">The stream to read and capture in memory.</param> |
| | | 727 | | /// |
| | | 728 | | /// <returns>The <see cref="ArraySegment{T}" /> containing the stream data.</returns> |
| | | 729 | | /// |
| | | 730 | | private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream) |
| | | 731 | | { |
| | 56 | 732 | | if (stream == null) |
| | | 733 | | { |
| | 0 | 734 | | return new ArraySegment<byte>(); |
| | | 735 | | } |
| | | 736 | | |
| | 56 | 737 | | using var memStream = new MemoryStream(StreamBufferSizeInBytes); |
| | 56 | 738 | | stream.CopyTo(memStream, StreamBufferSizeInBytes); |
| | | 739 | | |
| | 56 | 740 | | return new ArraySegment<byte>(memStream.ToArray()); |
| | 56 | 741 | | } |
| | | 742 | | |
| | | 743 | | /// <summary> |
| | | 744 | | /// Converts a stream to a set of memory bytes. |
| | | 745 | | /// </summary> |
| | | 746 | | /// |
| | | 747 | | /// <param name="stream">The stream to read and capture in memory.</param> |
| | | 748 | | /// |
| | | 749 | | /// <returns>The set of memory bytes containing the stream data.</returns> |
| | | 750 | | /// |
| | | 751 | | private static ReadOnlyMemory<byte> ReadStreamToMemory(Stream stream) |
| | | 752 | | { |
| | 34 | 753 | | if (stream == null) |
| | | 754 | | { |
| | 0 | 755 | | return ReadOnlyMemory<byte>.Empty; |
| | | 756 | | } |
| | | 757 | | |
| | 34 | 758 | | using var memStream = new MemoryStream(StreamBufferSizeInBytes); |
| | 34 | 759 | | stream.CopyTo(memStream, StreamBufferSizeInBytes); |
| | | 760 | | |
| | 34 | 761 | | return new ReadOnlyMemory<byte>(memStream.ToArray()); |
| | 34 | 762 | | } |
| | | 763 | | |
| | | 764 | | /// <summary> |
| | | 765 | | /// The set of system annotations set on a message received from the |
| | | 766 | | /// Event Hubs service. |
| | | 767 | | /// </summary> |
| | | 768 | | /// |
| | | 769 | | private struct ParsedAnnotations |
| | | 770 | | { |
| | | 771 | | /// <summary>The set of weakly typed annotations associated with the message.</summary> |
| | | 772 | | public Dictionary<string, object> ServiceAnnotations; |
| | | 773 | | |
| | | 774 | | /// <summary>The sequence number of the event associated with the message.</summary> |
| | | 775 | | public long? SequenceNumber; |
| | | 776 | | |
| | | 777 | | /// <summary>The offset of the event associated with the message.</summary> |
| | | 778 | | public long? Offset; |
| | | 779 | | |
| | | 780 | | /// <summary>The date and time, in UTC, that the event associated with the message was enqueued.</summary> |
| | | 781 | | public DateTimeOffset? EnqueuedTime; |
| | | 782 | | |
| | | 783 | | /// <summary>The partition key that the event associated with the message was published with.</summary> |
| | | 784 | | public string PartitionKey; |
| | | 785 | | |
| | | 786 | | /// <summary>The sequence number of the event that was last enqueued in the partition.</summary> |
| | | 787 | | public long? LastSequenceNumber; |
| | | 788 | | |
| | | 789 | | /// <summary>The offset of the event that was last enqueued in the partition.</summary> |
| | | 790 | | public long? LastOffset; |
| | | 791 | | |
| | | 792 | | /// <summary>The date and time, in UTC, that an event was last enqueued in the partition.</summary> |
| | | 793 | | public DateTimeOffset? LastEnqueuedTime; |
| | | 794 | | |
| | | 795 | | /// <summary>The date and time, in UTC, that the last enqueued event information was retrieved from the serv |
| | | 796 | | public DateTimeOffset? LastReceivedTime; |
| | | 797 | | } |
| | | 798 | | } |
| | | 799 | | } |