< Summary

Class: Microsoft.Azure.ServiceBus.Amqp.AmqpMessageConverter
Assembly: Microsoft.Azure.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Amqp\AmqpMessageConverter.cs
Covered lines: 102
Uncovered lines: 194
Coverable lines: 296
Total lines: 703
Line coverage: 34.4% (102 of 296)
Covered branches: 140
Total branches: 259
Branch coverage: 54% (140 of 259)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
BatchSBMessagesAsAmqpMessage(...) | - | 0% | 0%
SBMessageToAmqpMessage(...) | - | 86.21% | 79.17%
AmqpMessageToSBMessage(...) | - | 70% | 64.15%
GetRuleDescriptionMap(...) | - | 0% | 0%
GetRuleDescription(...) | - | 0% | 100%
GetFilter(...) | - | 0% | 0%
GetRuleAction(...) | - | 0% | 0%
TryGetAmqpObjectFromNetObject(...) | - | 22.22% | 55.26%
TryGetNetObjectFromAmqpObject(...) | - | 45.45% | 69.57%
StreamToBytes(...) | - | 0% | 0%
ToData(...) | - | 0% | 100%
GetSqlFilterMap(...) | - | 0% | 100%
GetCorrelationFilterMap(...) | - | 0% | 0%
GetRuleActionMap(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Amqp\AmqpMessageConverter.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus.Amqp
 5{
 6    using System;
 7    using System.Collections;
 8    using System.Collections.Generic;
 9    using System.IO;
 10    using System.Runtime.Serialization;
 11    using Azure.Amqp;
 12    using Azure.Amqp.Encoding;
 13    using Azure.Amqp.Framing;
 14    using Framing;
 15    using Primitives;
 16    using SBMessage = Message;
 17
 18    static class AmqpMessageConverter
 19    {
 20        const string EnqueuedTimeUtcName = "x-opt-enqueued-time";
 21        const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time";
 22        const string SequenceNumberName = "x-opt-sequence-number";
 23        const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
 24        const string LockedUntilName = "x-opt-locked-until";
 25        const string PublisherName = "x-opt-publisher";
 26        const string PartitionKeyName = "x-opt-partition-key";
 27        const string PartitionIdName = "x-opt-partition-id";
 28        const string ViaPartitionKeyName = "x-opt-via-partition-key";
 29        const string DeadLetterSourceName = "x-opt-deadletter-source";
 30        const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
 31        const string UriName = AmqpConstants.Vendor + ":uri";
 32        const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset";
 33        const int GuidSize = 16;
 34
 35        public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<SBMessage> sbMessages)
 36        {
 037            if (sbMessages == null)
 38            {
 039                throw Fx.Exception.ArgumentNull(nameof(sbMessages));
 40            }
 41
 42            AmqpMessage amqpMessage;
 043            AmqpMessage firstAmqpMessage = null;
 044            SBMessage firstMessage = null;
 045            List<Data> dataList = null;
 046            var messageCount = 0;
 047            foreach (var sbMessage in sbMessages)
 48            {
 049                messageCount++;
 50
 051                amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(sbMessage);
 052                if (firstAmqpMessage == null)
 53                {
 054                    firstAmqpMessage = amqpMessage;
 055                    firstMessage = sbMessage;
 056                    continue;
 57                }
 58
 059                if (dataList == null)
 60                {
 061                    dataList = new List<Data> { ToData(firstAmqpMessage) };
 62                }
 63
 064                dataList.Add(ToData(amqpMessage));
 65            }
 66
 067            if (messageCount == 1 && firstAmqpMessage != null)
 68            {
 069                firstAmqpMessage.Batchable = true;
 070                return firstAmqpMessage;
 71            }
 72
 073            amqpMessage = AmqpMessage.Create(dataList);
 074            amqpMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
 75
 076            if (firstMessage.MessageId != null)
 77            {
 078                amqpMessage.Properties.MessageId = firstMessage.MessageId;
 79            }
 080            if (firstMessage.SessionId != null)
 81            {
 082                amqpMessage.Properties.GroupId = firstMessage.SessionId;
 83            }
 84
 085            if (firstMessage.PartitionKey != null)
 86            {
 087                amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] =
 088                    firstMessage.PartitionKey;
 89            }
 90
 091            if (firstMessage.ViaPartitionKey != null)
 92            {
 093                amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] =
 094                    firstMessage.ViaPartitionKey;
 95            }
 96
 097            amqpMessage.Batchable = true;
 098            return amqpMessage;
 99        }
 100
 101        public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
 102        {
 4103            var amqpMessage = sbMessage.Body == null ? AmqpMessage.Create() : AmqpMessage.Create(new Data { Value = new 
 104
 4105            amqpMessage.Properties.MessageId = sbMessage.MessageId;
 4106            amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
 4107            amqpMessage.Properties.ContentType = sbMessage.ContentType;
 4108            amqpMessage.Properties.Subject = sbMessage.Label;
 4109            amqpMessage.Properties.To = sbMessage.To;
 4110            amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo;
 4111            amqpMessage.Properties.GroupId = sbMessage.SessionId;
 4112            amqpMessage.Properties.ReplyToGroupId = sbMessage.ReplyToSessionId;
 113
 4114            if (sbMessage.TimeToLive != TimeSpan.MaxValue)
 115            {
 2116                amqpMessage.Header.Ttl = sbMessage.TimeToLive.TotalMilliseconds < UInt32.MaxValue ? (uint)sbMessage.Time
 2117                amqpMessage.Properties.CreationTime = DateTime.UtcNow;
 118
 2119                if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > sbMessage.TimeToLi
 120                {
 2121                    amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + sbMessage.Ti
 122                }
 123                else
 124                {
 0125                    amqpMessage.Properties.AbsoluteExpiryTime = AmqpConstants.MaxAbsoluteExpiryTime;
 126                }
 127            }
 128
 4129            if ((sbMessage.ScheduledEnqueueTimeUtc != null) && sbMessage.ScheduledEnqueueTimeUtc > DateTime.MinValue)
 130            {
 0131                amqpMessage.MessageAnnotations.Map.Add(ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTimeUtc);
 132            }
 133
 4134            if (sbMessage.PartitionKey != null)
 135            {
 2136                amqpMessage.MessageAnnotations.Map.Add(PartitionKeyName, sbMessage.PartitionKey);
 137            }
 138
 4139            if (sbMessage.ViaPartitionKey != null)
 140            {
 2141                amqpMessage.MessageAnnotations.Map.Add(ViaPartitionKeyName, sbMessage.ViaPartitionKey);
 142            }
 143
 4144            if (sbMessage.UserProperties != null && sbMessage.UserProperties.Count > 0)
 145            {
 2146                if (amqpMessage.ApplicationProperties == null)
 147                {
 0148                    amqpMessage.ApplicationProperties = new ApplicationProperties();
 149                }
 150
 8151                foreach (var pair in sbMessage.UserProperties)
 152                {
 2153                    if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject))
 154                    {
 2155                        amqpMessage.ApplicationProperties.Map.Add(pair.Key, amqpObject);
 156                    }
 157                    else
 158                    {
 0159                        throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetT
 160                    }
 161                }
 162            }
 163
 4164            return amqpMessage;
 165        }
 166
 167        public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
 168        {
 14169            if (amqpMessage == null)
 170            {
 0171                throw Fx.Exception.ArgumentNull(nameof(amqpMessage));
 172            }
 173
 174            SBMessage sbMessage;
 175
 14176            if ((amqpMessage.BodyType & SectionFlag.AmqpValue) != 0
 14177                && amqpMessage.ValueBody.Value != null)
 178            {
 8179                sbMessage = new SBMessage();
 180
 8181                if (TryGetNetObjectFromAmqpObject(amqpMessage.ValueBody.Value, MappingType.MessageBody, out var dotNetOb
 182                {
 8183                    sbMessage.SystemProperties.BodyObject = dotNetObject;
 184                }
 185                else
 186                {
 0187                    sbMessage.SystemProperties.BodyObject = amqpMessage.ValueBody.Value;
 188                }
 189            }
 6190            else if ((amqpMessage.BodyType & SectionFlag.Data) != 0
 6191                && amqpMessage.DataBody != null)
 192            {
 6193                var dataSegments = new List<byte>();
 24194                foreach (var data in amqpMessage.DataBody)
 195                {
 6196                    if (data.Value is byte[] byteArrayValue)
 197                    {
 2198                        dataSegments.AddRange(byteArrayValue);
 199                    }
 4200                    else if (data.Value is ArraySegment<byte> arraySegmentValue)
 201                    {
 202                        byte[] byteArray;
 4203                        if (arraySegmentValue.Count == arraySegmentValue.Array.Length)
 204                        {
 4205                            byteArray = arraySegmentValue.Array;
 206                        }
 207                        else
 208                        {
 0209                            byteArray = new byte[arraySegmentValue.Count];
 0210                            Array.ConstrainedCopy(arraySegmentValue.Array, arraySegmentValue.Offset, byteArray, 0, array
 211                        }
 4212                        dataSegments.AddRange(byteArray);
 213                    }
 214                }
 6215                sbMessage = new SBMessage(dataSegments.ToArray());
 216            }
 217            else
 218            {
 0219                sbMessage = new SBMessage();
 220            }
 221
 14222            var sections = amqpMessage.Sections;
 14223            if ((sections & SectionFlag.Header) != 0)
 224            {
 6225                if (amqpMessage.Header.Ttl != null)
 226                {
 2227                    sbMessage.TimeToLive = TimeSpan.FromMilliseconds(amqpMessage.Header.Ttl.Value);
 228                }
 229
 6230                if (amqpMessage.Header.DeliveryCount != null)
 231                {
 4232                    sbMessage.SystemProperties.DeliveryCount = isPeeked ? (int)(amqpMessage.Header.DeliveryCount.Value) 
 233                }
 234            }
 235
 14236            if ((sections & SectionFlag.Properties) != 0)
 237            {
 2238                if (amqpMessage.Properties.MessageId != null)
 239                {
 2240                    sbMessage.MessageId = amqpMessage.Properties.MessageId.ToString();
 241                }
 242
 2243                if (amqpMessage.Properties.CorrelationId != null)
 244                {
 2245                    sbMessage.CorrelationId = amqpMessage.Properties.CorrelationId.ToString();
 246                }
 247
 2248                if (amqpMessage.Properties.ContentType.Value != null)
 249                {
 2250                    sbMessage.ContentType = amqpMessage.Properties.ContentType.Value;
 251                }
 252
 2253                if (amqpMessage.Properties.Subject != null)
 254                {
 2255                    sbMessage.Label = amqpMessage.Properties.Subject;
 256                }
 257
 2258                if (amqpMessage.Properties.To != null)
 259                {
 2260                    sbMessage.To = amqpMessage.Properties.To.ToString();
 261                }
 262
 2263                if (amqpMessage.Properties.ReplyTo != null)
 264                {
 2265                    sbMessage.ReplyTo = amqpMessage.Properties.ReplyTo.ToString();
 266                }
 267
 2268                if (amqpMessage.Properties.GroupId != null)
 269                {
 2270                    sbMessage.SessionId = amqpMessage.Properties.GroupId;
 271                }
 272
 2273                if (amqpMessage.Properties.ReplyToGroupId != null)
 274                {
 2275                    sbMessage.ReplyToSessionId = amqpMessage.Properties.ReplyToGroupId;
 276                }
 277
 2278                if (amqpMessage.Properties.CreationTime.HasValue && amqpMessage.Properties.AbsoluteExpiryTime.HasValue)
 279                {
 280                    // Overwrite TimeToLive from AbsoluteExpiryTime
 2281                    sbMessage.TimeToLive = amqpMessage.Properties.AbsoluteExpiryTime.Value - amqpMessage.Properties.Crea
 282                }
 283            }
 284
 285            // Do application properties before message annotations, because the application properties
 286            // can be updated by entries from message annotation.
 14287            if ((sections & SectionFlag.ApplicationProperties) != 0)
 288            {
 8289                foreach (var pair in amqpMessage.ApplicationProperties.Map)
 290                {
 2291                    if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
 292                    {
 2293                        sbMessage.UserProperties[pair.Key.ToString()] = netObject;
 294                    }
 295                }
 296            }
 297
 14298            if ((sections & SectionFlag.MessageAnnotations) != 0)
 299            {
 12300                foreach (var pair in amqpMessage.MessageAnnotations.Map)
 301                {
 4302                    var key = pair.Key.ToString();
 303                    switch (key)
 304                    {
 305                        case EnqueuedTimeUtcName:
 0306                            sbMessage.SystemProperties.EnqueuedTimeUtc = (DateTime)pair.Value;
 0307                            break;
 308                        case ScheduledEnqueueTimeUtcName:
 0309                            sbMessage.ScheduledEnqueueTimeUtc = (DateTime)pair.Value;
 0310                            break;
 311                        case SequenceNumberName:
 0312                            sbMessage.SystemProperties.SequenceNumber = (long)pair.Value;
 0313                            break;
 314                        case EnqueueSequenceNumberName:
 0315                            sbMessage.SystemProperties.EnqueuedSequenceNumber = (long)pair.Value;
 0316                            break;
 317                        case LockedUntilName:
 0318                            sbMessage.SystemProperties.LockedUntilUtc = (DateTime)pair.Value;
 0319                            break;
 320                        case PartitionKeyName:
 2321                            sbMessage.PartitionKey = (string)pair.Value;
 2322                            break;
 323                        case PartitionIdName:
 0324                            sbMessage.SystemProperties.PartitionId = (short)pair.Value;
 0325                            break;
 326                        case ViaPartitionKeyName:
 2327                            sbMessage.ViaPartitionKey = (string)pair.Value;
 2328                            break;
 329                        case DeadLetterSourceName:
 0330                            sbMessage.SystemProperties.DeadLetterSource = (string)pair.Value;
 0331                            break;
 332                        default:
 0333                            if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netOb
 334                            {
 0335                                sbMessage.UserProperties[key] = netObject;
 336                            }
 337                            break;
 338                    }
 339                }
 340            }
 341
 14342            if (amqpMessage.DeliveryTag.Count == GuidSize)
 343            {
 0344                var guidBuffer = new byte[GuidSize];
 0345                Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSize)
 0346                sbMessage.SystemProperties.LockTokenGuid = new Guid(guidBuffer);
 347            }
 348
 14349            amqpMessage.Dispose();
 350
 14351            return sbMessage;
 352        }
 353
 354        public static AmqpMap GetRuleDescriptionMap(RuleDescription description)
 355        {
 0356            var ruleDescriptionMap = new AmqpMap();
 357
 0358            switch (description.Filter)
 359            {
 360                case SqlFilter sqlFilter:
 0361                    var filterMap = GetSqlFilterMap(sqlFilter);
 0362                    ruleDescriptionMap[ManagementConstants.Properties.SqlFilter] = filterMap;
 0363                    break;
 364                case CorrelationFilter correlationFilter:
 0365                    var correlationFilterMap = GetCorrelationFilterMap(correlationFilter);
 0366                    ruleDescriptionMap[ManagementConstants.Properties.CorrelationFilter] = correlationFilterMap;
 0367                    break;
 368                default:
 0369                    throw new NotSupportedException(
 0370                        Resources.RuleFilterNotSupported.FormatForUser(
 0371                            description.Filter.GetType(),
 0372                            nameof(SqlFilter),
 0373                            nameof(CorrelationFilter)));
 374            }
 375
 0376            var amqpAction = GetRuleActionMap(description.Action as SqlRuleAction);
 0377            ruleDescriptionMap[ManagementConstants.Properties.SqlRuleAction] = amqpAction;
 0378            ruleDescriptionMap[ManagementConstants.Properties.RuleName] = description.Name;
 379
 0380            return ruleDescriptionMap;
 381        }
 382
 383        public static RuleDescription GetRuleDescription(AmqpRuleDescriptionCodec amqpDescription)
 384        {
 0385            var filter = GetFilter(amqpDescription.Filter);
 0386            var ruleAction = GetRuleAction(amqpDescription.Action);
 387
 0388            var ruleDescription = new RuleDescription(amqpDescription.RuleName, filter)
 0389            {
 0390                Action = ruleAction
 0391            };
 392
 0393            return ruleDescription;
 394        }
 395
 396        public static Filter GetFilter(AmqpFilterCodec amqpFilter)
 397        {
 398            Filter filter;
 399
 0400            switch (amqpFilter.DescriptorCode)
 401            {
 402                case AmqpSqlFilterCodec.Code:
 0403                    var amqpSqlFilter = (AmqpSqlFilterCodec)amqpFilter;
 0404                    filter = new SqlFilter(amqpSqlFilter.Expression);
 0405                    break;
 406
 407                case AmqpTrueFilterCodec.Code:
 0408                    filter = new TrueFilter();
 0409                    break;
 410
 411                case AmqpFalseFilterCodec.Code:
 0412                    filter = new FalseFilter();
 0413                    break;
 414
 415                case AmqpCorrelationFilterCodec.Code:
 0416                    var amqpCorrelationFilter = (AmqpCorrelationFilterCodec)amqpFilter;
 0417                    var correlationFilter = new CorrelationFilter
 0418                    {
 0419                        CorrelationId = amqpCorrelationFilter.CorrelationId,
 0420                        MessageId = amqpCorrelationFilter.MessageId,
 0421                        To = amqpCorrelationFilter.To,
 0422                        ReplyTo = amqpCorrelationFilter.ReplyTo,
 0423                        Label = amqpCorrelationFilter.Label,
 0424                        SessionId = amqpCorrelationFilter.SessionId,
 0425                        ReplyToSessionId = amqpCorrelationFilter.ReplyToSessionId,
 0426                        ContentType = amqpCorrelationFilter.ContentType
 0427                    };
 428
 0429                    foreach (var property in amqpCorrelationFilter.Properties)
 430                    {
 0431                        correlationFilter.Properties.Add(property.Key.Key.ToString(), property.Value);
 432                    }
 433
 0434                    filter = correlationFilter;
 0435                    break;
 436
 437                default:
 0438                    throw new NotSupportedException($"Unknown filter descriptor code: {amqpFilter.DescriptorCode}");
 439            }
 440
 0441            return filter;
 442        }
 443
 444        static RuleAction GetRuleAction(AmqpRuleActionCodec amqpAction)
 445        {
 446            RuleAction action;
 447
 0448            if (amqpAction.DescriptorCode == AmqpEmptyRuleActionCodec.Code)
 449            {
 0450                action = null;
 451            }
 0452            else if (amqpAction.DescriptorCode == AmqpSqlRuleActionCodec.Code)
 453            {
 0454                var amqpSqlAction = (AmqpSqlRuleActionCodec)amqpAction;
 0455                var sqlAction = new SqlRuleAction(amqpSqlAction.SqlExpression);
 456
 0457                action = sqlAction;
 458            }
 459            else
 460            {
 0461                throw new NotSupportedException($"Unknown action descriptor code: {amqpAction.DescriptorCode}");
 462            }
 463
 0464            return action;
 465        }
 466
 467        internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObj
 468        {
 2469            amqpObject = null;
 2470            if (netObject == null)
 471            {
 0472                return true;
 473            }
 474
 2475            switch (SerializationUtilities.GetTypeId(netObject))
 476            {
 477                case PropertyValueType.Byte:
 478                case PropertyValueType.SByte:
 479                case PropertyValueType.Int16:
 480                case PropertyValueType.Int32:
 481                case PropertyValueType.Int64:
 482                case PropertyValueType.UInt16:
 483                case PropertyValueType.UInt32:
 484                case PropertyValueType.UInt64:
 485                case PropertyValueType.Single:
 486                case PropertyValueType.Double:
 487                case PropertyValueType.Boolean:
 488                case PropertyValueType.Decimal:
 489                case PropertyValueType.Char:
 490                case PropertyValueType.Guid:
 491                case PropertyValueType.DateTime:
 492                case PropertyValueType.String:
 2493                    amqpObject = netObject;
 2494                    break;
 495                case PropertyValueType.Stream:
 0496                    if (mappingType == MappingType.ApplicationProperty)
 497                    {
 0498                        amqpObject = StreamToBytes((Stream)netObject);
 499                    }
 0500                    break;
 501                case PropertyValueType.Uri:
 0502                    amqpObject = new DescribedType((AmqpSymbol)UriName, ((Uri)netObject).AbsoluteUri);
 0503                    break;
 504                case PropertyValueType.DateTimeOffset:
 0505                    amqpObject = new DescribedType((AmqpSymbol)DateTimeOffsetName, ((DateTimeOffset)netObject).UtcTicks)
 0506                    break;
 507                case PropertyValueType.TimeSpan:
 0508                    amqpObject = new DescribedType((AmqpSymbol)TimeSpanName, ((TimeSpan)netObject).Ticks);
 0509                    break;
 510                case PropertyValueType.Unknown:
 0511                    if (netObject is Stream netObjectAsStream)
 512                    {
 0513                        if (mappingType == MappingType.ApplicationProperty)
 514                        {
 0515                            amqpObject = StreamToBytes(netObjectAsStream);
 516                        }
 517                    }
 0518                    else if (mappingType == MappingType.ApplicationProperty)
 519                    {
 0520                        throw Fx.Exception.AsError(new SerializationException(Resources.FailedToSerializeUnsupportedType
 521                    }
 0522                    else if (netObject is byte[] netObjectAsByteArray)
 523                    {
 0524                        amqpObject = new ArraySegment<byte>(netObjectAsByteArray);
 525                    }
 0526                    else if (netObject is IList)
 527                    {
 528                        // Array is also an IList
 0529                        amqpObject = netObject;
 530                    }
 0531                    else if (netObject is IDictionary netObjectAsDictionary)
 532                    {
 0533                        amqpObject = new AmqpMap(netObjectAsDictionary);
 534                    }
 535                    break;
 536            }
 537
 2538            return amqpObject != null;
 539        }
 540
 541        static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType mappingType, out object netObject)
 542        {
 10543            netObject = null;
 10544            if (amqpObject == null)
 545            {
 0546                return true;
 547            }
 548
 10549            switch (SerializationUtilities.GetTypeId(amqpObject))
 550            {
 551                case PropertyValueType.Byte:
 552                case PropertyValueType.SByte:
 553                case PropertyValueType.Int16:
 554                case PropertyValueType.Int32:
 555                case PropertyValueType.Int64:
 556                case PropertyValueType.UInt16:
 557                case PropertyValueType.UInt32:
 558                case PropertyValueType.UInt64:
 559                case PropertyValueType.Single:
 560                case PropertyValueType.Double:
 561                case PropertyValueType.Boolean:
 562                case PropertyValueType.Decimal:
 563                case PropertyValueType.Char:
 564                case PropertyValueType.Guid:
 565                case PropertyValueType.DateTime:
 566                case PropertyValueType.String:
 2567                    netObject = amqpObject;
 2568                    break;
 569                case PropertyValueType.Unknown:
 8570                    if (amqpObject is AmqpSymbol amqpObjectAsAmqpSymbol)
 571                    {
 0572                        netObject = (amqpObjectAsAmqpSymbol).Value;
 573                    }
 8574                    else if (amqpObject is ArraySegment<byte> amqpObjectAsArraySegment)
 575                    {
 6576                        ArraySegment<byte> binValue = amqpObjectAsArraySegment;
 6577                        if (binValue.Count == binValue.Array.Length)
 578                        {
 6579                            netObject = binValue.Array;
 580                        }
 581                        else
 582                        {
 0583                            var buffer = new byte[binValue.Count];
 0584                            Buffer.BlockCopy(binValue.Array, binValue.Offset, buffer, 0, binValue.Count);
 0585                            netObject = buffer;
 586                        }
 587                    }
 2588                    else if (amqpObject is DescribedType amqpObjectAsDescribedType)
 589                    {
 0590                        if (amqpObjectAsDescribedType.Descriptor is AmqpSymbol)
 591                        {
 0592                            var amqpSymbol = (AmqpSymbol)amqpObjectAsDescribedType.Descriptor;
 0593                            if (amqpSymbol.Equals((AmqpSymbol)UriName))
 594                            {
 0595                                netObject = new Uri((string)amqpObjectAsDescribedType.Value);
 596                            }
 0597                            else if (amqpSymbol.Equals((AmqpSymbol)TimeSpanName))
 598                            {
 0599                                netObject = new TimeSpan((long)amqpObjectAsDescribedType.Value);
 600                            }
 0601                            else if (amqpSymbol.Equals((AmqpSymbol)DateTimeOffsetName))
 602                            {
 0603                                netObject = new DateTimeOffset(new DateTime((long)amqpObjectAsDescribedType.Value, DateT
 604                            }
 605                        }
 606                    }
 2607                    else if (mappingType == MappingType.ApplicationProperty)
 608                    {
 0609                        throw Fx.Exception.AsError(new SerializationException(Resources.FailedToSerializeUnsupportedType
 610                    }
 2611                    else if (amqpObject is AmqpMap map)
 612                    {
 0613                        var dictionary = new Dictionary<string, object>();
 0614                        foreach (var pair in map)
 615                        {
 0616                            dictionary.Add(pair.Key.ToString(), pair.Value);
 617                        }
 618
 0619                        netObject = dictionary;
 620                    }
 621                    else
 622                    {
 2623                        netObject = amqpObject;
 624                    }
 625                    break;
 626            }
 627
 10628            return netObject != null;
 629        }
 630
 631        static ArraySegment<byte> StreamToBytes(Stream stream)
 632        {
 633            ArraySegment<byte> buffer;
 0634            if (stream == null || stream.Length < 1)
 635            {
 0636                buffer = default;
 637            }
 638            else
 639            {
 0640                using (var memoryStream = new MemoryStream(512))
 641                {
 0642                    stream.CopyTo(memoryStream, 512);
 0643                    buffer = new ArraySegment<byte>(memoryStream.ToArray());
 0644                }
 645            }
 646
 0647            return buffer;
 648        }
 649
 650        private static Data ToData(AmqpMessage message)
 651        {
 0652            ArraySegment<byte>[] payload = message.GetPayload();
 0653            var buffer = new BufferListStream(payload);
 0654            ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
 0655            return new Data { Value = value };
 656        }
 657
 658        static AmqpMap GetSqlFilterMap(SqlFilter sqlFilter)
 659        {
 0660            var amqpFilterMap = new AmqpMap
 0661            {
 0662                [ManagementConstants.Properties.Expression] = sqlFilter.SqlExpression
 0663            };
 0664            return amqpFilterMap;
 665        }
 666
 667        static AmqpMap GetCorrelationFilterMap(CorrelationFilter correlationFilter)
 668        {
 0669            var correlationFilterMap = new AmqpMap
 0670            {
 0671                [ManagementConstants.Properties.CorrelationId] = correlationFilter.CorrelationId,
 0672                [ManagementConstants.Properties.MessageId] = correlationFilter.MessageId,
 0673                [ManagementConstants.Properties.To] = correlationFilter.To,
 0674                [ManagementConstants.Properties.ReplyTo] = correlationFilter.ReplyTo,
 0675                [ManagementConstants.Properties.Label] = correlationFilter.Label,
 0676                [ManagementConstants.Properties.SessionId] = correlationFilter.SessionId,
 0677                [ManagementConstants.Properties.ReplyToSessionId] = correlationFilter.ReplyToSessionId,
 0678                [ManagementConstants.Properties.ContentType] = correlationFilter.ContentType
 0679            };
 680
 0681            var propertiesMap = new AmqpMap();
 0682            foreach (var property in correlationFilter.Properties)
 683            {
 0684                propertiesMap[new MapKey(property.Key)] = property.Value;
 685            }
 686
 0687            correlationFilterMap[ManagementConstants.Properties.CorrelationFilterProperties] = propertiesMap;
 688
 0689            return correlationFilterMap;
 690        }
 691
 692        static AmqpMap GetRuleActionMap(SqlRuleAction sqlRuleAction)
 693        {
 0694            AmqpMap ruleActionMap = null;
 0695            if (sqlRuleAction != null)
 696            {
 0697                ruleActionMap = new AmqpMap { [ManagementConstants.Properties.Expression] = sqlRuleAction.SqlExpression 
 698            }
 699
 0700            return ruleActionMap;
 701        }
 702    }
 703}