Summary

Class: Microsoft.Azure.ServiceBus.Core.MessageSender
Assembly: Microsoft.Azure.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Core\MessageSender.cs
Covered lines: 32
Uncovered lines: 291
Coverable lines: 323
Total lines: 782
Line coverage: 9.9% (32 of 323)
Covered branches: 9
Total branches: 122
Branch coverage: 7.3% (9 of 122)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 0% | 0%
.ctor(...) | - | 0% | 0%
.ctor(...) | - | 0% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 87.5% | 64.29%
get_RegisteredPlugins() | - | 0% | 100%
get_Path() | - | 100% | 100%
get_TransferDestinationPath() | - | 100% | 100%
get_ViaEntityPath() | - | 100% | 100%
get_OperationTimeout() | - | 0% | 100%
set_OperationTimeout(...) | - | 0% | 100%
get_ServiceBusConnection() | - | 100% | 100%
get_EntityType() | - | 0% | 100%
get_SendingLinkDestination() | - | 0% | 100%
get_CbsTokenProvider() | - | 100% | 100%
get_SendLinkManager() | - | 0% | 100%
get_RequestResponseLinkManager() | - | 0% | 100%
SendAsync(...) | - | 0% | 100%
SendAsync() | - | 0% | 0%
ScheduleMessageAsync() | - | 0% | 0%
<ScheduleMessageAsync() | - | 0% | 100%
CancelScheduledMessageAsync() | - | 0% | 0%
RegisterPlugin(...) | - | 0% | 0%
UnregisterPlugin(...) | - | 0% | 0%
ExecuteRequestResponseAsync() | - | 0% | 0%
OnClosingAsync() | - | 0% | 100%
ValidateMessages(...) | - | 0% | 0%
ValidateMessage(...) | - | 0% | 0%
CloseSession(...) | - | 0% | 100%
CloseRequestResponseSession(...) | - | 0% | 100%
ProcessMessage() | - | 0% | 0%
ProcessMessages() | - | 0% | 0%
OnSendAsync() | - | 0% | 0%
OnScheduleMessageAsync() | - | 0% | 0%
OnCancelScheduledMessageAsync() | - | 0% | 0%
CreateLinkAsync() | - | 0% | 0%
CreateRequestResponseLinkAsync() | - | 0% | 0%
GetNextDeliveryTag() | - | 0% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Core\MessageSender.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus.Core
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Diagnostics;
 9    using System.Linq;
 10    using System.Threading;
 11    using System.Threading.Tasks;
 12    using System.Transactions;
 13    using Microsoft.Azure.Amqp;
 14    using Microsoft.Azure.Amqp.Encoding;
 15    using Microsoft.Azure.Amqp.Framing;
 16    using Microsoft.Azure.ServiceBus.Amqp;
 17    using Microsoft.Azure.ServiceBus.Primitives;
 18
 19    /// <summary>
 20    /// The MessageSender can be used to send messages to Queues or Topics.
 21    /// </summary>
 22    /// <example>
 23    /// Create a new MessageSender to send to a Queue
 24    /// <code>
 25    /// IMessageSender messageSender = new MessageSender(
 26    ///     namespaceConnectionString,
 27    ///     queueName);
 28    /// </code>
 29    ///
 30    /// Send message
 31    /// <code>
 32    /// byte[] data = GetData();
 33    /// await messageSender.SendAsync(new Message(data));
 34    /// </code>
 35    /// </example>
 36    /// <remarks>This uses AMQP protocol to communicate with service.</remarks>
 37    public class MessageSender : ClientEntity, IMessageSender
 38    {
 39        int deliveryCount;
 40        readonly ActiveClientLinkManager clientLinkManager;
 41        readonly ServiceBusDiagnosticSource diagnosticSource;
 42        readonly bool isViaSender;
 43
 44        /// <summary>
 45        /// Creates a new AMQP MessageSender.
 46        /// </summary>
 47        /// <param name="connectionStringBuilder">The <see cref="ServiceBusConnectionStringBuilder"/> having entity leve
 48        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 49        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 50        public MessageSender(
 51            ServiceBusConnectionStringBuilder connectionStringBuilder,
 52            RetryPolicy retryPolicy = null)
 053            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, retryPo
 54        {
 055        }
 56
 57        /// <summary>
 58        /// Creates a new AMQP MessageSender.
 59        /// </summary>
 60        /// <param name="connectionString">Namespace connection string used to communicate with Service Bus. Must not co
 61        /// <param name="entityPath">The path of the entity this sender should connect to.</param>
 62        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 63        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 64        public MessageSender(
 65            string connectionString,
 66            string entityPath,
 67            RetryPolicy retryPolicy = null)
 068            : this(entityPath, null, null, new ServiceBusConnection(connectionString), null, retryPolicy)
 69        {
 070            if (string.IsNullOrWhiteSpace(connectionString))
 71            {
 072                throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
 73            }
 74
 075            this.OwnsConnection = true;
 076        }
 77
 78        /// <summary>
 79        /// Creates a new MessageSender
 80        /// </summary>
 81        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 82        /// <param name="entityPath">Queue path.</param>
 83        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 84        /// <param name="transportType">Transport type.</param>
 85        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 86        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 87        public MessageSender(
 88            string endpoint,
 89            string entityPath,
 90            ITokenProvider tokenProvider,
 91            TransportType transportType = TransportType.Amqp,
 92            RetryPolicy retryPolicy = null)
 093            : this(entityPath, null, null, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider
 94        {
 095            this.OwnsConnection = true;
 096        }
 97
 98        /// <summary>
 99        /// Creates a new AMQP MessageSender on a given <see cref="ServiceBusConnection"/>
 100        /// </summary>
 101        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 102        /// <param name="entityPath">The path of the entity this sender should connect to.</param>
 103        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 104        public MessageSender(
 105            ServiceBusConnection serviceBusConnection,
 106            string entityPath,
 107            RetryPolicy retryPolicy = null)
 6108            : this(entityPath, null, null, serviceBusConnection, null, retryPolicy)
 109        {
 6110            this.OwnsConnection = false;
 6111        }
 112
 113        /// <summary>
 114        /// Creates a ViaMessageSender. This can be used to send messages to a destination entity via another en
 115        /// </summary>
 116        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 117        /// <param name="entityPath">The final destination of the message.</param>
 118        /// <param name="viaEntityPath">The first destination of the message.</param>
 119        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 120        /// <remarks>
 121        /// This is mainly to be used when sending messages in a transaction.
 122        /// When messages need to be sent across entities in a single transaction, this can be used to ensure
 123        /// all the messages land initially in the same entity/partition for local transactions, and then
 124        /// let service bus handle transferring the message to the actual destination.
 125        /// </remarks>
 126        public MessageSender(
 127            ServiceBusConnection serviceBusConnection,
 128            string entityPath,
 129            string viaEntityPath,
 130            RetryPolicy retryPolicy = null)
 6131            :this(viaEntityPath, entityPath, null, serviceBusConnection, null, retryPolicy)
 132        {
 6133            this.OwnsConnection = false;
 6134        }
 135
 136        internal MessageSender(
 137            string entityPath,
 138            string transferDestinationPath,
 139            MessagingEntityType? entityType,
 140            ServiceBusConnection serviceBusConnection,
 141            ICbsTokenProvider cbsTokenProvider,
 142            RetryPolicy retryPolicy)
 12143            : base(nameof(MessageSender), entityPath, retryPolicy ?? RetryPolicy.Default)
 144        {
 12145            MessagingEventSource.Log.MessageSenderCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath);
 146
 12147            if (string.IsNullOrWhiteSpace(entityPath))
 148            {
 0149                throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
 150            }
 151
 12152            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 12153            this.Path = entityPath;
 12154            this.SendingLinkDestination = entityPath;
 12155            this.EntityType = entityType;
 12156            this.ServiceBusConnection.ThrowIfClosed();
 157
 12158            if (cbsTokenProvider != null)
 159            {
 0160                this.CbsTokenProvider = cbsTokenProvider;
 161            }
 12162            else if (this.ServiceBusConnection.TokenProvider != null)
 163            {
 12164                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 165            }
 166            else
 167            {
 0168                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 169            }
 170
 12171            this.SendLinkManager = new FaultTolerantAmqpObject<SendingAmqpLink>(this.CreateLinkAsync, CloseSession);
 12172            this.RequestResponseLinkManager = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(this.CreateRequestRes
 12173            this.clientLinkManager = new ActiveClientLinkManager(this, this.CbsTokenProvider);
 12174            this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
 175
 12176            if (!string.IsNullOrWhiteSpace(transferDestinationPath))
 177            {
 6178                this.isViaSender = true;
 6179                this.TransferDestinationPath = transferDestinationPath;
 6180                this.ViaEntityPath = entityPath;
 181            }
 182
 12183            MessagingEventSource.Log.MessageSenderCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.C
 12184        }
 185
 186        /// <summary>
 187        /// Gets a list of currently registered plugins for this sender.
 188        /// </summary>
 189        /// <seealso cref="RegisterPlugin"/>
 0190        public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();
 191
 192        /// <summary>
 193        /// Gets the entity path of the MessageSender.
 194        /// In the case of a via-sender, this returns the path of the via entity.
 195        /// </summary>
 4196        public override string Path { get; }
 197
 198        /// <summary>
 199        /// In the case of a via-sender, gets the final destination path of the messages; null otherwise.
 200        /// </summary>
 4201        public string TransferDestinationPath { get; }
 202
 203        /// <summary>
 204        /// In the case of a via-sender, the message is sent to <see cref="TransferDestinationPath"/> via <see cref="Via
 205        /// </summary>
 4206        public string ViaEntityPath { get; }
 207
 208        /// <summary>
 209        /// Duration after which individual operations will timeout.
 210        /// </summary>
 211        public override TimeSpan OperationTimeout
 212        {
 0213            get => this.ServiceBusConnection.OperationTimeout;
 0214            set => this.ServiceBusConnection.OperationTimeout = value;
 215        }
 216
 217        /// <summary>
 218        /// Connection object to the service bus namespace.
 219        /// </summary>
 48220        public override ServiceBusConnection ServiceBusConnection { get; }
 221
 0222        internal MessagingEntityType? EntityType { get; }
 223
 0224        internal string SendingLinkDestination { get; set; }
 225
 12226        ICbsTokenProvider CbsTokenProvider { get; }
 227
 0228        FaultTolerantAmqpObject<SendingAmqpLink> SendLinkManager { get; }
 229
 0230        FaultTolerantAmqpObject<RequestResponseAmqpLink> RequestResponseLinkManager { get; }
 231
 232        /// <summary>
 233        /// Sends a message to the entity as described by <see cref="Path"/>.
 234        /// </summary>
 235        public Task SendAsync(Message message)
 236        {
 0237            return this.SendAsync(new[] { message });
 238        }
 239
 240        /// <summary>
 241        /// Sends a list of messages to the entity as described by <see cref="Path"/>.
 242        /// When called on partitioned entities, messages meant for different partitions cannot be batched together.
 243        /// </summary>
 244        public async Task SendAsync(IList<Message> messageList)
 245        {
 0246            this.ThrowIfClosed();
 247
 0248            var count = MessageSender.ValidateMessages(messageList);
 0249            if (count <= 0)
 250            {
 0251                return;
 252            }
 253
 0254            MessagingEventSource.Log.MessageSendStart(this.ClientId, count);
 255
 0256            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0257            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null;
 0258            Task sendTask = null;
 259
 260            try
 261            {
 0262                var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);
 263
 0264                sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeou
 0265                await sendTask.ConfigureAwait(false);
 0266            }
 0267            catch (Exception exception)
 268            {
 0269                if (isDiagnosticSourceEnabled)
 270                {
 0271                    this.diagnosticSource.ReportException(exception);
 272                }
 273
 0274                MessagingEventSource.Log.MessageSendException(this.ClientId, exception);
 0275                throw;
 276            }
 277            finally
 278            {
 0279                this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status);
 280            }
 281
 0282            MessagingEventSource.Log.MessageSendStop(this.ClientId);
 0283        }
 284
 285        /// <summary>
 286        /// Schedules a message to appear on Service Bus at a later time.
 287        /// </summary>
 288        /// <param name="message">The <see cref="Message"/> that needs to be scheduled.</param>
 289        /// <param name="scheduleEnqueueTimeUtc">The UTC time at which the message should be available for processing</p
 290        /// <returns>The sequence number of the message that was scheduled.</returns>
 291        public async Task<long> ScheduleMessageAsync(Message message, DateTimeOffset scheduleEnqueueTimeUtc)
 292        {
 0293            this.ThrowIfClosed();
 0294            if (message == null)
 295            {
 0296                throw Fx.Exception.ArgumentNull(nameof(message));
 297            }
 298
 0299            if (scheduleEnqueueTimeUtc.CompareTo(DateTimeOffset.UtcNow) < 0)
 300            {
 0301                throw Fx.Exception.ArgumentOutOfRange(
 0302                    nameof(scheduleEnqueueTimeUtc),
 0303                    scheduleEnqueueTimeUtc.ToString(),
 0304                    "Cannot schedule messages in the past");
 305            }
 306
 0307            if (this.isViaSender && Transaction.Current != null)
 308            {
 0309                throw new ServiceBusException(false, $"{nameof(ScheduleMessageAsync)} method is not supported in a Via-S
 310            }
 311
 0312            message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
 0313            MessageSender.ValidateMessage(message);
 0314            MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc);
 0315            long result = 0;
 316
 0317            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0318            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.ScheduleStart(message, scheduleEnqueue
 0319            Task scheduleTask = null;
 320
 321            try
 322            {
 0323                var processedMessage = await this.ProcessMessage(message).ConfigureAwait(false);
 324
 0325                scheduleTask = this.RetryPolicy.RunOperation(
 0326                    async () =>
 0327                    {
 0328                        result = await this.OnScheduleMessageAsync(processedMessage).ConfigureAwait(false);
 0329                    }, this.OperationTimeout);
 0330                await scheduleTask.ConfigureAwait(false);
 0331            }
 0332            catch (Exception exception)
 333            {
 0334                if (isDiagnosticSourceEnabled)
 335                {
 0336                    this.diagnosticSource.ReportException(exception);
 337                }
 338
 0339                MessagingEventSource.Log.ScheduleMessageException(this.ClientId, exception);
 0340                throw;
 341            }
 342            finally
 343            {
 0344                this.diagnosticSource.ScheduleStop(activity, message, scheduleEnqueueTimeUtc, scheduleTask?.Status, resu
 345            }
 346
 0347            MessagingEventSource.Log.ScheduleMessageStop(this.ClientId);
 0348            return result;
 0349        }
 350
 351        /// <summary>
 352        /// Cancels a message that was scheduled.
 353        /// </summary>
 354        /// <param name="sequenceNumber">The <see cref="Message.SystemPropertiesCollection.SequenceNumber"/> of the mess
 355        public async Task CancelScheduledMessageAsync(long sequenceNumber)
 356        {
 0357            this.ThrowIfClosed();
 0358            if (Transaction.Current != null)
 359            {
 0360                throw new ServiceBusException(false, $"{nameof(CancelScheduledMessageAsync)} method is not supported wit
 361            }
 362
 0363            MessagingEventSource.Log.CancelScheduledMessageStart(this.ClientId, sequenceNumber);
 364
 0365            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0366            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.CancelStart(sequenceNumber) : null;
 0367            Task cancelTask = null;
 368
 369            try
 370            {
 0371                cancelTask = this.RetryPolicy.RunOperation(() => this.OnCancelScheduledMessageAsync(sequenceNumber),
 0372                    this.OperationTimeout);
 0373                await cancelTask.ConfigureAwait(false);
 0374            }
 0375            catch (Exception exception)
 376            {
 0377                if (isDiagnosticSourceEnabled)
 378                {
 0379                    this.diagnosticSource.ReportException(exception);
 380                }
 381
 0382                MessagingEventSource.Log.CancelScheduledMessageException(this.ClientId, exception);
 0383                throw;
 384            }
 385            finally
 386            {
 0387                this.diagnosticSource.CancelStop(activity, sequenceNumber, cancelTask?.Status);
 388            }
 0389            MessagingEventSource.Log.CancelScheduledMessageStop(this.ClientId);
 0390        }
 391
 392        /// <summary>
 393        /// Registers a <see cref="ServiceBusPlugin"/> to be used with this sender.
 394        /// </summary>
 395        /// <param name="serviceBusPlugin">The <see cref="ServiceBusPlugin"/> to register.</param>
 396        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 397        {
 0398            this.ThrowIfClosed();
 0399            if (serviceBusPlugin == null)
 400            {
 0401                throw new ArgumentNullException(nameof(serviceBusPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUs
 402            }
 403
 0404            if (this.RegisteredPlugins.Any(p => p.GetType() == serviceBusPlugin.GetType()))
 405            {
 0406                throw new ArgumentException(nameof(serviceBusPlugin), Resources.PluginAlreadyRegistered.FormatForUser(se
 407            }
 0408            this.RegisteredPlugins.Add(serviceBusPlugin);
 0409        }
 410
 411        /// <summary>
 412        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 413        /// </summary>
 414        /// <param name="serviceBusPluginName">The name <see cref="ServiceBusPlugin.Name"/> to be unregistered</param>
 415        public override void UnregisterPlugin(string serviceBusPluginName)
 416        {
 0417            this.ThrowIfClosed();
 0418            if (string.IsNullOrWhiteSpace(serviceBusPluginName))
 419            {
 0420                throw new ArgumentNullException(nameof(serviceBusPluginName), Resources.ArgumentNullOrWhiteSpace.FormatF
 421            }
 0422            if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPluginName))
 423            {
 0424                var plugin = this.RegisteredPlugins.First(p => p.Name == serviceBusPluginName);
 0425                this.RegisteredPlugins.Remove(plugin);
 426            }
 0427        }
 428
 429        internal async Task<AmqpResponseMessage> ExecuteRequestResponseAsync(AmqpRequestMessage amqpRequestMessage)
 430        {
 0431            var amqpMessage = amqpRequestMessage.AmqpMessage;
 0432            var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
 433
 0434            ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 0435            var ambientTransaction = Transaction.Current;
 0436            if (ambientTransaction != null)
 437            {
 0438                transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusCon
 439            }
 440
 0441            if (!this.RequestResponseLinkManager.TryGetOpenedObject(out var requestResponseAmqpLink))
 442            {
 0443                requestResponseAmqpLink = await this.RequestResponseLinkManager.GetOrCreateAsync(timeoutHelper.Remaining
 444            }
 445
 0446            var responseAmqpMessage = await Task.Factory.FromAsync(
 0447                (c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, transactionId, timeoutHelper.RemainingTime()
 0448                a => requestResponseAmqpLink.EndRequest(a),
 0449                this).ConfigureAwait(false);
 450
 0451            return AmqpResponseMessage.CreateResponse(responseAmqpMessage);
 0452        }
 453
 454        /// <summary>Closes the connection.</summary>
 455        protected override async Task OnClosingAsync()
 456        {
 0457            this.clientLinkManager.Close();
 0458            await this.SendLinkManager.CloseAsync().ConfigureAwait(false);
 0459            await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
 0460        }
 461
 462        static int ValidateMessages(IList<Message> messageList)
 463        {
 0464            var count = 0;
 0465            if (messageList == null)
 466            {
 0467                throw Fx.Exception.ArgumentNull(nameof(messageList));
 468            }
 469
 0470            foreach (var message in messageList)
 471            {
 0472                count++;
 0473                ValidateMessage(message);
 474            }
 475
 0476            return count;
 477        }
 478
 479        static void ValidateMessage(Message message)
 480        {
 0481            if (message.SystemProperties.IsLockTokenSet)
 482            {
 0483                throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received.");
 484            }
 0485        }
 486
 487        static void CloseSession(SendingAmqpLink link)
 488        {
 489            // Note we close the session (which includes the link).
 0490            link.Session.SafeClose();
 0491        }
 492
 493        static void CloseRequestResponseSession(RequestResponseAmqpLink requestResponseAmqpLink)
 494        {
 0495            requestResponseAmqpLink.Session.SafeClose();
 0496        }
 497
 498        async Task<Message> ProcessMessage(Message message)
 499        {
 0500            var processedMessage = message;
 0501            foreach (var plugin in this.RegisteredPlugins)
 502            {
 503                try
 504                {
 0505                    MessagingEventSource.Log.PluginCallStarted(plugin.Name, message.MessageId);
 0506                    processedMessage = await plugin.BeforeMessageSend(message).ConfigureAwait(false);
 0507                    MessagingEventSource.Log.PluginCallCompleted(plugin.Name, message.MessageId);
 0508                }
 0509                catch (Exception ex)
 510                {
 0511                    MessagingEventSource.Log.PluginCallFailed(plugin.Name, message.MessageId, ex);
 0512                    if (!plugin.ShouldContinueOnException)
 513                    {
 0514                        throw;
 515                    }
 0516                }
 0517            }
 0518            return processedMessage;
 0519        }
 520
 521        async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
 522        {
 0523            if (this.RegisteredPlugins.Count < 1)
 524            {
 0525                return messageList;
 526            }
 527
 0528            var processedMessageList = new List<Message>();
 0529            foreach (var message in messageList)
 530            {
 0531                var processedMessage = await this.ProcessMessage(message).ConfigureAwait(false);
 0532                processedMessageList.Add(processedMessage);
 533            }
 534
 0535            return processedMessageList;
 0536        }
 537
 538        async Task OnSendAsync(IList<Message> messageList)
 539        {
 0540            var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
 0541            using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList))
 542            {
 0543                SendingAmqpLink amqpLink = null;
 544                try
 545                {
 0546                    ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 0547                    var ambientTransaction = Transaction.Current;
 0548                    if (ambientTransaction != null)
 549                    {
 0550                        transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.Servi
 551                    }
 552
 0553                    if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
 554                    {
 0555                        amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureA
 556                    }
 0557                    if (amqpLink.Settings.MaxMessageSize.HasValue)
 558                    {
 0559                        var size = (ulong)amqpMessage.SerializedMessageSize;
 0560                        if (size > amqpLink.Settings.MaxMessageSize.Value)
 561                        {
 0562                            throw new MessageSizeExceededException(Resources.AmqpMessageSizeExceeded.FormatForUser(amqpM
 563                        }
 564                    }
 565
 0566                    var outcome = await amqpLink.SendMessageAsync(amqpMessage, this.GetNextDeliveryTag(), transactionId,
 567
 0568                    if (outcome.DescriptorCode != Accepted.Code)
 569                    {
 0570                        var rejected = (Rejected)outcome;
 0571                        throw Fx.Exception.AsError(rejected.Error.ToMessagingContractException());
 572                    }
 0573                }
 574                catch (Exception exception)
 575                {
 0576                    throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.S
 577                }
 0578            }
 0579        }
 580
 581        async Task<long> OnScheduleMessageAsync(Message message)
 582        {
 0583            using (var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
 584            {
 0585                var request = AmqpRequestMessage.CreateRequest(
 0586                        ManagementConstants.Operations.ScheduleMessageOperation,
 0587                        this.OperationTimeout,
 0588                        null);
 589
 0590                SendingAmqpLink sendLink = null;
 591
 592                try
 593                {
 0594                    if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
 595                    {
 0596                        request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = 
 597                    }
 598
 0599                    ArraySegment<byte>[] payload = amqpMessage.GetPayload();
 0600                    var buffer = new BufferListStream(payload);
 0601                    ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
 602
 0603                    var entry = new AmqpMap();
 604                    {
 0605                        entry[ManagementConstants.Properties.Message] = value;
 0606                        entry[ManagementConstants.Properties.MessageId] = message.MessageId;
 607
 0608                        if (!string.IsNullOrWhiteSpace(message.SessionId))
 609                        {
 0610                            entry[ManagementConstants.Properties.SessionId] = message.SessionId;
 611                        }
 612
 0613                        if (!string.IsNullOrWhiteSpace(message.PartitionKey))
 614                        {
 0615                            entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
 616                        }
 617
 0618                        if (!string.IsNullOrWhiteSpace(message.ViaPartitionKey))
 619                        {
 0620                            entry[ManagementConstants.Properties.ViaPartitionKey] = message.ViaPartitionKey;
 621                        }
 622                    }
 623
 0624                    request.Map[ManagementConstants.Properties.Messages] = new List<AmqpMap> { entry };
 625
 0626                    var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);
 0627                    if (response.StatusCode == AmqpResponseStatusCode.OK)
 628                    {
 0629                        var sequenceNumbers = response.GetValue<long[]>(ManagementConstants.Properties.SequenceNumbers);
 0630                        if (sequenceNumbers == null || sequenceNumbers.Length < 1)
 631                        {
 0632                            throw new ServiceBusException(true, "Could not schedule message successfully.");
 633                        }
 634
 0635                        return sequenceNumbers[0];
 636
 637                    }
 638                    else
 639                    {
 0640                        throw response.ToMessagingContractException();
 641                    }
 642                }
 643                catch (Exception exception)
 644                {
 0645                    throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.S
 646                }
 647            }
 0648        }
 649
 650        async Task OnCancelScheduledMessageAsync(long sequenceNumber)
 651        {
                // Sends a CancelScheduledMessageOperation management request carrying the
                // given sequence number and completes only if the response status is OK.
                // Any failure is surfaced through AmqpExceptionHelper.GetClientException.
                // The request is created with this.OperationTimeout.
 0652            var request =
 0653                AmqpRequestMessage.CreateRequest(
 0654                    ManagementConstants.Operations.CancelScheduledMessageOperation,
 0655                    this.OperationTimeout,
 0656                    null);
 657
                // Declared outside the try block so the catch handler can read the link's
                // tracking details; it stays null when no send link is currently open.
 0658            SendingAmqpLink sendLink = null;
 659
 660            try
 661            {
                    // Only when a send link is already open is the AssociatedLinkName
                    // application property set; no link is created here.
                    // (The assigned value is cut off in this listing.)
 0662                if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
 663                {
 0664                    request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = send
 665                }
 666
                    // The request body carries an array even though exactly one sequence
                    // number is being cancelled.
 0667                request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };
 668
 0669                var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);
 670
                    // A non-OK status is converted to an exception. Because this throw sits
                    // inside the try block, it is also handled by the catch below.
 0671                if (response.StatusCode != AmqpResponseStatusCode.OK)
 672                {
 0673                    throw response.ToMessagingContractException();
 674                }
 0675            }
 676            catch (Exception exception)
 677            {
                    // Every exception raised above is passed to GetClientException together
                    // with the send link's tracking id (null when there is no link).
                    // The trailing arguments are cut off in this listing.
 0678                throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Sessi
 679            }
 0680        }
 681
 682        async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)
 683        {
                // Builds the settings for a sending AMQP link targeting
                // this.SendingLinkDestination, opens it through AmqpSendReceiveLinkCreator,
                // registers it with this.clientLinkManager and returns the opened link.
                // NOTE(review): `timeout` does not appear in the visible portion of this
                // method; several lines are truncated in this listing, so confirm against
                // the full source whether it is actually used.
 0684            MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.SendingLinkDestination
 685
                // Role = false is the AMQP 1.0 encoding of the sender role.
                // The link's source address is this client's id and its target is the
                // sending destination.
 0686            var amqpLinkSettings = new AmqpLinkSettings
 0687            {
 0688                Role = false,
 0689                InitialDeliveryCount = 0,
 0690                Target = new Target { Address = this.SendingLinkDestination },
 0691                Source = new Source { Address = this.ClientId },
 0692            };
                // The entity type is attached as a link property (cast to int) only when known.
 0693            if (this.EntityType != null)
 694            {
 0695                amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType);
 696            }
 697
 0698            var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.SendingLinkDestination);
 699
                // A via-sender gets a two-entry audience (the sending destination plus the
                // transfer destination) and an extra TransferDestinationAddress link
                // property; otherwise the audience is the sending destination alone.
 700            string[] audience;
 0701            if (this.isViaSender)
 702            {
 0703                var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinatio
 0704                audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
 0705                amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPat
 706            }
 707            else
 708            {
 0709                audience = new string[] { endpointUri.AbsoluteUri };
 710            }
 711
                // Only the Send claim is used for this link.
 0712            string[] claims = {ClaimConstants.Send};
                // The creator's constructor arguments and the awaited call are cut off in
                // this listing. The call yields a tuple of the opened AMQP object and a DateTime.
 0713            var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.SendingLinkDestination, this.ServiceBus
 0714            Tuple<AmqpObject, DateTime> linkDetails = await amqpSendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().Conf
 715
                // Item1 is cast to the sending link. Item2 is forwarded unchanged to
                // ActiveSendReceiveClientLink together with the endpoint, audience and claims.
                // NOTE(review): Item2 is presumably the authorization validity/expiry time - confirm.
 0716            var sendingAmqpLink = (SendingAmqpLink) linkDetails.Item1;
 0717            var activeSendReceiveClientLink = new ActiveSendReceiveClientLink(
 0718                sendingAmqpLink,
 0719                endpointUri,
 0720                audience,
 0721                claims,
 0722                linkDetails.Item2);
 723
 0724            this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
 725
 0726            MessagingEventSource.Log.AmqpSendLinkCreateStop(this.ClientId);
 0727            return sendingAmqpLink;
 0728        }
 729
 730        async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan timeout)
 731        {
                // Opens a request/response link on the management address under
                // this.SendingLinkDestination, registers it with this.clientLinkManager
                // and returns the opened link.
                // NOTE(review): `timeout` does not appear in the visible portion of this
                // method (two lines are truncated in this listing); confirm against the
                // full source whether it is actually used.
                // The management path is "<sending destination>/<management address>".
 0732            var entityPath = this.SendingLinkDestination + '/' + AmqpClientConstants.ManagementAddress;
 0733            var amqpLinkSettings = new AmqpLinkSettings();
 0734            amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
 735
 0736            var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, entityPath);
 737
                // A via-sender gets a two-entry audience (the management endpoint plus the
                // transfer destination) and an extra TransferDestinationAddress link
                // property; otherwise the audience is the management endpoint alone.
 738            string[] audience;
 0739            if (this.isViaSender)
 740            {
 0741                var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinatio
 0742                audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
 0743                amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPat
 744            }
 745            else
 746            {
 0747                audience = new string[] { endpointUri.AbsoluteUri };
 748            }
 749
                // The Manage and Send claims are both used for this link.
 0750            string[] claims = { ClaimConstants.Manage, ClaimConstants.Send };
 0751            var amqpRequestResponseLinkCreator = new AmqpRequestResponseLinkCreator(
 0752                entityPath,
 0753                this.ServiceBusConnection,
 0754                endpointUri,
 0755                audience,
 0756                claims,
 0757                this.CbsTokenProvider,
 0758                amqpLinkSettings,
 0759                this.ClientId);
 760
                // The creator yields a tuple of the opened AMQP object and a DateTime.
 0761            Tuple<AmqpObject, DateTime> linkDetails =
 0762                await amqpRequestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
 763
                // Item1 is cast to the request/response link. Item2 is forwarded unchanged
                // to ActiveRequestResponseLink together with the endpoint, audience and claims.
                // NOTE(review): Item2 is presumably the authorization validity/expiry time - confirm.
 0764            var requestResponseAmqpLink = (RequestResponseAmqpLink) linkDetails.Item1;
 0765            var activeRequestResponseClientLink = new ActiveRequestResponseLink(
 0766                requestResponseAmqpLink,
 0767                endpointUri,
 0768                audience,
 0769                claims,
 0770                linkDetails.Item2);
 0771            this.clientLinkManager.SetActiveRequestResponseLink(activeRequestResponseClientLink);
 772
 0773            return requestResponseAmqpLink;
 0774        }
 775
 776        ArraySegment<byte> GetNextDeliveryTag()
 777        {
                // Produces a delivery tag from a per-instance counter: the counter is
                // incremented atomically, and the bytes of the incremented value
                // (as produced by BitConverter.GetBytes) are returned in an ArraySegment.
 0778            var deliveryId = Interlocked.Increment(ref this.deliveryCount);
 0779            return new ArraySegment<byte>(BitConverter.GetBytes(deliveryId));
 780        }
 781    }
 782}