| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Globalization; |
| | 7 | | using System.IO; |
| | 8 | | using System.Linq; |
| | 9 | | using System.Runtime.Serialization; |
| | 10 | | using Azure.Core; |
| | 11 | | using Azure.Messaging.EventHubs.Diagnostics; |
| | 12 | | using Microsoft.Azure.Amqp; |
| | 13 | | using Microsoft.Azure.Amqp.Encoding; |
| | 14 | | using Microsoft.Azure.Amqp.Framing; |
| | 15 | |
|
| | 16 | | namespace Azure.Messaging.EventHubs.Amqp |
| | 17 | | { |
| | 18 | | /// <summary> |
| | 19 | | /// Serves as a translator between client types and AMQP messages for |
| | 20 | | /// communication with the Event Hubs service. |
| | 21 | | /// </summary> |
| | 22 | | /// |
| | 23 | | internal class AmqpMessageConverter |
| | 24 | | { |
| | 25 | | /// <summary>The size, in bytes, to use as a buffer for stream operations.</summary> |
| | 26 | | private const int StreamBufferSizeInBytes = 512; |
| | 27 | |
|
/// <summary>
///   Translates a single <see cref="EventData" /> instance into the AMQP message
///   that represents it.
/// </summary>
///
/// <param name="source">The source event to convert; validated before conversion.</param>
/// <param name="partitionKey">The partition key to annotate the message with, if any; a <c>null</c> or empty key results in no annotation.</param>
///
/// <returns>The AMQP message that represents the converted event data.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the message once it has been created, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateMessageFromEvent(EventData source,
                                                  string partitionKey = null)
{
    Argument.AssertNotNull(source, nameof(source));

    AmqpMessage convertedMessage = BuildAmqpMessageFromEvent(source, partitionKey);
    return convertedMessage;
}
| | 49 | |
|
/// <summary>
///   Translates a set of <see cref="EventData" /> instances into a single AMQP
///   message representing them as a batch.
/// </summary>
///
/// <param name="source">The set of source events to convert; validated before conversion.</param>
/// <param name="partitionKey">The partition key to associate with the batch, if any.</param>
///
/// <returns>The AMQP message that represents a batch containing the converted set of event data.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the message once it has been created, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
                                                 string partitionKey = null)
{
    Argument.AssertNotNull(source, nameof(source));

    AmqpMessage batchMessage = BuildAmqpBatchFromEvents(source, partitionKey);
    return batchMessage;
}
| | 70 | |
|
/// <summary>
///   Combines a set of already-built <see cref="AmqpMessage" /> instances into a single
///   AMQP message representing them as a batch.
/// </summary>
///
/// <param name="source">The set of source messages to convert; validated before conversion.</param>
/// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is specified, the annotation is not made.</param>
///
/// <returns>The AMQP message that represents a batch containing the source messages.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the message once it has been created, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromMessages(IEnumerable<AmqpMessage> source,
                                                   string partitionKey = null)
{
    Argument.AssertNotNull(source, nameof(source));

    AmqpMessage batchMessage = BuildAmqpBatchFromMessages(source, partitionKey);
    return batchMessage;
}
| | 91 | |
|
/// <summary>
///   Translates an <see cref="AmqpMessage" /> into the <see cref="EventData" />
///   instance that it represents.
/// </summary>
///
/// <param name="source">The source message to convert; validated before conversion.</param>
///
/// <returns>The event that represents the converted AMQP message.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the specified message, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual EventData CreateEventFromMessage(AmqpMessage source)
{
    Argument.AssertNotNull(source, nameof(source));

    EventData convertedEvent = BuildEventFromAmqpMessage(source);
    return convertedEvent;
}
| | 111 | |
|
/// <summary>
///   Builds the <see cref="AmqpMessage" /> used to request the properties of an Event Hub.
/// </summary>
///
/// <param name="eventHubName">The name of the Event Hub to query the properties of.</param>
/// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operations.</param>
///
/// <returns>The AMQP message for issuing the request.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the message once it has been created, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateEventHubPropertiesRequest(string eventHubName,
                                                           string managementAuthorizationToken)
{
    Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
    Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken));

    var request = AmqpMessage.Create();

    // The request carries no body; the operation is described entirely by
    // the application properties of the message.

    var requestProperties = new ApplicationProperties();
    var map = requestProperties.Map;

    map[AmqpManagement.ResourceNameKey] = eventHubName;
    map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue;
    map[AmqpManagement.ResourceTypeKey] = AmqpManagement.EventHubResourceTypeValue;
    map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken;

    request.ApplicationProperties = requestProperties;
    return request;
}
| | 141 | |
|
| | 142 | | /// <summary> |
| | 143 | | /// Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub properties into the |
| | 144 | | /// corresponding <see cref="EventHubProperties" /> representation. |
| | 145 | | /// </summary> |
| | 146 | | /// |
| | 147 | | /// <param name="response">The response message to convert.</param> |
| | 148 | | /// |
| | 149 | | /// <returns>The set of properties represented by the response.</returns> |
| | 150 | | /// |
| | 151 | | /// <remarks> |
| | 152 | | /// The caller is assumed to hold ownership over the specified message, including |
| | 153 | | /// ensuring proper disposal. |
| | 154 | | /// </remarks> |
| | 155 | | /// |
| | 156 | | public virtual EventHubProperties CreateEventHubPropertiesFromResponse(AmqpMessage response) |
| | 157 | | { |
| 8 | 158 | | Argument.AssertNotNull(response, nameof(response)); |
| | 159 | |
|
| 6 | 160 | | if (!(response.ValueBody?.Value is AmqpMap responseData)) |
| | 161 | | { |
| 0 | 162 | | throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo |
| | 163 | | } |
| | 164 | |
|
| 2 | 165 | | return new EventHubProperties( |
| 2 | 166 | | (string)responseData[AmqpManagement.ResponseMap.Name], |
| 2 | 167 | | new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.CreatedAt], TimeSpan.Zero), |
| 2 | 168 | | (string[])responseData[AmqpManagement.ResponseMap.PartitionIdentifiers]); |
| | 169 | | } |
| | 170 | |
|
/// <summary>
///   Builds the <see cref="AmqpMessage" /> used to request the properties of an Event Hub partition.
/// </summary>
///
/// <param name="eventHubName">The name of the Event Hub to query the properties of.</param>
/// <param name="partitionIdentifier">The identifier of the partition to query the properties of.</param>
/// <param name="managementAuthorizationToken">The bearer token to use for authorization of management operations.</param>
///
/// <returns>The AMQP message for issuing the request.</returns>
///
/// <remarks>
///   The caller is assumed to hold ownership over the message once it has been created, including
///   ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreatePartitionPropertiesRequest(string eventHubName,
                                                            string partitionIdentifier,
                                                            string managementAuthorizationToken)
{
    Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
    Argument.AssertNotNullOrEmpty(partitionIdentifier, nameof(partitionIdentifier));
    Argument.AssertNotNullOrEmpty(managementAuthorizationToken, nameof(managementAuthorizationToken));

    var request = AmqpMessage.Create();

    // The request carries no body; the operation is described entirely by
    // the application properties of the message.

    var requestProperties = new ApplicationProperties();
    var map = requestProperties.Map;

    map[AmqpManagement.ResourceNameKey] = eventHubName;
    map[AmqpManagement.PartitionNameKey] = partitionIdentifier;
    map[AmqpManagement.OperationKey] = AmqpManagement.ReadOperationValue;
    map[AmqpManagement.ResourceTypeKey] = AmqpManagement.PartitionResourceTypeValue;
    map[AmqpManagement.SecurityTokenKey] = managementAuthorizationToken;

    request.ApplicationProperties = requestProperties;
    return request;
}
| | 204 | |
|
| | 205 | | /// <summary> |
| | 206 | | /// Converts a given <see cref="AmqpMessage" /> response to a query for Event Hub partition properties into |
| | 207 | | /// the corresponding <see cref="PartitionProperties" /> representation. |
| | 208 | | /// </summary> |
| | 209 | | /// |
| | 210 | | /// <param name="response">The response message to convert.</param> |
| | 211 | | /// |
| | 212 | | /// <returns>The set of properties represented by the response.</returns> |
| | 213 | | /// |
| | 214 | | /// <remarks> |
| | 215 | | /// The caller is assumed to hold ownership over the specified message, including |
| | 216 | | /// ensuring proper disposal. |
| | 217 | | /// </remarks> |
| | 218 | | /// |
| | 219 | | public virtual PartitionProperties CreatePartitionPropertiesFromResponse(AmqpMessage response) |
| | 220 | | { |
| 8 | 221 | | Argument.AssertNotNull(response, nameof(response)); |
| | 222 | |
|
| 6 | 223 | | if (!(response.ValueBody?.Value is AmqpMap responseData)) |
| | 224 | | { |
| 0 | 225 | | throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidMessageBo |
| | 226 | | } |
| | 227 | |
|
| 2 | 228 | | return new PartitionProperties( |
| 2 | 229 | | (string)responseData[AmqpManagement.ResponseMap.Name], |
| 2 | 230 | | (string)responseData[AmqpManagement.ResponseMap.PartitionIdentifier], |
| 2 | 231 | | (bool)responseData[AmqpManagement.ResponseMap.PartitionRuntimeInfoPartitionIsEmpty], |
| 2 | 232 | | (long)responseData[AmqpManagement.ResponseMap.PartitionBeginSequenceNumber], |
| 2 | 233 | | (long)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedSequenceNumber], |
| 2 | 234 | | long.Parse((string)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedOffset], NumberStyles.In |
| 2 | 235 | | new DateTimeOffset((DateTime)responseData[AmqpManagement.ResponseMap.PartitionLastEnqueuedTimeUtc], Time |
| | 236 | | } |
| | 237 | |
|
/// <summary>
///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="EventData" />,
///   converting each event into its own AMQP message before batching.
/// </summary>
///
/// <param name="source">The set of events to use as the body of the batch message.</param>
/// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is specified, the annotation is not made.</param>
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source events.</returns>
///
private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> source,
                                                    string partitionKey)
{
    // The projection is deferred; the individual messages are created when the
    // batch builder enumerates the sequence.

    IEnumerable<AmqpMessage> eventMessages =
        source.Select(eventData => BuildAmqpMessageFromEvent(eventData, partitionKey));

    return BuildAmqpBatchFromMessages(eventMessages, partitionKey);
}
| | 253 | |
|
/// <summary>
///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The set of messages to use as the body of the batch message.</param>
/// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is specified, the annotation is not made.</param>
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
///
/// <remarks>
///   When the set contains exactly one message, that message itself is returned as the
///   envelope rather than being wrapped.
/// </remarks>
///
private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable<AmqpMessage> source,
                                                      string partitionKey)
{
    // Serializes one message into a data section of the envelope, flagging it
    // as batchable first.

    Data ToBatchEntry(AmqpMessage message)
    {
        message.Batchable = true;

        using var serialized = message.ToStream();
        return new Data { Value = ReadStreamToArraySegment(serialized) };
    }

    // Materialize the set once so that the count can be inspected without
    // enumerating the source a second time.

    List<AmqpMessage> messages = source.ToList();
    AmqpMessage envelope;

    if (messages.Count != 1)
    {
        envelope = AmqpMessage.Create(messages.Select(message => ToBatchEntry(message)));
        envelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
    }
    else
    {
        envelope = messages[0];
    }

    if (!string.IsNullOrEmpty(partitionKey))
    {
        envelope.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
    }

    envelope.Batchable = true;
    return envelope;
}
| | 294 | |
|
/// <summary>
///   Builds an <see cref="AmqpMessage" /> from an <see cref="EventData" />, using the event
///   body as the message data and translating the event properties into application properties.
/// </summary>
///
/// <param name="source">The event to use as the source of the message.</param>
/// <param name="partitionKey">The partition key to annotate the AMQP message with; if no partition key is specified, the annotation is not made.</param>
///
/// <returns>The <see cref="AmqpMessage" /> constructed from the source event.</returns>
///
private static AmqpMessage BuildAmqpMessageFromEvent(EventData source,
                                                     string partitionKey)
{
    byte[] payload = source.Body.IsEmpty
        ? Array.Empty<byte>()
        : source.Body.ToArray();

    var message = AmqpMessage.Create(new Data { Value = new ArraySegment<byte>(payload) });
    var eventProperties = source.Properties;

    if ((eventProperties != null) && (eventProperties.Count > 0))
    {
        if (message.ApplicationProperties == null)
        {
            message.ApplicationProperties = new ApplicationProperties();
        }

        var applicationMap = message.ApplicationProperties.Map;

        // Only properties that translate to an AMQP value are copied; the
        // others are left off of the message.

        foreach (KeyValuePair<string, object> property in eventProperties)
        {
            if (TryCreateAmqpPropertyValueForEventProperty(property.Value, out var translatedValue))
            {
                applicationMap[property.Key] = translatedValue;
            }
        }
    }

    if (!string.IsNullOrEmpty(partitionKey))
    {
        message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
    }

    return message;
}
| | 330 | |
|
/// <summary>
///   Builds an <see cref="EventData" /> from an <see cref="AmqpMessage" />, reading the message
///   body, the service-set annotations, and any application properties.
/// </summary>
///
/// <param name="source">The message to use as the source of the event.</param>
///
/// <returns>The <see cref="EventData" /> constructed from the source message.</returns>
///
private static EventData BuildEventFromAmqpMessage(AmqpMessage source)
{
    // Only a data body is read; any other body type yields an empty event body.

    ReadOnlyMemory<byte> eventBody = ReadOnlyMemory<byte>.Empty;

    if (source.BodyType.HasFlag(SectionFlag.Data))
    {
        eventBody = ReadStreamToMemory(source.BodyStream);
    }

    ParsedAnnotations annotations = ParseSystemAnnotations(source);

    // Translate the application properties of the message, if present, keeping
    // only those for which an event property value could be created.

    var eventProperties = new Dictionary<string, object>();

    if (source.Sections.HasFlag(SectionFlag.ApplicationProperties))
    {
        foreach (KeyValuePair<MapKey, object> entry in source.ApplicationProperties.Map)
        {
            if (TryCreateEventPropertyForAmqpProperty(entry.Value, out object translatedValue))
            {
                eventProperties[entry.Key.ToString()] = translatedValue;
            }
        }
    }

    // Sequence number and offset fall back to long.MinValue, and the enqueued
    // time to its default, when the message carried no such annotation.

    return new EventData(
        eventBody: eventBody,
        properties: eventProperties,
        systemProperties: annotations.ServiceAnnotations,
        sequenceNumber: annotations.SequenceNumber ?? long.MinValue,
        offset: annotations.Offset ?? long.MinValue,
        enqueuedTime: annotations.EnqueuedTime ?? default,
        partitionKey: annotations.PartitionKey,
        lastPartitionSequenceNumber: annotations.LastSequenceNumber,
        lastPartitionOffset: annotations.LastOffset,
        lastPartitionEnqueuedTime: annotations.LastEnqueuedTime,
        lastPartitionPropertiesRetrievalTime: annotations.LastReceivedTime);
}
| | 376 | |
|
| | 377 | | /// <summary> |
| | 378 | | /// Parses the annotations set by the Event Hubs service on the <see cref="AmqpMessage"/> |
| | 379 | | /// associated with an event, extracting them into a consumable form. |
| | 380 | | /// </summary> |
| | 381 | | /// |
| | 382 | | /// <param name="source">The message to use as the source of the event.</param> |
| | 383 | | /// |
| | 384 | | /// <returns>The <see cref="ParsedAnnotations" /> parsed from the source message.</returns> |
| | 385 | | /// |
| | 386 | | private static ParsedAnnotations ParseSystemAnnotations(AmqpMessage source) |
| | 387 | | { |
| 38 | 388 | | var systemProperties = new ParsedAnnotations |
| 38 | 389 | | { |
| 38 | 390 | | ServiceAnnotations = new Dictionary<string, object>() |
| 38 | 391 | | }; |
| | 392 | |
|
| | 393 | | object amqpValue; |
| | 394 | | object propertyValue; |
| | 395 | |
|
| | 396 | | // Process the message annotations. |
| | 397 | |
|
| 38 | 398 | | if (source.Sections.HasFlag(SectionFlag.MessageAnnotations)) |
| | 399 | | { |
| 12 | 400 | | Annotations annotations = source.MessageAnnotations.Map; |
| 12 | 401 | | var processed = new HashSet<string>(); |
| | 402 | |
|
| 12 | 403 | | if ((annotations.TryGetValue(AmqpProperty.EnqueuedTime, out amqpValue)) |
| 12 | 404 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 405 | | { |
| 8 | 406 | | systemProperties.EnqueuedTime = propertyValue switch |
| 8 | 407 | | { |
| 10 | 408 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| 14 | 409 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| 0 | 410 | | _ => (DateTimeOffset)propertyValue |
| 8 | 411 | | }; |
| | 412 | |
|
| 8 | 413 | | processed.Add(AmqpProperty.EnqueuedTime.ToString()); |
| | 414 | | } |
| | 415 | |
|
| 12 | 416 | | if ((annotations.TryGetValue(AmqpProperty.SequenceNumber, out amqpValue)) |
| 12 | 417 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 418 | | { |
| 4 | 419 | | systemProperties.SequenceNumber = (long)propertyValue; |
| 4 | 420 | | processed.Add(AmqpProperty.SequenceNumber.ToString()); |
| | 421 | | } |
| | 422 | |
|
| 12 | 423 | | if ((annotations.TryGetValue(AmqpProperty.Offset, out amqpValue)) |
| 12 | 424 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)) |
| 12 | 425 | | && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var |
| | 426 | | { |
| 6 | 427 | | systemProperties.Offset = offset; |
| 6 | 428 | | processed.Add(AmqpProperty.Offset.ToString()); |
| | 429 | | } |
| | 430 | |
|
| 12 | 431 | | if ((annotations.TryGetValue(AmqpProperty.PartitionKey, out amqpValue)) |
| 12 | 432 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 433 | | { |
| 4 | 434 | | systemProperties.PartitionKey = (string)propertyValue; |
| 4 | 435 | | processed.Add(AmqpProperty.PartitionKey.ToString()); |
| | 436 | | } |
| | 437 | |
|
| | 438 | | string key; |
| | 439 | |
|
| 76 | 440 | | foreach (KeyValuePair<MapKey, object> pair in annotations) |
| | 441 | | { |
| 26 | 442 | | key = pair.Key.ToString(); |
| | 443 | |
|
| 26 | 444 | | if ((!processed.Contains(key)) |
| 26 | 445 | | && (TryCreateEventPropertyForAmqpProperty(pair.Value, out propertyValue))) |
| | 446 | | { |
| 4 | 447 | | systemProperties.ServiceAnnotations.Add(key, propertyValue); |
| 4 | 448 | | processed.Add(key); |
| | 449 | | } |
| | 450 | | } |
| | 451 | | } |
| | 452 | |
|
| | 453 | | // Process the delivery annotations. |
| | 454 | |
|
| 38 | 455 | | if (source.Sections.HasFlag(SectionFlag.DeliveryAnnotations)) |
| | 456 | | { |
| 10 | 457 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedTimeUtc, out amqpValue |
| 10 | 458 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 459 | | { |
| 6 | 460 | | systemProperties.LastEnqueuedTime = propertyValue switch |
| 6 | 461 | | { |
| 8 | 462 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| 10 | 463 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| 0 | 464 | | _ => (DateTimeOffset)propertyValue |
| 6 | 465 | | }; |
| | 466 | | } |
| | 467 | |
|
| 10 | 468 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedSequenceNumber, out am |
| 10 | 469 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 470 | | { |
| 2 | 471 | | systemProperties.LastSequenceNumber = (long)propertyValue; |
| | 472 | | } |
| | 473 | |
|
| 10 | 474 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedOffset, out amqpValue) |
| 10 | 475 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)) |
| 10 | 476 | | && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var |
| | 477 | | { |
| 2 | 478 | | systemProperties.LastOffset = offset; |
| | 479 | | } |
| | 480 | |
|
| 10 | 481 | | if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.LastPartitionPropertiesRetrievalTimeUtc, ou |
| 10 | 482 | | && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))) |
| | 483 | | { |
| 6 | 484 | | systemProperties.LastReceivedTime = propertyValue switch |
| 6 | 485 | | { |
| 8 | 486 | | DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero), |
| 10 | 487 | | long longValue => new DateTimeOffset(longValue, TimeSpan.Zero), |
| 0 | 488 | | _ => (DateTimeOffset)propertyValue |
| 6 | 489 | | }; |
| | 490 | | } |
| | 491 | | } |
| | 492 | |
|
| | 493 | | // Process the properties annotations |
| | 494 | |
|
| 38 | 495 | | if (source.Sections.HasFlag(SectionFlag.Properties)) |
| | 496 | | { |
| 2 | 497 | | Properties properties = source.Properties; |
| | 498 | |
|
| | 499 | | void conditionalAdd(string name, object value, bool condition) |
| | 500 | | { |
| 26 | 501 | | if (condition) |
| | 502 | | { |
| 2 | 503 | | systemProperties.ServiceAnnotations.Add(name, value); |
| | 504 | | } |
| 26 | 505 | | } |
| | 506 | |
|
| 2 | 507 | | conditionalAdd(Properties.MessageIdName, properties.MessageId, properties.MessageId != null); |
| 2 | 508 | | conditionalAdd(Properties.UserIdName, properties.UserId, properties.UserId.Array != null); |
| 2 | 509 | | conditionalAdd(Properties.ToName, properties.To, properties.To != null); |
| 2 | 510 | | conditionalAdd(Properties.SubjectName, properties.Subject, properties.Subject != null); |
| 2 | 511 | | conditionalAdd(Properties.ReplyToName, properties.ReplyTo, properties.ReplyTo != null); |
| 2 | 512 | | conditionalAdd(Properties.CorrelationIdName, properties.CorrelationId, properties.CorrelationId != null) |
| 2 | 513 | | conditionalAdd(Properties.ContentTypeName, properties.ContentType, properties.ContentType.Value != null) |
| 2 | 514 | | conditionalAdd(Properties.ContentEncodingName, properties.ContentEncoding, properties.ContentEncoding.Va |
| 2 | 515 | | conditionalAdd(Properties.AbsoluteExpiryTimeName, properties.AbsoluteExpiryTime, properties.AbsoluteExpi |
| 2 | 516 | | conditionalAdd(Properties.CreationTimeName, properties.CreationTime, properties.CreationTime != null); |
| 2 | 517 | | conditionalAdd(Properties.GroupIdName, properties.GroupId, properties.GroupId != null); |
| 2 | 518 | | conditionalAdd(Properties.GroupSequenceName, properties.GroupSequence, properties.GroupSequence != null) |
| 2 | 519 | | conditionalAdd(Properties.ReplyToGroupIdName, properties.ReplyToGroupId, properties.ReplyToGroupId != nu |
| | 520 | | } |
| | 521 | |
|
| 38 | 522 | | return systemProperties; |
| | 523 | | } |
| | 524 | |
|
| | 525 | | /// <summary> |
| | 526 | | /// Attempts to create an AMQP property value for a given event property. |
| | 527 | | /// </summary> |
| | 528 | | /// |
| | 529 | | /// <param name="eventPropertyValue">The value of the event property to create an AMQP property value for.</param> |
| | 530 | | /// <param name="amqpPropertyValue">The AMQP property value that was created.</param> |
| | 531 | | /// |
| | 532 | | /// <returns><c>true</c> if an AMQP property value was able to be created; otherwise, <c>false</c>.</returns> |
| | 533 | | /// |
| | 534 | | private static bool TryCreateAmqpPropertyValueForEventProperty(object eventPropertyValue, |
| | 535 | | out object amqpPropertyValue) |
| | 536 | | { |
| 64 | 537 | | amqpPropertyValue = null; |
| | 538 | |
|
| 64 | 539 | | if (eventPropertyValue == null) |
| | 540 | | { |
| 0 | 541 | | return true; |
| | 542 | | } |
| | 543 | |
|
| 64 | 544 | | switch (GetTypeIdentifier(eventPropertyValue)) |
| | 545 | | { |
| | 546 | | case AmqpProperty.Type.Byte: |
| | 547 | | case AmqpProperty.Type.SByte: |
| | 548 | | case AmqpProperty.Type.Int16: |
| | 549 | | case AmqpProperty.Type.Int32: |
| | 550 | | case AmqpProperty.Type.Int64: |
| | 551 | | case AmqpProperty.Type.UInt16: |
| | 552 | | case AmqpProperty.Type.UInt32: |
| | 553 | | case AmqpProperty.Type.UInt64: |
| | 554 | | case AmqpProperty.Type.Single: |
| | 555 | | case AmqpProperty.Type.Double: |
| | 556 | | case AmqpProperty.Type.Boolean: |
| | 557 | | case AmqpProperty.Type.Decimal: |
| | 558 | | case AmqpProperty.Type.Char: |
| | 559 | | case AmqpProperty.Type.Guid: |
| | 560 | | case AmqpProperty.Type.DateTime: |
| | 561 | | case AmqpProperty.Type.String: |
| 40 | 562 | | amqpPropertyValue = eventPropertyValue; |
| 40 | 563 | | break; |
| | 564 | |
|
| | 565 | | case AmqpProperty.Type.Stream: |
| 18 | 566 | | case AmqpProperty.Type.Unknown when eventPropertyValue is Stream: |
| 16 | 567 | | amqpPropertyValue = ReadStreamToArraySegment((Stream)eventPropertyValue); |
| 16 | 568 | | break; |
| | 569 | |
|
| | 570 | | case AmqpProperty.Type.Uri: |
| 2 | 571 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.Uri, ((Uri)eventPropertyValue).Absolut |
| 2 | 572 | | break; |
| | 573 | |
|
| | 574 | | case AmqpProperty.Type.DateTimeOffset: |
| 2 | 575 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.DateTimeOffset, ((DateTimeOffset)event |
| 2 | 576 | | break; |
| | 577 | |
|
| | 578 | | case AmqpProperty.Type.TimeSpan: |
| 2 | 579 | | amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.TimeSpan, ((TimeSpan)eventPropertyValu |
| 2 | 580 | | break; |
| | 581 | |
|
| | 582 | | case AmqpProperty.Type.Unknown: |
| 2 | 583 | | var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile |
| 2 | 584 | | EventHubsEventSource.Log.UnexpectedException(exception.Message); |
| 2 | 585 | | throw exception; |
| | 586 | | } |
| | 587 | |
|
| 62 | 588 | | return (amqpPropertyValue != null); |
| | 589 | | } |
| | 590 | |
|
| | 591 | | /// <summary> |
| | 592 | | /// Attempts to create an event property value for a given AMQP property. |
| | 593 | | /// </summary> |
| | 594 | | /// |
| | 595 | | /// <param name="amqpPropertyValue">The value of the AMQP property to create an event property value for.</param> |
| | 596 | | /// <param name="eventPropertyValue">The event property value that was created.</param> |
| | 597 | | /// |
| | 598 | | /// <returns><c>true</c> if an event property value was able to be created; otherwise, <c>false</c>.</returns> |
| | 599 | | /// |
| | 600 | | private static bool TryCreateEventPropertyForAmqpProperty(object amqpPropertyValue, |
| | 601 | | out object eventPropertyValue) |
| | 602 | | { |
| 104 | 603 | | eventPropertyValue = null; |
| | 604 | |
|
| 104 | 605 | | if (amqpPropertyValue == null) |
| | 606 | | { |
| 0 | 607 | | return true; |
| | 608 | | } |
| | 609 | |
|
| | 610 | | // If the property is a simple type, then use it directly. |
| | 611 | |
|
| 104 | 612 | | switch (GetTypeIdentifier(amqpPropertyValue)) |
| | 613 | | { |
| | 614 | | case AmqpProperty.Type.Byte: |
| | 615 | | case AmqpProperty.Type.SByte: |
| | 616 | | case AmqpProperty.Type.Int16: |
| | 617 | | case AmqpProperty.Type.Int32: |
| | 618 | | case AmqpProperty.Type.Int64: |
| | 619 | | case AmqpProperty.Type.UInt16: |
| | 620 | | case AmqpProperty.Type.UInt32: |
| | 621 | | case AmqpProperty.Type.UInt64: |
| | 622 | | case AmqpProperty.Type.Single: |
| | 623 | | case AmqpProperty.Type.Double: |
| | 624 | | case AmqpProperty.Type.Boolean: |
| | 625 | | case AmqpProperty.Type.Decimal: |
| | 626 | | case AmqpProperty.Type.Char: |
| | 627 | | case AmqpProperty.Type.Guid: |
| | 628 | | case AmqpProperty.Type.DateTime: |
| | 629 | | case AmqpProperty.Type.String: |
| 90 | 630 | | eventPropertyValue = amqpPropertyValue; |
| 90 | 631 | | return true; |
| | 632 | |
|
| | 633 | | case AmqpProperty.Type.Unknown: |
| | 634 | | // An explicitly unknown type will be considered for additional |
| | 635 | | // scenarios below. |
| | 636 | | break; |
| | 637 | |
|
| | 638 | | default: |
| 0 | 639 | | return false; |
| | 640 | | } |
| | 641 | |
|
| | 642 | | // Attempt to parse the value against other well-known value scenarios. |
| | 643 | |
|
| 14 | 644 | | switch (amqpPropertyValue) |
| | 645 | | { |
| | 646 | | case AmqpSymbol symbol: |
| 0 | 647 | | eventPropertyValue = symbol.Value; |
| 0 | 648 | | break; |
| | 649 | |
|
| | 650 | | case byte[] array: |
| 2 | 651 | | eventPropertyValue = array; |
| 2 | 652 | | break; |
| | 653 | |
|
| 4 | 654 | | case ArraySegment<byte> segment when segment.Count == segment.Array.Length: |
| 2 | 655 | | eventPropertyValue = segment.Array; |
| 2 | 656 | | break; |
| | 657 | |
|
| | 658 | | case ArraySegment<byte> segment: |
| 2 | 659 | | var buffer = new byte[segment.Count]; |
| 2 | 660 | | Buffer.BlockCopy(segment.Array, segment.Offset, buffer, 0, segment.Count); |
| 2 | 661 | | eventPropertyValue = buffer; |
| 2 | 662 | | break; |
| | 663 | |
|
| 8 | 664 | | case DescribedType described when (described.Descriptor is AmqpSymbol): |
| 8 | 665 | | eventPropertyValue = TranslateSymbol((AmqpSymbol)described.Descriptor, described.Value); |
| 8 | 666 | | break; |
| | 667 | |
|
| | 668 | | default: |
| 0 | 669 | | var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.Faile |
| 0 | 670 | | EventHubsEventSource.Log.UnexpectedException(exception.Message); |
| 0 | 671 | | throw exception; |
| | 672 | | } |
| | 673 | |
|
| 14 | 674 | | return (eventPropertyValue != null); |
| | 675 | | } |
| | 676 | |
|
| | 677 | | /// <summary> |
| | 678 | | /// Gets the AMQP property type identifier for a given |
| | 679 | | /// value, resolved from the value's runtime type. |
| | 680 | | /// </summary> |
| | 681 | | /// |
| | 682 | | /// <param name="value">The value to determine the type identifier for; may be <c>null</c>.</param> |
| | 683 | | /// |
| | 684 | | /// <returns>The <see cref="AmqpProperty.Type"/> that was identified for the given <paramref name="value"/>; the <c>Null</c> identifier when the value is <c>null</c>.</returns> |
| | 685 | | /// |
| | 686 | | private static AmqpProperty.Type GetTypeIdentifier(object value) => |
| 168 | 687 | | (value == null) |
| 168 | 688 | | ? AmqpProperty.Type.Null |
| 168 | 689 | | : value.GetType().ToAmqpPropertyType(); |
| | 690 | |
|
| | 691 | | /// <summary> |
| | 692 | | /// Translates the AMQP symbol into its corresponding typed value, if it belongs to the |
| | 693 | | /// set of known types: the URI, time span, and date/time offset descriptors. |
| | 694 | | /// </summary> |
| | 695 | | /// |
| | 696 | | /// <param name="symbol">The descriptor symbol to consider; it is compared against the known descriptors in order.</param> |
| | 697 | | /// <param name="value">The value of the symbol to translate; cast to <see cref="string" /> for a URI and to <see cref="long" /> (a tick count) for a time span or date/time offset, the latter created with a zero offset.</param> |
| | 698 | | /// |
| | 699 | | /// <returns>The typed value of the symbol, if it belongs to the well-known set; otherwise, <c>null</c>.</returns> |
| | 700 | | /// |
| | 701 | | private static object TranslateSymbol(AmqpSymbol symbol, |
| | 702 | | object value) |
| | 703 | | { |
| 8 | 704 | | if (symbol.Equals(AmqpProperty.Descriptor.Uri)) |
| | 705 | | { |
| 2 | 706 | | return new Uri((string)value); |
| | 707 | | } |
| | 708 | |
|
| 6 | 709 | | if (symbol.Equals(AmqpProperty.Descriptor.TimeSpan)) |
| | 710 | | { |
| 2 | 711 | | return new TimeSpan((long)value); |
| | 712 | | } |
| | 713 | |
|
| 4 | 714 | | if (symbol.Equals(AmqpProperty.Descriptor.DateTimeOffset)) |
| | 715 | | { |
| 2 | 716 | | return new DateTimeOffset((long)value, TimeSpan.Zero); |
| | 717 | | } |
| | 718 | |
|
| 2 | 719 | | return null; |
| | 720 | | } |
| | 721 | |
|
| | 722 | | /// <summary> |
| | 723 | | /// Converts a stream to an <see cref="ArraySegment{T}" /> representation by copying it into a temporary <see cref="MemoryStream" /> and wrapping a new array of the copied bytes. |
| | 724 | | /// </summary> |
| | 725 | | /// |
| | 726 | | /// <param name="stream">The stream to read and capture in memory; may be <c>null</c>. It is not disposed by this method.</param> |
| | 727 | | /// |
| | 728 | | /// <returns>The <see cref="ArraySegment{T}" /> containing the stream data; a default, empty segment when <paramref name="stream" /> is <c>null</c>.</returns> |
| | 729 | | /// |
| | 730 | | private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream) |
| | 731 | | { |
| 56 | 732 | | if (stream == null) |
| | 733 | | { |
| 0 | 734 | | return new ArraySegment<byte>(); |
| | 735 | | } |
| | 736 | |
|
| 56 | 737 | | using var memStream = new MemoryStream(StreamBufferSizeInBytes); |
| 56 | 738 | | stream.CopyTo(memStream, StreamBufferSizeInBytes); |
| | 739 | |
|
| 56 | 740 | | return new ArraySegment<byte>(memStream.ToArray()); |
| 56 | 741 | | } |
| | 742 | |
|
| | 743 | | /// <summary> |
| | 744 | | /// Converts a stream to a set of memory bytes by copying it into a temporary <see cref="MemoryStream" /> and wrapping a new array of the copied bytes. |
| | 745 | | /// </summary> |
| | 746 | | /// |
| | 747 | | /// <param name="stream">The stream to read and capture in memory; may be <c>null</c>. It is not disposed by this method.</param> |
| | 748 | | /// |
| | 749 | | /// <returns>The set of memory bytes containing the stream data; empty when <paramref name="stream" /> is <c>null</c>.</returns> |
| | 750 | | /// |
| | 751 | | private static ReadOnlyMemory<byte> ReadStreamToMemory(Stream stream) |
| | 752 | | { |
| 34 | 753 | | if (stream == null) |
| | 754 | | { |
| 0 | 755 | | return ReadOnlyMemory<byte>.Empty; |
| | 756 | | } |
| | 757 | |
|
| 34 | 758 | | using var memStream = new MemoryStream(StreamBufferSizeInBytes); |
| 34 | 759 | | stream.CopyTo(memStream, StreamBufferSizeInBytes); |
| | 760 | |
|
| 34 | 761 | | return new ReadOnlyMemory<byte>(memStream.ToArray()); |
| 34 | 762 | | } |
| | 763 | |
|
| | 764 | | /// <summary> |
| | 765 | | /// The set of system annotations present on a message received from the |
| | 766 | | /// Event Hubs service. |
| | 767 | | /// </summary> |
| | 768 | | /// |
| | 769 | | private struct ParsedAnnotations |
| | 770 | | { |
| | 771 | | /// <summary>The set of weakly typed annotations associated with the message.</summary> |
| | 772 | | public Dictionary<string, object> ServiceAnnotations; |
| | 773 | |
|
| | 774 | | /// <summary>The sequence number of the event associated with the message; <c>null</c> if not set.</summary> |
| | 775 | | public long? SequenceNumber; |
| | 776 | |
|
| | 777 | | /// <summary>The offset of the event associated with the message; <c>null</c> if not set.</summary> |
| | 778 | | public long? Offset; |
| | 779 | |
|
| | 780 | | /// <summary>The date and time, in UTC, that the event associated with the message was enqueued; <c>null</c> if not set.</summary> |
| | 781 | | public DateTimeOffset? EnqueuedTime; |
| | 782 | |
|
| | 783 | | /// <summary>The partition key that the event associated with the message was published with.</summary> |
| | 784 | | public string PartitionKey; |
| | 785 | |
|
| | 786 | | /// <summary>The sequence number of the event that was last enqueued in the partition; <c>null</c> if not set.</summary> |
| | 787 | | public long? LastSequenceNumber; |
| | 788 | |
|
| | 789 | | /// <summary>The offset of the event that was last enqueued in the partition; <c>null</c> if not set.</summary> |
| | 790 | | public long? LastOffset; |
| | 791 | |
|
| | 792 | | /// <summary>The date and time, in UTC, that an event was last enqueued in the partition; <c>null</c> if not set.</summary> |
| | 793 | | public DateTimeOffset? LastEnqueuedTime; |
| | 794 | |
|
| | 795 | | /// <summary>The date and time, in UTC, that the last enqueued event information was retrieved from the service; <c>null</c> if not set.</summary> |
| | 796 | | public DateTimeOffset? LastReceivedTime; |
| | 797 | | } |
| | 798 | | } |
| | 799 | | } |