| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Linq; |
| | 7 | | using Azure.Core; |
| | 8 | | using Microsoft.Azure.Amqp; |
| | 9 | | using Microsoft.Azure.Amqp.Framing; |
| | 10 | |
|
| | 11 | | namespace Azure.Messaging.ServiceBus.Amqp |
| | 12 | | { |
| | 13 | | internal static class AmqpMessageExtensions |
| | 14 | | { |
| | 15 | | public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) => |
| 74 | 16 | | AmqpMessage.Create(new Data { Value = new ArraySegment<byte>(message.Body.Bytes.IsEmpty ? Array.Empty<byte>( |
| | 17 | |
|
| | 18 | | private static byte[] GetByteArray(this Data data) |
| | 19 | | { |
| 4 | 20 | | switch (data.Value) |
| | 21 | | { |
| | 22 | | case byte[] byteArray: |
| 2 | 23 | | return byteArray; |
| 2 | 24 | | case ArraySegment<byte> arraySegment when arraySegment.Count == arraySegment.Array?.Length: |
| 2 | 25 | | return arraySegment.Array; |
| | 26 | | case ArraySegment<byte> arraySegment: |
| 0 | 27 | | var bytes = new byte[arraySegment.Count]; |
| 0 | 28 | | Array.ConstrainedCopy( |
| 0 | 29 | | sourceArray: arraySegment.Array, |
| 0 | 30 | | sourceIndex: arraySegment.Offset, |
| 0 | 31 | | destinationArray: bytes, |
| 0 | 32 | | destinationIndex: 0, |
| 0 | 33 | | length: arraySegment.Count); |
| 0 | 34 | | return bytes; |
| | 35 | | default: |
| 0 | 36 | | return null; |
| | 37 | | } |
| | 38 | | } |
| | 39 | |
|
| | 40 | | private static IEnumerable<byte[]> GetDataViaDataBody(AmqpMessage message) |
| | 41 | | { |
| 16 | 42 | | foreach (Data data in (message.DataBody ?? Enumerable.Empty<Data>())) |
| | 43 | | { |
| 4 | 44 | | byte[] bytes = data.GetByteArray(); |
| 4 | 45 | | if (bytes != null) |
| | 46 | | { |
| 4 | 47 | | yield return bytes; |
| | 48 | | } |
| | 49 | | } |
| 4 | 50 | | } |
| | 51 | |
|
| | 52 | | // Returns via the out parameter the flattened collection of bytes. |
| | 53 | | // A majority of the time, data will only contain 1 element. |
| | 54 | | // The method is optimized for this situation to return the pre-existing array. |
| | 55 | | private static byte[] ConvertAndFlattenData(IEnumerable<byte[]> data) |
| | 56 | | { |
| 4 | 57 | | byte[] flattened = null; |
| 4 | 58 | | List<byte> flattenedList = null; |
| 4 | 59 | | var dataCount = 0; |
| 16 | 60 | | foreach (byte[] byteArray in data) |
| | 61 | | { |
| | 62 | | // Only the first array is needed if it is the only valid array. |
| | 63 | | // This should be the case 99% of the time. |
| 4 | 64 | | if (dataCount == 0) |
| | 65 | | { |
| 4 | 66 | | flattened = byteArray; |
| | 67 | | } |
| | 68 | | else |
| | 69 | | { |
| | 70 | | // We defer creating this list since this case will rarely happen. |
| 0 | 71 | | flattenedList ??= new List<byte>(flattened!); |
| 0 | 72 | | flattenedList.AddRange(byteArray); |
| | 73 | | } |
| | 74 | |
|
| 4 | 75 | | dataCount++; |
| | 76 | | } |
| | 77 | |
|
| 4 | 78 | | if (dataCount > 1) |
| | 79 | | { |
| 0 | 80 | | flattened = flattenedList!.ToArray(); |
| | 81 | | } |
| | 82 | |
|
| 4 | 83 | | return flattened; |
| | 84 | | } |
| | 85 | |
|
| | 86 | | private static ServiceBusMessage CreateAmqpDataMessage(IEnumerable<byte[]> data) => |
| 4 | 87 | | new ServiceBusMessage(BinaryData.FromMemory(ConvertAndFlattenData(data) ?? ReadOnlyMemory<byte>.Empty)); |
| | 88 | |
|
| | 89 | | public static ServiceBusReceivedMessage ToServiceBusReceivedMessage(this AmqpMessage message) |
| | 90 | | { |
| 8 | 91 | | if ((message.BodyType & SectionFlag.Data) != 0 && message.DataBody != null) |
| | 92 | | { |
| 4 | 93 | | return new ServiceBusReceivedMessage { SentMessage = CreateAmqpDataMessage(GetDataViaDataBody(message)) |
| | 94 | | } |
| | 95 | |
|
| 4 | 96 | | return new ServiceBusReceivedMessage(); |
| | 97 | | } |
| | 98 | | } |
| | 99 | | } |