< Summary

Class:Azure.Messaging.ServiceBus.Amqp.AmqpMessageConverter
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpMessageConverter.cs
Covered lines:99
Uncovered lines:188
Coverable lines:287
Total lines:699
Line coverage:34.4% (99 of 287)
Covered branches:119
Total branches:235
Branch coverage:50.6% (119 of 235)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
BatchSBMessagesAsAmqpMessage(...) | - | 100% | 100%
BuildAmqpBatchFromMessage(...) | - | 68.75% | 0%
BuildAmqpBatchFromMessages(...) | - | 47.62% | 50%
ReadStreamToArraySegment(...) | - | 0% | 0%
SBMessageToAmqpMessage(...) | - | 86.21% | 80%
AmqpMessageToSBMessage(...) | - | 66.1% | 58.33%
GetRuleDescriptionMap(...) | - | 0% | 0%
GetRuleDescription(...) | - | 0% | 100%
GetFilter(...) | - | 0% | 0%
GetRuleAction(...) | - | 0% | 0%
TryGetAmqpObjectFromNetObject(...) | - | 22.22% | 55.26%
TryGetNetObjectFromAmqpObject(...) | - | 18.18% | 52.17%
StreamToBytes(...) | - | 0% | 0%
ToData(...) | - | 0% | 100%
GetSqlRuleFilterMap(...) | - | 0% | 100%
GetCorrelationRuleFilterMap(...) | - | 0% | 0%
GetRuleActionMap(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpMessageConverter.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections;
 6using System.Collections.Generic;
 7using System.IO;
 8using System.Linq;
 9using System.Runtime.Serialization;
 10using Azure.Core;
 11using Azure.Messaging.ServiceBus.Amqp.Framing;
 12using Azure.Messaging.ServiceBus.Management;
 13using Azure.Messaging.ServiceBus.Primitives;
 14using Microsoft.Azure.Amqp;
 15using Microsoft.Azure.Amqp.Encoding;
 16using Microsoft.Azure.Amqp.Framing;
 17using SBMessage = Azure.Messaging.ServiceBus.ServiceBusMessage;
 18
 19namespace Azure.Messaging.ServiceBus.Amqp
 20{
 21    internal static class AmqpMessageConverter
 22    {
 23        private const string EnqueuedTimeUtcName = "x-opt-enqueued-time";
 24        private const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time";
 25        private const string SequenceNumberName = "x-opt-sequence-number";
 26        private const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
 27        private const string LockedUntilName = "x-opt-locked-until";
 28        private const string PartitionKeyName = "x-opt-partition-key";
 29        private const string PartitionIdName = "x-opt-partition-id";
 30        private const string ViaPartitionKeyName = "x-opt-via-partition-key";
 31        private const string DeadLetterSourceName = "x-opt-deadletter-source";
 32        private const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
 33        private const string UriName = AmqpConstants.Vendor + ":uri";
 34        private const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset";
 35        private const int GuidSize = 16;
 36
 37        /// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
 38        private const int StreamBufferSizeInBytes = 512;
 39
 40        public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<SBMessage> source)
 41        {
 2642            Argument.AssertNotNull(source, nameof(source));
 2643            return BuildAmqpBatchFromMessage(source);
 44        }
 45
 46        /// <summary>
 47        ///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="SBMessage" />
 48        ///   optionally propagating the custom properties.
 49        /// </summary>
 50        ///
 51        /// <param name="source">The set of messages to use as the body of the batch message.</param>
 52        ///
 53        /// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
 54        ///
 55        private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<SBMessage> source)
 56        {
 2657            AmqpMessage firstAmqpMessage = null;
 2658            SBMessage firstMessage = null;
 59
 2660            return BuildAmqpBatchFromMessages(
 2661                source.Select(sbMessage =>
 2662                {
 063                    if (firstAmqpMessage == null)
 2664                    {
 065                        firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
 066                        firstMessage = sbMessage;
 067                        return firstAmqpMessage;
 2668                    }
 2669                    else
 2670                    {
 071                        return SBMessageToAmqpMessage(sbMessage);
 2672                    }
 2673                }).ToList(), firstMessage);
 74        }
 75
 76        /// <summary>
 77        ///   Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />.
 78        /// </summary>
 79        ///
 80        /// <param name="batchMessages">The set of messages to use as the body of the batch message.</param>
 81        /// <param name="firstMessage"></param>
 82        ///
 83        /// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
 84        ///
 85        private static AmqpMessage BuildAmqpBatchFromMessages(
 86            IList<AmqpMessage> batchMessages,
 87            SBMessage firstMessage = null)
 88        {
 89            AmqpMessage batchEnvelope;
 90
 2691            if (batchMessages.Count == 1)
 92            {
 093                batchEnvelope = batchMessages[0];
 94            }
 95            else
 96            {
 2697                batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
 2698                {
 099                    message.Batchable = true;
 0100                    using var messageStream = message.ToStream();
 0101                    return new Data { Value = ReadStreamToArraySegment(messageStream) };
 0102                }));
 103
 26104                batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
 105            }
 106
 26107            if (firstMessage?.MessageId != null)
 108            {
 0109                batchEnvelope.Properties.MessageId = firstMessage.MessageId;
 110            }
 26111            if (firstMessage?.SessionId != null)
 112            {
 0113                batchEnvelope.Properties.GroupId = firstMessage.SessionId;
 114            }
 115
 26116            if (firstMessage?.PartitionKey != null)
 117            {
 0118                batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] =
 0119                    firstMessage.PartitionKey;
 120            }
 121
 26122            if (firstMessage?.ViaPartitionKey != null)
 123            {
 0124                batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] =
 0125                    firstMessage.ViaPartitionKey;
 126            }
 127
 26128            batchEnvelope.Batchable = true;
 26129            return batchEnvelope;
 130        }
 131
 132        /// <summary>
 133        ///   Converts a stream to an <see cref="ArraySegment{T}" /> representation.
 134        /// </summary>
 135        ///
 136        /// <param name="stream">The stream to read and capture in memory.</param>
 137        ///
 138        /// <returns>The <see cref="ArraySegment{T}" /> containing the stream data.</returns>
 139        ///
 140        private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)
 141        {
 0142            if (stream == null)
 143            {
 0144                return new ArraySegment<byte>();
 145            }
 146
 0147            using var memStream = new MemoryStream(StreamBufferSizeInBytes);
 0148            stream.CopyTo(memStream, StreamBufferSizeInBytes);
 149
 0150            return new ArraySegment<byte>(memStream.ToArray());
 0151        }
 152
 153        public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
 154        {
 74155            var amqpMessage = sbMessage.ToAmqpMessage();
 74156            amqpMessage.Properties.MessageId = sbMessage.MessageId;
 74157            amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
 74158            amqpMessage.Properties.ContentType = sbMessage.ContentType;
 74159            amqpMessage.Properties.Subject = sbMessage.Label;
 74160            amqpMessage.Properties.To = sbMessage.To;
 74161            amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo;
 74162            amqpMessage.Properties.GroupId = sbMessage.SessionId;
 74163            amqpMessage.Properties.ReplyToGroupId = sbMessage.ReplyToSessionId;
 164
 74165            if (sbMessage.TimeToLive != TimeSpan.MaxValue)
 166            {
 2167                amqpMessage.Header.Ttl = (uint)sbMessage.TimeToLive.TotalMilliseconds;
 2168                amqpMessage.Properties.CreationTime = DateTime.UtcNow;
 169
 2170                if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > sbMessage.TimeToLi
 171                {
 2172                    amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + sbMessage.Ti
 173                }
 174                else
 175                {
 0176                    amqpMessage.Properties.AbsoluteExpiryTime = AmqpConstants.MaxAbsoluteExpiryTime;
 177                }
 178            }
 179
 74180            if ((sbMessage.ScheduledEnqueueTime != null) && sbMessage.ScheduledEnqueueTime > DateTimeOffset.MinValue)
 181            {
 0182                amqpMessage.MessageAnnotations.Map.Add(ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDa
 183            }
 184
 74185            if (sbMessage.PartitionKey != null)
 186            {
 2187                amqpMessage.MessageAnnotations.Map.Add(PartitionKeyName, sbMessage.PartitionKey);
 188            }
 189
 74190            if (sbMessage.ViaPartitionKey != null)
 191            {
 2192                amqpMessage.MessageAnnotations.Map.Add(ViaPartitionKeyName, sbMessage.ViaPartitionKey);
 193            }
 194
 74195            if (sbMessage.Properties != null && sbMessage.Properties.Count > 0)
 196            {
 2197                if (amqpMessage.ApplicationProperties == null)
 198                {
 0199                    amqpMessage.ApplicationProperties = new ApplicationProperties();
 200                }
 201
 8202                foreach (var pair in sbMessage.Properties)
 203                {
 2204                    if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject))
 205                    {
 2206                        amqpMessage.ApplicationProperties.Map.Add(pair.Key, amqpObject);
 207                    }
 208                    else
 209                    {
 0210                        throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetT
 211                    }
 212                }
 213            }
 214
 74215            return amqpMessage;
 216        }
 217
 218        public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
 219        {
 8220            Argument.AssertNotNull(amqpMessage, nameof(amqpMessage));
 221
 8222            ServiceBusReceivedMessage sbMessage = amqpMessage.ToServiceBusReceivedMessage();
 8223            var sections = amqpMessage.Sections;
 8224            if ((sections & SectionFlag.Header) != 0)
 225            {
 6226                if (amqpMessage.Header.Ttl != null)
 227                {
 2228                    sbMessage.SentMessage.TimeToLive = TimeSpan.FromMilliseconds(amqpMessage.Header.Ttl.Value);
 229                }
 230
 6231                if (amqpMessage.Header.DeliveryCount != null)
 232                {
 4233                    sbMessage.DeliveryCount = isPeeked ? (int)(amqpMessage.Header.DeliveryCount.Value) : (int)(amqpMessa
 234                }
 235            }
 236
 8237            if ((sections & SectionFlag.Properties) != 0)
 238            {
 2239                if (amqpMessage.Properties.MessageId != null)
 240                {
 2241                    sbMessage.SentMessage.MessageId = amqpMessage.Properties.MessageId.ToString();
 242                }
 243
 2244                if (amqpMessage.Properties.CorrelationId != null)
 245                {
 2246                    sbMessage.SentMessage.CorrelationId = amqpMessage.Properties.CorrelationId.ToString();
 247                }
 248
 2249                if (amqpMessage.Properties.ContentType.Value != null)
 250                {
 2251                    sbMessage.SentMessage.ContentType = amqpMessage.Properties.ContentType.Value;
 252                }
 253
 2254                if (amqpMessage.Properties.Subject != null)
 255                {
 2256                    sbMessage.SentMessage.Label = amqpMessage.Properties.Subject;
 257                }
 258
 2259                if (amqpMessage.Properties.To != null)
 260                {
 2261                    sbMessage.SentMessage.To = amqpMessage.Properties.To.ToString();
 262                }
 263
 2264                if (amqpMessage.Properties.ReplyTo != null)
 265                {
 2266                    sbMessage.SentMessage.ReplyTo = amqpMessage.Properties.ReplyTo.ToString();
 267                }
 268
 2269                if (amqpMessage.Properties.GroupId != null)
 270                {
 2271                    sbMessage.SentMessage.SessionId = amqpMessage.Properties.GroupId;
 272                }
 273
 2274                if (amqpMessage.Properties.ReplyToGroupId != null)
 275                {
 2276                    sbMessage.SentMessage.ReplyToSessionId = amqpMessage.Properties.ReplyToGroupId;
 277                }
 278            }
 279
 280            // Do application properties before message annotations, because the application properties
 281            // can be updated by entries from message annotation.
 8282            if ((sections & SectionFlag.ApplicationProperties) != 0)
 283            {
 8284                foreach (var pair in amqpMessage.ApplicationProperties.Map)
 285                {
 2286                    if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
 287                    {
 2288                        sbMessage.SentMessage.Properties[pair.Key.ToString()] = netObject;
 289                    }
 290                }
 291            }
 292
 8293            if ((sections & SectionFlag.MessageAnnotations) != 0)
 294            {
 12295                foreach (var pair in amqpMessage.MessageAnnotations.Map)
 296                {
 4297                    var key = pair.Key.ToString();
 298                    switch (key)
 299                    {
 300                        case EnqueuedTimeUtcName:
 0301                            sbMessage.EnqueuedTime = (DateTime)pair.Value;
 0302                            break;
 303                        case ScheduledEnqueueTimeUtcName:
 0304                            sbMessage.SentMessage.ScheduledEnqueueTime = (DateTime)pair.Value;
 0305                            break;
 306                        case SequenceNumberName:
 0307                            sbMessage.SequenceNumber = (long)pair.Value;
 0308                            break;
 309                        case EnqueueSequenceNumberName:
 0310                            sbMessage.EnqueuedSequenceNumber = (long)pair.Value;
 0311                            break;
 312                        case LockedUntilName:
 0313                            sbMessage.LockedUntil = (DateTime)pair.Value >= DateTimeOffset.MaxValue.UtcDateTime ?
 0314                                DateTimeOffset.MaxValue : (DateTime)pair.Value;
 0315                            break;
 316                        case PartitionKeyName:
 2317                            sbMessage.SentMessage.PartitionKey = (string)pair.Value;
 2318                            break;
 319                        case PartitionIdName:
 0320                            sbMessage.PartitionId = (short)pair.Value;
 0321                            break;
 322                        case ViaPartitionKeyName:
 2323                            sbMessage.SentMessage.ViaPartitionKey = (string)pair.Value;
 2324                            break;
 325                        case DeadLetterSourceName:
 0326                            sbMessage.DeadLetterSource = (string)pair.Value;
 0327                            break;
 328                        default:
 0329                            if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netOb
 330                            {
 0331                                sbMessage.SentMessage.Properties[key] = netObject;
 332                            }
 333                            break;
 334                    }
 335                }
 336            }
 337
 8338            if (amqpMessage.DeliveryTag.Count == GuidSize)
 339            {
 0340                var guidBuffer = new byte[GuidSize];
 0341                Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSize)
 0342                sbMessage.LockTokenGuid = new Guid(guidBuffer);
 343            }
 344
 8345            amqpMessage.Dispose();
 346
 8347            return sbMessage;
 348        }
 349
 350        public static AmqpMap GetRuleDescriptionMap(RuleProperties description)
 351        {
 0352            var ruleDescriptionMap = new AmqpMap();
 353
 0354            switch (description.Filter)
 355            {
 356                case SqlRuleFilter sqlRuleFilter:
 0357                    var filterMap = GetSqlRuleFilterMap(sqlRuleFilter);
 0358                    ruleDescriptionMap[ManagementConstants.Properties.SqlRuleFilter] = filterMap;
 0359                    break;
 360                case CorrelationRuleFilter correlationFilter:
 0361                    var correlationFilterMap = GetCorrelationRuleFilterMap(correlationFilter);
 0362                    ruleDescriptionMap[ManagementConstants.Properties.CorrelationRuleFilter] = correlationFilterMap;
 0363                    break;
 364                default:
 0365                    throw new NotSupportedException(
 0366                        Resources.RuleFilterNotSupported.FormatForUser(
 0367                            description.Filter.GetType(),
 0368                            nameof(SqlRuleFilter),
 0369                            nameof(CorrelationRuleFilter)));
 370            }
 371
 0372            var amqpAction = GetRuleActionMap(description.Action as SqlRuleAction);
 0373            ruleDescriptionMap[ManagementConstants.Properties.SqlRuleAction] = amqpAction;
 0374            ruleDescriptionMap[ManagementConstants.Properties.RuleName] = description.Name;
 375
 0376            return ruleDescriptionMap;
 377        }
 378
 379        public static RuleProperties GetRuleDescription(AmqpRuleDescriptionCodec amqpDescription)
 380        {
 0381            var filter = GetFilter(amqpDescription.Filter);
 0382            var ruleAction = GetRuleAction(amqpDescription.Action);
 383
 0384            var ruleDescription = new RuleProperties(amqpDescription.RuleName, filter)
 0385            {
 0386                Action = ruleAction
 0387            };
 388
 0389            return ruleDescription;
 390        }
 391
 392        public static RuleFilter GetFilter(AmqpRuleFilterCodec amqpFilter)
 393        {
 394            RuleFilter filter;
 395
 0396            switch (amqpFilter.DescriptorCode)
 397            {
 398                case AmqpSqlRuleFilterCodec.Code:
 0399                    var amqpSqlFilter = (AmqpSqlRuleFilterCodec)amqpFilter;
 0400                    filter = new SqlRuleFilter(amqpSqlFilter.Expression);
 0401                    break;
 402
 403                case AmqpTrueRuleFilterCodec.Code:
 0404                    filter = new TrueRuleFilter();
 0405                    break;
 406
 407                case AmqpFalseRuleFilterCodec.Code:
 0408                    filter = new FalseRuleFilter();
 0409                    break;
 410
 411                case AmqpCorrelationRuleFilterCodec.Code:
 0412                    var amqpCorrelationFilter = (AmqpCorrelationRuleFilterCodec)amqpFilter;
 0413                    var correlationFilter = new CorrelationRuleFilter
 0414                    {
 0415                        CorrelationId = amqpCorrelationFilter.CorrelationId,
 0416                        MessageId = amqpCorrelationFilter.MessageId,
 0417                        To = amqpCorrelationFilter.To,
 0418                        ReplyTo = amqpCorrelationFilter.ReplyTo,
 0419                        Label = amqpCorrelationFilter.Label,
 0420                        SessionId = amqpCorrelationFilter.SessionId,
 0421                        ReplyToSessionId = amqpCorrelationFilter.ReplyToSessionId,
 0422                        ContentType = amqpCorrelationFilter.ContentType
 0423                    };
 424
 0425                    foreach (var property in amqpCorrelationFilter.Properties)
 426                    {
 0427                        correlationFilter.Properties.Add(property.Key.Key.ToString(), property.Value);
 428                    }
 429
 0430                    filter = correlationFilter;
 0431                    break;
 432
 433                default:
 0434                    throw new NotSupportedException($"Unknown filter descriptor code: {amqpFilter.DescriptorCode}");
 435            }
 436
 0437            return filter;
 438        }
 439
 440        private static RuleAction GetRuleAction(AmqpRuleActionCodec amqpAction)
 441        {
 442            RuleAction action;
 443
 0444            if (amqpAction.DescriptorCode == AmqpEmptyRuleActionCodec.Code)
 445            {
 0446                action = null;
 447            }
 0448            else if (amqpAction.DescriptorCode == AmqpSqlRuleActionCodec.Code)
 449            {
 0450                var amqpSqlRuleAction = (AmqpSqlRuleActionCodec)amqpAction;
 0451                var sqlRuleAction = new SqlRuleAction(amqpSqlRuleAction.SqlExpression);
 452
 0453                action = sqlRuleAction;
 454            }
 455            else
 456            {
 0457                throw new NotSupportedException($"Unknown action descriptor code: {amqpAction.DescriptorCode}");
 458            }
 459
 0460            return action;
 461        }
 462
 463        internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObj
 464        {
 2465            amqpObject = null;
 2466            if (netObject == null)
 467            {
 0468                return true;
 469            }
 470
 2471            switch (SerializationUtilities.GetTypeId(netObject))
 472            {
 473                case PropertyValueType.Byte:
 474                case PropertyValueType.SByte:
 475                case PropertyValueType.Int16:
 476                case PropertyValueType.Int32:
 477                case PropertyValueType.Int64:
 478                case PropertyValueType.UInt16:
 479                case PropertyValueType.UInt32:
 480                case PropertyValueType.UInt64:
 481                case PropertyValueType.Single:
 482                case PropertyValueType.Double:
 483                case PropertyValueType.Boolean:
 484                case PropertyValueType.Decimal:
 485                case PropertyValueType.Char:
 486                case PropertyValueType.Guid:
 487                case PropertyValueType.DateTime:
 488                case PropertyValueType.String:
 2489                    amqpObject = netObject;
 2490                    break;
 491                case PropertyValueType.Stream:
 0492                    if (mappingType == MappingType.ApplicationProperty)
 493                    {
 0494                        amqpObject = StreamToBytes((Stream)netObject);
 495                    }
 0496                    break;
 497                case PropertyValueType.Uri:
 0498                    amqpObject = new DescribedType((AmqpSymbol)UriName, ((Uri)netObject).AbsoluteUri);
 0499                    break;
 500                case PropertyValueType.DateTimeOffset:
 0501                    amqpObject = new DescribedType((AmqpSymbol)DateTimeOffsetName, ((DateTimeOffset)netObject).UtcTicks)
 0502                    break;
 503                case PropertyValueType.TimeSpan:
 0504                    amqpObject = new DescribedType((AmqpSymbol)TimeSpanName, ((TimeSpan)netObject).Ticks);
 0505                    break;
 506                case PropertyValueType.Unknown:
 0507                    if (netObject is Stream netObjectAsStream)
 508                    {
 0509                        if (mappingType == MappingType.ApplicationProperty)
 510                        {
 0511                            amqpObject = StreamToBytes(netObjectAsStream);
 512                        }
 513                    }
 0514                    else if (mappingType == MappingType.ApplicationProperty)
 515                    {
 0516                        throw new SerializationException(Resources.FailedToSerializeUnsupportedType.FormatForUser(netObj
 517                    }
 0518                    else if (netObject is byte[] netObjectAsByteArray)
 519                    {
 0520                        amqpObject = new ArraySegment<byte>(netObjectAsByteArray);
 521                    }
 0522                    else if (netObject is IList)
 523                    {
 524                        // Array is also an IList
 0525                        amqpObject = netObject;
 526                    }
 0527                    else if (netObject is IDictionary netObjectAsDictionary)
 528                    {
 0529                        amqpObject = new AmqpMap(netObjectAsDictionary);
 530                    }
 531                    break;
 532            }
 533
 2534            return amqpObject != null;
 535        }
 536
 537        private static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType mappingType, out object netObje
 538        {
 2539            netObject = null;
 2540            if (amqpObject == null)
 541            {
 0542                return true;
 543            }
 544
 2545            switch (SerializationUtilities.GetTypeId(amqpObject))
 546            {
 547                case PropertyValueType.Byte:
 548                case PropertyValueType.SByte:
 549                case PropertyValueType.Int16:
 550                case PropertyValueType.Int32:
 551                case PropertyValueType.Int64:
 552                case PropertyValueType.UInt16:
 553                case PropertyValueType.UInt32:
 554                case PropertyValueType.UInt64:
 555                case PropertyValueType.Single:
 556                case PropertyValueType.Double:
 557                case PropertyValueType.Boolean:
 558                case PropertyValueType.Decimal:
 559                case PropertyValueType.Char:
 560                case PropertyValueType.Guid:
 561                case PropertyValueType.DateTime:
 562                case PropertyValueType.String:
 2563                    netObject = amqpObject;
 2564                    break;
 565                case PropertyValueType.Unknown:
 0566                    if (amqpObject is AmqpSymbol amqpObjectAsAmqpSymbol)
 567                    {
 0568                        netObject = (amqpObjectAsAmqpSymbol).Value;
 569                    }
 0570                    else if (amqpObject is ArraySegment<byte> amqpObjectAsArraySegment)
 571                    {
 0572                        ArraySegment<byte> binValue = amqpObjectAsArraySegment;
 0573                        if (binValue.Count == binValue.Array.Length)
 574                        {
 0575                            netObject = binValue.Array;
 576                        }
 577                        else
 578                        {
 0579                            var buffer = new byte[binValue.Count];
 0580                            Buffer.BlockCopy(binValue.Array, binValue.Offset, buffer, 0, binValue.Count);
 0581                            netObject = buffer;
 582                        }
 583                    }
 0584                    else if (amqpObject is DescribedType amqpObjectAsDescribedType)
 585                    {
 0586                        if (amqpObjectAsDescribedType.Descriptor is AmqpSymbol)
 587                        {
 0588                            var amqpSymbol = (AmqpSymbol)amqpObjectAsDescribedType.Descriptor;
 0589                            if (amqpSymbol.Equals((AmqpSymbol)UriName))
 590                            {
 0591                                netObject = new Uri((string)amqpObjectAsDescribedType.Value);
 592                            }
 0593                            else if (amqpSymbol.Equals((AmqpSymbol)TimeSpanName))
 594                            {
 0595                                netObject = new TimeSpan((long)amqpObjectAsDescribedType.Value);
 596                            }
 0597                            else if (amqpSymbol.Equals((AmqpSymbol)DateTimeOffsetName))
 598                            {
 0599                                netObject = new DateTimeOffset(new DateTime((long)amqpObjectAsDescribedType.Value, DateT
 600                            }
 601                        }
 602                    }
 0603                    else if (mappingType == MappingType.ApplicationProperty)
 604                    {
 0605                        throw new SerializationException(Resources.FailedToSerializeUnsupportedType.FormatForUser(amqpOb
 606                    }
 0607                    else if (amqpObject is AmqpMap map)
 608                    {
 0609                        var dictionary = new Dictionary<string, object>();
 0610                        foreach (var pair in map)
 611                        {
 0612                            dictionary.Add(pair.Key.ToString(), pair.Value);
 613                        }
 614
 0615                        netObject = dictionary;
 616                    }
 617                    else
 618                    {
 0619                        netObject = amqpObject;
 620                    }
 621                    break;
 622            }
 623
 2624            return netObject != null;
 625        }
 626
 627        private static ArraySegment<byte> StreamToBytes(Stream stream)
 628        {
 629            ArraySegment<byte> buffer;
 0630            if (stream == null || stream.Length < 1)
 631            {
 0632                buffer = default;
 633            }
 634            else
 635            {
 0636                using (var memoryStream = new MemoryStream(512))
 637                {
 0638                    stream.CopyTo(memoryStream, 512);
 0639                    buffer = new ArraySegment<byte>(memoryStream.ToArray());
 0640                }
 641            }
 642
 0643            return buffer;
 644        }
 645
 646        private static Data ToData(AmqpMessage message)
 647        {
 0648            ArraySegment<byte>[] payload = message.GetPayload();
 0649            var buffer = new BufferListStream(payload);
 0650            ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
 0651            return new Data { Value = value };
 652        }
 653
 654        internal static AmqpMap GetSqlRuleFilterMap(SqlRuleFilter sqlRuleFilter)
 655        {
 0656            var amqpFilterMap = new AmqpMap
 0657            {
 0658                [ManagementConstants.Properties.Expression] = sqlRuleFilter.SqlExpression
 0659            };
 0660            return amqpFilterMap;
 661        }
 662
 663        internal static AmqpMap GetCorrelationRuleFilterMap(CorrelationRuleFilter correlationRuleFilter)
 664        {
 0665            var correlationRuleFilterMap = new AmqpMap
 0666            {
 0667                [ManagementConstants.Properties.CorrelationId] = correlationRuleFilter.CorrelationId,
 0668                [ManagementConstants.Properties.MessageId] = correlationRuleFilter.MessageId,
 0669                [ManagementConstants.Properties.To] = correlationRuleFilter.To,
 0670                [ManagementConstants.Properties.ReplyTo] = correlationRuleFilter.ReplyTo,
 0671                [ManagementConstants.Properties.Label] = correlationRuleFilter.Label,
 0672                [ManagementConstants.Properties.SessionId] = correlationRuleFilter.SessionId,
 0673                [ManagementConstants.Properties.ReplyToSessionId] = correlationRuleFilter.ReplyToSessionId,
 0674                [ManagementConstants.Properties.ContentType] = correlationRuleFilter.ContentType
 0675            };
 676
 0677            var propertiesMap = new AmqpMap();
 0678            foreach (var property in correlationRuleFilter.Properties)
 679            {
 0680                propertiesMap[new MapKey(property.Key)] = property.Value;
 681            }
 682
 0683            correlationRuleFilterMap[ManagementConstants.Properties.CorrelationRuleFilterProperties] = propertiesMap;
 684
 0685            return correlationRuleFilterMap;
 686        }
 687
 688        internal static AmqpMap GetRuleActionMap(SqlRuleAction sqlRuleAction)
 689        {
 0690            AmqpMap ruleActionMap = null;
 0691            if (sqlRuleAction != null)
 692            {
 0693                ruleActionMap = new AmqpMap { [ManagementConstants.Properties.Expression] = sqlRuleAction.SqlExpression 
 694            }
 695
 0696            return ruleActionMap;
 697        }
 698    }
 699}