< Summary

Class:Microsoft.Azure.ServiceBus.Core.MessageReceiver
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Core\MessageReceiver.cs
Covered lines:0
Uncovered lines:672
Coverable lines:672
Total lines:1698
Line coverage:0% (0 of 672)
Covered branches:0
Total branches:296
Branch coverage:0% (0 of 296)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-0%100%
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-0%100%
.ctor(...)-0%100%
.ctor(...)-0%0%
get_RegisteredPlugins()-0%100%
get_ReceiveMode()-0%100%
get_PrefetchCount()-0%100%
set_PrefetchCount(...)-0%0%
get_LastPeekedSequenceNumber()-0%100%
set_LastPeekedSequenceNumber(...)-0%0%
get_Path()-0%100%
get_OperationTimeout()-0%100%
set_OperationTimeout(...)-0%100%
get_ServiceBusConnection()-0%100%
get_LockedUntilUtcInternal()-0%100%
get_SessionIdInternal()-0%100%
get_EntityType()-0%100%
get_LinkException()-0%100%
get_CbsTokenProvider()-0%100%
get_ReceiveLinkManager()-0%100%
get_RequestResponseLinkManager()-0%100%
ReceiveAsync()-0%100%
ReceiveAsync()-0%0%
ReceiveAsync(...)-0%100%
ReceiveAsync()-0%0%
<ReceiveAsync()-0%100%
ReceiveDeferredMessageAsync()-0%0%
ReceiveDeferredMessageAsync()-0%0%
<ReceiveDeferredMessageAsync()-0%100%
CompleteAsync(...)-0%100%
CompleteAsync()-0%0%
AbandonAsync()-0%0%
DeferAsync()-0%0%
DeadLetterAsync()-0%0%
DeadLetterAsync()-0%0%
RenewLockAsync()-0%100%
RenewLockAsync()-0%0%
<RenewLockAsync()-0%100%
PeekAsync()-0%100%
PeekAsync(...)-0%100%
PeekBySequenceNumberAsync()-0%0%
PeekBySequenceNumberAsync()-0%0%
<PeekBySequenceNumberAsync()-0%100%
RegisterMessageHandler(...)-0%100%
RegisterMessageHandler(...)-0%100%
RegisterPlugin(...)-0%0%
UnregisterPlugin(...)-0%0%
GetSessionReceiverLinkAsync()-0%0%
ExecuteRequestResponseAsync()-0%0%
OnClosingAsync()-0%0%
OnReceiveAsync()-0%0%
OnPeekAsync()-0%0%
OnReceiveDeferredMessageAsync()-0%0%
OnCompleteAsync(...)-0%0%
OnAbandonAsync(...)-0%0%
OnDeferAsync(...)-0%0%
OnDeadLetterAsync(...)-0%0%
OnRenewLockAsync()-0%0%
OnMessageHandler(...)-0%0%
CloseSession(...)-0%100%
CloseRequestResponseSession(...)-0%100%
ProcessMessage()-0%0%
ProcessMessages()-0%0%
DisposeMessagesAsync()-0%0%
DisposeMessageRequestResponseAsync()-0%0%
CreateLinkAsync()-0%0%
CreateRequestResponseLinkAsync()-0%100%
OnSessionReceiverLinkClosed(...)-0%0%
ConvertLockTokensToDeliveryTags(...)-0%100%
ThrowIfNotPeekLockMode()-0%0%
ThrowIfSessionLockLost()-0%0%
GetAbandonOutcome(...)-0%100%
GetDeferOutcome(...)-0%100%
GetModifiedOutcome(...)-0%0%
GetRejectedOutcome(...)-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Core\MessageReceiver.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus.Core
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Diagnostics;
 9    using System.Linq;
 10    using System.Threading;
 11    using System.Threading.Tasks;
 12    using System.Transactions;
 13    using Microsoft.Azure.Amqp;
 14    using Microsoft.Azure.Amqp.Encoding;
 15    using Microsoft.Azure.Amqp.Framing;
 16    using Microsoft.Azure.ServiceBus.Amqp;
 17    using Microsoft.Azure.ServiceBus.Primitives;
 18
 19    /// <summary>
 20    /// The MessageReceiver can be used to receive messages from Queues and Subscriptions and acknowledge them.
 21    /// </summary>
 22    /// <example>
 23    /// Create a new MessageReceiver to receive a message from a Subscription
 24    /// <code>
 25    /// IMessageReceiver messageReceiver = new MessageReceiver(
 26    ///     namespaceConnectionString,
 27    ///     EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName),
 28    ///     ReceiveMode.PeekLock);
 29    /// </code>
 30    ///
 31    /// Receive a message from the Subscription.
 32    /// <code>
 33    /// var message = await messageReceiver.ReceiveAsync();
 34    /// await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
 35    /// </code>
 36    /// </example>
 37    /// <remarks>
 38    /// The MessageReceiver provides advanced functionality that is not found in the
 39    /// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
 40    /// <see cref="ReceiveAsync()"/>, which allows you to receive messages on demand, but also requires
 41    /// you to manually renew locks using <see cref="RenewLockAsync(Message)"/>.
 42    /// It uses AMQP protocol to communicate with service.
 43    /// </remarks>
 44    public class MessageReceiver : ClientEntity, IMessageReceiver
 45    {
 046        private static readonly TimeSpan DefaultBatchFlushInterval = TimeSpan.FromMilliseconds(20);
 47
 48        readonly ConcurrentExpiringSet<Guid> requestResponseLockedMessages;
 49        readonly bool isSessionReceiver;
 50        readonly object messageReceivePumpSyncLock;
 51        readonly ActiveClientLinkManager clientLinkManager;
 52        readonly ServiceBusDiagnosticSource diagnosticSource;
 53
 54        int prefetchCount;
 55        long lastPeekedSequenceNumber;
 56        MessageReceivePump receivePump;
 57        CancellationTokenSource receivePumpCancellationTokenSource;
 58
 59        /// <summary>
 60        /// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
 61        /// </summary>
 62        /// <param name="connectionStringBuilder">The <see cref="ServiceBusConnectionStringBuilder"/> having entity leve
 63        /// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received
 64        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 65        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this 
 66        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 67        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 68        public MessageReceiver(
 69            ServiceBusConnectionStringBuilder connectionStringBuilder,
 70            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 71            RetryPolicy retryPolicy = null,
 72            int prefetchCount = Constants.DefaultClientPrefetchCount)
 073            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, receive
 74        {
 075        }
 76
 77        /// <summary>
 78        /// Creates a new MessageReceiver from a specified connection string and entity path.
 79        /// </summary>
 80        /// <param name="connectionString">Namespace connection string used to communicate with Service Bus. Must not co
 81        /// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for
 82        /// You can use <see cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>, to help create this path.
 83        /// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received
 84        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 85        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this 
 86        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 87        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 88        public MessageReceiver(
 89            string connectionString,
 90            string entityPath,
 91            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 92            RetryPolicy retryPolicy = null,
 93            int prefetchCount = Constants.DefaultClientPrefetchCount)
 094            : this(entityPath, null, receiveMode, new ServiceBusConnection(connectionString), null, retryPolicy, prefetc
 95        {
 096            if (string.IsNullOrWhiteSpace(connectionString))
 97            {
 098                throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
 99            }
 100
 0101            this.OwnsConnection = true;
 0102        }
 103
 104        /// <summary>
 105        /// Creates a new MessageReceiver from a specified endpoint, entity path, and token provider.
 106        /// </summary>
 107        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 108        /// <param name="entityPath">Queue path.</param>
 109        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 110        /// <param name="transportType">Transport type.</param>
 111        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 112        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 113        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this 
 114        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 115        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 116        public MessageReceiver(
 117            string endpoint,
 118            string entityPath,
 119            ITokenProvider tokenProvider,
 120            TransportType transportType = TransportType.Amqp,
 121            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 122            RetryPolicy retryPolicy = null,
 123            int prefetchCount = Constants.DefaultClientPrefetchCount)
 0124            : this(entityPath, null, receiveMode, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenP
 125        {
 0126            this.OwnsConnection = true;
 0127        }
 128
 129        /// <summary>
 130        /// Creates a new AMQP MessageReceiver on a given <see cref="ServiceBusConnection"/>
 131        /// </summary>
 132        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 133        /// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for
 134        /// You can use <see cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>, to help create this path.
 135        /// <param name="receiveMode">The <see cref="ServiceBus.ReceiveMode"/> used to specify how messages are received
 136        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with Service Bu
 137        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this 
 138        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 139        public MessageReceiver(
 140            ServiceBusConnection serviceBusConnection,
 141            string entityPath,
 142            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 143            RetryPolicy retryPolicy = null,
 144            int prefetchCount = Constants.DefaultClientPrefetchCount)
 0145            : this(entityPath, null, receiveMode, serviceBusConnection, null, retryPolicy, prefetchCount)
 146        {
 0147            this.OwnsConnection = false;
 0148        }
 149
 150        internal MessageReceiver(
 151            string entityPath,
 152            MessagingEntityType? entityType,
 153            ReceiveMode receiveMode,
 154            ServiceBusConnection serviceBusConnection,
 155            ICbsTokenProvider cbsTokenProvider,
 156            RetryPolicy retryPolicy,
 157            int prefetchCount = Constants.DefaultClientPrefetchCount,
 158            string sessionId = null,
 159            bool isSessionReceiver = false)
 0160            : base(nameof(MessageReceiver), entityPath, retryPolicy ?? RetryPolicy.Default)
 161        {
 0162            MessagingEventSource.Log.MessageReceiverCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath, re
 163
 0164            if (string.IsNullOrWhiteSpace(entityPath))
 165            {
 0166                throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
 167            }
 168
 0169            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 0170            this.ReceiveMode = receiveMode;
 0171            this.Path = entityPath;
 0172            this.EntityType = entityType;
 0173            this.ServiceBusConnection.ThrowIfClosed();
 174
 0175            if (cbsTokenProvider != null)
 176            {
 0177                this.CbsTokenProvider = cbsTokenProvider;
 178            }
 0179            else if (this.ServiceBusConnection.TokenProvider != null)
 180            {
 0181                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 182            }
 183            else
 184            {
 0185                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 186            }
 187
 0188            this.SessionIdInternal = sessionId;
 0189            this.isSessionReceiver = isSessionReceiver;
 0190            this.ReceiveLinkManager = new FaultTolerantAmqpObject<ReceivingAmqpLink>(this.CreateLinkAsync, CloseSession)
 0191            this.RequestResponseLinkManager = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(this.CreateRequestRes
 0192            this.requestResponseLockedMessages = new ConcurrentExpiringSet<Guid>();
 0193            this.PrefetchCount = prefetchCount;
 0194            this.messageReceivePumpSyncLock = new object();
 0195            this.clientLinkManager = new ActiveClientLinkManager(this, this.CbsTokenProvider);
 0196            this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
 0197            MessagingEventSource.Log.MessageReceiverCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this
 0198        }
 199
 200        /// <summary>
 201        /// Gets a list of currently registered plugins.
 202        /// </summary>
 0203        public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();
 204
 205        /// <summary>
 206        /// Gets the <see cref="ServiceBus.ReceiveMode"/> of the current receiver.
 207        /// </summary>
 0208        public ReceiveMode ReceiveMode { get; protected set; }
 209
 210        /// <summary>
 211        /// Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when a
 212        /// Setting a non-zero value prefetches PrefetchCount number of messages.
 213        /// Setting the value to zero turns prefetch off.
 214        /// Defaults to 0.
 215        /// </summary>
 216        /// <remarks>
 217        /// <para>
 218        /// When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, th
 219        /// immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immedi
 220        /// that will be returned as soon as available, and the client will proceed to acquire further messages to fill 
 221        /// </para>
 222        /// <para>
 223        /// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately s
 224        /// replenished in the background as space becomes available.If there are no messages available for delivery, th
 225        /// buffer and then wait or block as expected.
 226        /// </para>
 227        /// <para>Prefetch also works equivalently with the <see cref="RegisterMessageHandler(Func{Message,CancellationT
 228        /// <para>Updates to this value take effect on the next receive call to the service.</para>
 229        /// </remarks>
 230        public int PrefetchCount
 231        {
 0232            get => this.prefetchCount;
 233
 234            set
 235            {
 0236                if (value < 0)
 237                {
 0238                    throw Fx.Exception.ArgumentOutOfRange(nameof(this.PrefetchCount), value, "Value cannot be less than 
 239                }
 0240                this.prefetchCount = value;
 0241                if (this.ReceiveLinkManager.TryGetOpenedObject(out var link))
 242                {
 0243                    link.SetTotalLinkCredit((uint)value, true, true);
 244                }
 0245            }
 246        }
 247
 248        /// <summary>Gets the sequence number of the last peeked message.</summary>
 249        /// <seealso cref="PeekAsync()"/>
 250        public long LastPeekedSequenceNumber
 251        {
 0252            get => this.lastPeekedSequenceNumber;
 253
 254            internal set
 255            {
 0256                if (value < 0)
 257                {
 0258                    throw new ArgumentOutOfRangeException(nameof(this.LastPeekedSequenceNumber), value.ToString());
 259                }
 260
 0261                this.lastPeekedSequenceNumber = value;
 0262            }
 263        }
 264
 265        /// <summary>The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions t
 0266        public override string Path { get; }
 267
 268        /// <summary>
 269        /// Duration after which individual operations will timeout.
 270        /// </summary>
 271        public override TimeSpan OperationTimeout {
 0272            get => this.ServiceBusConnection.OperationTimeout;
 0273            set => this.ServiceBusConnection.OperationTimeout = value;
 274        }
 275
 276        /// <summary>
 277        /// Connection object to the service bus namespace.
 278        /// </summary>
 0279        public override ServiceBusConnection ServiceBusConnection { get; }
 280
 281        /// <summary>
 282        /// Gets the DateTime that the current receiver is locked until. This is only applicable when Sessions are used.
 283        /// </summary>
 0284        internal DateTime LockedUntilUtcInternal { get; set; }
 285
 286        /// <summary>
 287        /// Gets the SessionId of the current receiver. This is only applicable when Sessions are used.
 288        /// </summary>
 0289        internal string SessionIdInternal { get; set; }
 290
 0291        internal MessagingEntityType? EntityType { get; }
 292
 0293        Exception LinkException { get; set; }
 294
 0295        ICbsTokenProvider CbsTokenProvider { get; }
 296
 0297        internal FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLinkManager { get; }
 298
 0299        FaultTolerantAmqpObject<RequestResponseAmqpLink> RequestResponseLinkManager { get; }
 300
 301        /// <summary>
 302        /// Receive a message from the entity defined by <see cref="Path"/> using <see cref="ReceiveMode"/> mode.
 303        /// </summary>
 304        /// <returns>The message received. Returns null if no message is found.</returns>
 305        /// <remarks>Operation will time out after duration of <see cref="ClientEntity.OperationTimeout"/></remarks>
 306        public Task<Message> ReceiveAsync()
 307        {
 0308            return this.ReceiveAsync(this.OperationTimeout);
 309        }
 310
 311        /// <summary>
 312        /// Receive a message from the entity defined by <see cref="Path"/> using <see cref="ReceiveMode"/> mode.
 313        /// </summary>
 314        /// <param name="operationTimeout">The time span the client waits for receiving a message before it times out.</
 315        /// <returns>The message received. Returns null if no message is found.</returns>
 316        /// <remarks>
 317        /// The parameter <paramref name="operationTimeout"/> includes the time taken by the receiver to establish a con
 318        /// (either during the first receive or when connection needs to be re-established). If establishing the connect
 319        /// times out, this will throw <see cref="ServiceBusTimeoutException"/>.
 320        /// </remarks>
 321        public async Task<Message> ReceiveAsync(TimeSpan operationTimeout)
 322        {
 0323            var messages = await this.ReceiveAsync(1, operationTimeout).ConfigureAwait(false);
 0324            if (messages != null && messages.Count > 0)
 325            {
 0326                return messages[0];
 327            }
 328
 0329            return null;
 0330        }
 331
 332        /// <summary>
 333        /// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="Path
 334        /// </summary>
 335        /// <param name="maxMessageCount">The maximum number of messages that will be received.</param>
 336        /// <returns>List of messages received. Returns null if no message is found.</returns>
 337        /// <remarks>Receiving less than <paramref name="maxMessageCount"/> messages is not an indication of empty entit
 338        public Task<IList<Message>> ReceiveAsync(int maxMessageCount)
 339        {
 0340            return this.ReceiveAsync(maxMessageCount, this.OperationTimeout);
 341        }
 342
 343        /// <summary>
 344        /// Receives a maximum of <paramref name="maxMessageCount"/> messages from the entity defined by <see cref="Path
 345        /// </summary>
 346        /// <param name="maxMessageCount">The maximum number of messages that will be received.</param>
 347        /// <param name="operationTimeout">The time span the client waits for receiving a message before it times out.</
 348        /// <returns>List of messages received. Returns null if no message is found.</returns>
 349        /// <remarks>Receiving less than <paramref name="maxMessageCount"/> messages is not an indication of empty entit
 350        /// The parameter <paramref name="operationTimeout"/> includes the time taken by the receiver to establish a con
 351        /// (either during the first receive or when connection needs to be re-established). If establishing the connect
 352        /// times out, this will throw <see cref="ServiceBusTimeoutException"/>.
 353        /// </remarks>
 354        public async Task<IList<Message>> ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout)
 355        {
 0356            this.ThrowIfClosed();
 357
 0358            if (operationTimeout <= TimeSpan.Zero)
 359            {
 0360                throw Fx.Exception.ArgumentOutOfRange(nameof(operationTimeout), operationTimeout, Resources.TimeoutMustB
 361            }
 362
 0363            MessagingEventSource.Log.MessageReceiveStart(this.ClientId, maxMessageCount);
 364
 0365            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0366            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.ReceiveStart(maxMessageCount) : null;
 0367            Task receiveTask = null;
 368
 0369            IList<Message> unprocessedMessageList = null;
 370            try
 371            {
 0372                receiveTask = this.RetryPolicy.RunOperation(
 0373                    async () =>
 0374                    {
 0375                        unprocessedMessageList = await this.OnReceiveAsync(maxMessageCount, operationTimeout)
 0376                            .ConfigureAwait(false);
 0377                    }, operationTimeout);
 0378                await receiveTask.ConfigureAwait(false);
 379
 0380            }
 0381            catch (Exception exception)
 382            {
 0383                if (isDiagnosticSourceEnabled)
 384                {
 0385                    this.diagnosticSource.ReportException(exception);
 386                }
 387
 0388                MessagingEventSource.Log.MessageReceiveException(this.ClientId, exception);
 0389                throw;
 390            }
 391            finally
 392            {
 0393                this.diagnosticSource.ReceiveStop(activity, maxMessageCount, receiveTask?.Status, unprocessedMessageList
 394            }
 395
 0396            MessagingEventSource.Log.MessageReceiveStop(this.ClientId, unprocessedMessageList?.Count ?? 0);
 397
 0398            if (unprocessedMessageList == null)
 399            {
 0400                return unprocessedMessageList;
 401            }
 402
 0403            return await this.ProcessMessages(unprocessedMessageList).ConfigureAwait(false);
 0404        }
 405
 406        /// <summary>
 407        /// Receives a specific deferred message identified by <paramref name="sequenceNumber"/>.
 408        /// </summary>
 409        /// <param name="sequenceNumber">The sequence number of the message that will be received.</param>
 410        /// <returns>Message identified by sequence number <paramref name="sequenceNumber"/>. Returns null if no such me
 411        /// Throws if the message has not been deferred.</returns>
 412        /// <seealso cref="DeferAsync"/>
 413        public async Task<Message> ReceiveDeferredMessageAsync(long sequenceNumber)
 414        {
 0415            var messages = await this.ReceiveDeferredMessageAsync(new[] { sequenceNumber }).ConfigureAwait(false);
 0416            if (messages != null && messages.Count > 0)
 417            {
 0418                return messages[0];
 419            }
 420
 0421            return null;
 0422        }
 423
 424        /// <summary>
 425        /// Receives a <see cref="IList{Message}"/> of deferred messages identified by <paramref name="sequenceNumbers"/
 426        /// </summary>
 427        /// <param name="sequenceNumbers">An <see cref="IEnumerable{T}"/> containing the sequence numbers to receive.</p
 428        /// <returns>Messages identified by sequence number are returned. Returns null if no messages are found.
 429        /// Throws if the messages have not been deferred.</returns>
 430        /// <seealso cref="DeferAsync"/>
 431        public async Task<IList<Message>> ReceiveDeferredMessageAsync(IEnumerable<long> sequenceNumbers)
 432        {
 0433            this.ThrowIfClosed();
 0434            this.ThrowIfNotPeekLockMode();
 435
 0436            if (sequenceNumbers == null)
 437            {
 0438                throw Fx.Exception.ArgumentNull(nameof(sequenceNumbers));
 439            }
 0440            var sequenceNumberList = sequenceNumbers.ToArray();
 0441            if (sequenceNumberList.Length == 0)
 442            {
 0443                throw Fx.Exception.ArgumentNull(nameof(sequenceNumbers));
 444            }
 445
 0446            MessagingEventSource.Log.MessageReceiveDeferredMessageStart(this.ClientId, sequenceNumberList.Length, sequen
 447
 0448            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0449            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.ReceiveDeferredStart(sequenceNumberLis
 0450            Task receiveTask = null;
 451
 0452            IList<Message> messages = null;
 453            try
 454            {
 0455                receiveTask = this.RetryPolicy.RunOperation(
 0456                    async () =>
 0457                    {
 0458                        messages = await this.OnReceiveDeferredMessageAsync(sequenceNumberList).ConfigureAwait(false);
 0459                    }, this.OperationTimeout);
 0460                await receiveTask.ConfigureAwait(false);
 0461            }
 0462            catch (Exception exception)
 463            {
 0464                if (isDiagnosticSourceEnabled)
 465                {
 0466                    this.diagnosticSource.ReportException(exception);
 467                }
 468
 0469                MessagingEventSource.Log.MessageReceiveDeferredMessageException(this.ClientId, exception);
 0470                throw;
 471            }
 472            finally
 473            {
 0474                this.diagnosticSource.ReceiveDeferredStop(activity, sequenceNumberList, receiveTask?.Status, messages);
 475            }
 0476            MessagingEventSource.Log.MessageReceiveDeferredMessageStop(this.ClientId, messages?.Count ?? 0);
 477
 0478            return messages;
 0479        }
 480
 481        /// <summary>
 482        /// Completes a <see cref="Message"/> using its lock token. This will delete the message from the service.
 483        /// </summary>
 484        /// <param name="lockToken">The lock token of the corresponding message to complete.</param>
 485        /// <remarks>
 486        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 487        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 488        /// This operation can only be performed on messages that were received by this receiver.
 489        /// </remarks>
 490        public Task CompleteAsync(string lockToken)
 491        {
 0492            return this.CompleteAsync(new[] { lockToken });
 493        }
 494
 495        /// <summary>
 496        /// Completes a series of <see cref="Message"/> using a list of lock tokens. This will delete the message from t
 497        /// </summary>
 498        /// <remarks>
 499        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 500        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 501        /// This operation can only be performed on messages that were received by this receiver.
 502        /// </remarks>
 503        /// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding mes
 504        public async Task CompleteAsync(IEnumerable<string> lockTokens)
 505        {
 0506            this.ThrowIfClosed();
 0507            this.ThrowIfNotPeekLockMode();
 0508            if (lockTokens == null)
 509            {
 0510                throw Fx.Exception.ArgumentNull(nameof(lockTokens));
 511            }
 0512            var lockTokenList = lockTokens.ToList();
 0513            if (lockTokenList.Count == 0)
 514            {
 0515                throw Fx.Exception.Argument(nameof(lockTokens), Resources.ListOfLockTokensCannotBeEmpty);
 516            }
 517
 0518            MessagingEventSource.Log.MessageCompleteStart(this.ClientId, lockTokenList.Count, lockTokenList);
 0519            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0520            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.CompleteStart(lockTokenList) : null;
 0521            Task completeTask = null;
 522
 523            try
 524            {
 0525                completeTask =
 0526                    this.RetryPolicy.RunOperation(() => this.OnCompleteAsync(lockTokenList), this.OperationTimeout);
 0527                await completeTask.ConfigureAwait(false);
 0528            }
 0529            catch (Exception exception)
 530            {
 0531                if (isDiagnosticSourceEnabled)
 532                {
 0533                    this.diagnosticSource.ReportException(exception);
 534                }
 535
 0536                MessagingEventSource.Log.MessageCompleteException(this.ClientId, exception);
 537
 0538                throw;
 539            }
 540            finally
 541            {
 0542                this.diagnosticSource.CompleteStop(activity, lockTokenList, completeTask?.Status);
 543            }
 544
 0545            MessagingEventSource.Log.MessageCompleteStop(this.ClientId);
 0546        }
 547
 548        /// <summary>
 549        /// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processi
 550        /// </summary>
 551        /// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
 552        /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para
 553        /// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 554        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 555        /// Abandoning a message will increase the delivery count on the message.
 556        /// This operation can only be performed on messages that were received by this receiver.
 557        /// </remarks>
 558        public async Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 559        {
 0560            this.ThrowIfClosed();
 0561            this.ThrowIfNotPeekLockMode();
 562
 0563            MessagingEventSource.Log.MessageAbandonStart(this.ClientId, 1, lockToken);
 0564            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0565            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.DisposeStart("Abandon", lockToken) : n
 0566            Task abandonTask = null;
 567
 568            try
 569            {
 0570                abandonTask = this.RetryPolicy.RunOperation(() => this.OnAbandonAsync(lockToken, propertiesToModify),
 0571                    this.OperationTimeout);
 0572                await abandonTask.ConfigureAwait(false);
 0573            }
 0574            catch (Exception exception)
 575            {
 0576                if (isDiagnosticSourceEnabled)
 577                {
 0578                    this.diagnosticSource.ReportException(exception);
 579                }
 580
 0581                MessagingEventSource.Log.MessageAbandonException(this.ClientId, exception);
 0582                throw;
 583            }
 584            finally
 585            {
 0586                this.diagnosticSource.DisposeStop(activity, lockToken, abandonTask?.Status);
 587            }
 588
 589
 0590            MessagingEventSource.Log.MessageAbandonStop(this.ClientId);
 0591        }
 592
 593        /// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
 594        /// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
 595        /// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param
 596        /// <remarks>
 597        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 598        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 599        /// In order to receive this message again in the future, you will need to save the <see cref="Message.SystemPro
 600        /// and receive it using <see cref="ReceiveDeferredMessageAsync(long)"/>.
 601        /// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
 602        /// This operation can only be performed on messages that were received by this receiver.
 603        /// </remarks>
 604        public async Task DeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 605        {
 0606            this.ThrowIfClosed();
 0607            this.ThrowIfNotPeekLockMode();
 608
 0609            MessagingEventSource.Log.MessageDeferStart(this.ClientId, 1, lockToken);
 0610            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0611            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.DisposeStart("Defer", lockToken) : nul
 0612            Task deferTask = null;
 613
 614            try
 615            {
 0616                deferTask = this.RetryPolicy.RunOperation(() => this.OnDeferAsync(lockToken, propertiesToModify),
 0617                    this.OperationTimeout);
 0618                await deferTask.ConfigureAwait(false);
 0619            }
 0620            catch (Exception exception)
 621            {
 0622                if (isDiagnosticSourceEnabled)
 623                {
 0624                    this.diagnosticSource.ReportException(exception);
 625                }
 626
 0627                MessagingEventSource.Log.MessageDeferException(this.ClientId, exception);
 0628                throw;
 629            }
 630            finally
 631            {
 0632                this.diagnosticSource.DisposeStop(activity, lockToken, deferTask?.Status);
 633            }
 0634            MessagingEventSource.Log.MessageDeferStop(this.ClientId);
 0635        }
 636
 637        /// <summary>
 638        /// Moves a message to the deadletter sub-queue.
 639        /// </summary>
 640        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 641        /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
 642        /// <remarks>
 643        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 644        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 645        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 646        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 647        /// This operation can only be performed on messages that were received by this receiver.
 648        /// </remarks>
 649        public async Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 650        {
 0651            this.ThrowIfClosed();
 0652            this.ThrowIfNotPeekLockMode();
 653
 0654            MessagingEventSource.Log.MessageDeadLetterStart(this.ClientId, 1, lockToken);
 0655            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0656            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.DisposeStart("DeadLetter", lockToken) 
 0657            Task deadLetterTask = null;
 658
 659            try
 660            {
 0661                deadLetterTask = this.RetryPolicy.RunOperation(() => this.OnDeadLetterAsync(lockToken, propertiesToModif
 0662                    this.OperationTimeout);
 0663                await deadLetterTask.ConfigureAwait(false);
 0664            }
 0665            catch (Exception exception)
 666            {
 0667                if (isDiagnosticSourceEnabled)
 668                {
 0669                    this.diagnosticSource.ReportException(exception);
 670                }
 671
 0672                MessagingEventSource.Log.MessageDeadLetterException(this.ClientId, exception);
 0673                throw;
 674            }
 675            finally
 676            {
 0677                this.diagnosticSource.DisposeStop(activity, lockToken, deadLetterTask?.Status);
 678            }
 0679            MessagingEventSource.Log.MessageDeadLetterStop(this.ClientId);
 0680        }
 681
 682        /// <summary>
 683        /// Moves a message to the deadletter sub-queue.
 684        /// </summary>
 685        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 686        /// <param name="deadLetterReason">The reason for deadlettering the message.</param>
 687        /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
 688        /// <remarks>
 689        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 690        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 691        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 692        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 693        /// This operation can only be performed on messages that were received by this receiver.
 694        /// </remarks>
 695        public async Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription =
 696        {
 0697            this.ThrowIfClosed();
 0698            this.ThrowIfNotPeekLockMode();
 699
 0700            MessagingEventSource.Log.MessageDeadLetterStart(this.ClientId, 1, lockToken);
 0701            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0702            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.DisposeStart("DeadLetter", lockToken) 
 0703            Task deadLetterTask = null;
 704
 705            try
 706            {
 0707                deadLetterTask =
 0708                    this.RetryPolicy.RunOperation(
 0709                        () => this.OnDeadLetterAsync(lockToken, null, deadLetterReason, deadLetterErrorDescription),
 0710                        this.OperationTimeout);
 0711                await deadLetterTask.ConfigureAwait(false);
 0712            }
 0713            catch (Exception exception)
 714            {
 0715                if (isDiagnosticSourceEnabled)
 716                {
 0717                    this.diagnosticSource.ReportException(exception);
 718                }
 719
 0720                MessagingEventSource.Log.MessageDeadLetterException(this.ClientId, exception);
 0721                throw;
 722            }
 723            finally
 724            {
 0725                this.diagnosticSource.DisposeStop(activity, lockToken, deadLetterTask?.Status);
 726            }
 727
 0728            MessagingEventSource.Log.MessageDeadLetterStop(this.ClientId);
 0729        }
 730
 731        /// <summary>
 732        /// Renews the lock on the message specified by the lock token. The lock will be renewed based on the setting sp
 733        /// </summary>
 734        /// <remarks>
 735        /// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on t
 736        /// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
 737        /// If processing of the message requires longer than this duration, the lock needs to be renewed.
 738        /// For each renewal, it resets the time the message is locked by the LockDuration set on the Entity.
 739        /// </remarks>
 740        public async Task RenewLockAsync(Message message)
 741        {
 0742            message.SystemProperties.LockedUntilUtc = await RenewLockAsync(message.SystemProperties.LockToken).Configure
 0743        }
 744
 745        /// <summary>
 746        /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
 747        /// <returns>New lock token expiry date and time in UTC format.</returns>
 748        /// </summary>
 749        /// <param name="lockToken">Lock token associated with the message.</param>
 750        /// <remarks>
 751        /// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on t
 752        /// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
 753        /// If processing of the message requires longer than this duration, the lock needs to be renewed.
 754        /// For each renewal, it resets the time the message is locked by the LockDuration set on the Entity.
 755        /// </remarks>
 756        public async Task<DateTime> RenewLockAsync(string lockToken)
 757        {
 0758            this.ThrowIfClosed();
 0759            this.ThrowIfNotPeekLockMode();
 760
 0761            MessagingEventSource.Log.MessageRenewLockStart(this.ClientId, 1, lockToken);
 0762            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0763            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.RenewLockStart(lockToken) : null;
 0764            Task renewTask = null;
 765
 0766            var lockedUntilUtc = DateTime.MinValue;
 767
 768            try
 769            {
 0770                renewTask = this.RetryPolicy.RunOperation(
 0771                    async () => lockedUntilUtc = await this.OnRenewLockAsync(lockToken).ConfigureAwait(false),
 0772                    this.OperationTimeout);
 0773                await renewTask.ConfigureAwait(false);
 0774            }
 0775            catch (Exception exception)
 776            {
 0777                if (isDiagnosticSourceEnabled)
 778                {
 0779                    this.diagnosticSource.ReportException(exception);
 780                }
 781
 0782                MessagingEventSource.Log.MessageRenewLockException(this.ClientId, exception);
 0783                throw;
 784            }
 785            finally
 786            {
 0787                this.diagnosticSource.RenewLockStop(activity, lockToken, renewTask?.Status, lockedUntilUtc);
 788            }
 0789            MessagingEventSource.Log.MessageRenewLockStop(this.ClientId);
 790
 0791            return lockedUntilUtc;
 0792        }
 793
 794        /// <summary>
 795        /// Fetches the next active message without changing the state of the receiver or the message source.
 796        /// </summary>
 797        /// <remarks>
 798        /// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequ
 799        /// fetches the subsequent message in the entity.
 800        /// Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot b
 801        /// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadletter
 802        /// </remarks>
 803        /// <returns>The <see cref="Message" /> that represents the next message to be read. Returns null when nothing t
 804        public Task<Message> PeekAsync()
 805        {
 0806            return this.PeekBySequenceNumberAsync(this.lastPeekedSequenceNumber + 1);
 807        }
 808
 809        /// <summary>
 810        /// Fetches the next batch of active messages without changing the state of the receiver or the message source.
 811        /// </summary>
 812        /// <remarks>
 813        /// The first call to <see cref="PeekAsync()"/> fetches the first active message for this receiver. Each subsequ
 814        /// fetches the subsequent message in the entity.
 815        /// Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot b
 816        /// Also, unlike <see cref="ReceiveAsync()"/>, this method will fetch even Deferred messages (but not Deadletter
 817        /// </remarks>
 818        /// <returns>List of <see cref="Message" /> that represents the next message to be read. Returns null when nothi
 819        public Task<IList<Message>> PeekAsync(int maxMessageCount)
 820        {
 0821            return this.PeekBySequenceNumberAsync(this.lastPeekedSequenceNumber + 1, maxMessageCount);
 822        }
 823
 824        /// <summary>
 825        /// Asynchronously reads the next message without changing the state of the receiver or the message source.
 826        /// </summary>
 827        /// <param name="fromSequenceNumber">The sequence number from where to read the message.</param>
 828        /// <returns>The asynchronous operation that returns the <see cref="Message" /> that represents the next message
 829        public async Task<Message> PeekBySequenceNumberAsync(long fromSequenceNumber)
 830        {
 0831            var messages = await this.PeekBySequenceNumberAsync(fromSequenceNumber, 1).ConfigureAwait(false);
 0832            return messages?.FirstOrDefault();
 0833        }
 834
 835        /// <summary>Peeks a batch of messages.</summary>
 836        /// <param name="fromSequenceNumber">The starting point from which to browse a batch of messages.</param>
 837        /// <param name="messageCount">The number of messages to retrieve.</param>
 838        /// <returns>A batch of messages peeked.</returns>
 839        public async Task<IList<Message>> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount)
 840        {
 0841            this.ThrowIfClosed();
 0842            IList<Message> messages = null;
 843
 0844            MessagingEventSource.Log.MessagePeekStart(this.ClientId, fromSequenceNumber, messageCount);
 0845            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0846            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.PeekStart(fromSequenceNumber, messageC
 0847            Task peekTask = null;
 848
 849            try
 850            {
 0851                peekTask = this.RetryPolicy.RunOperation(
 0852                    async () =>
 0853                    {
 0854                        messages = await this.OnPeekAsync(fromSequenceNumber, messageCount).ConfigureAwait(false);
 0855                    }, this.OperationTimeout);
 856
 0857                await peekTask.ConfigureAwait(false);
 0858            }
 0859            catch (Exception exception)
 860            {
 0861                if (isDiagnosticSourceEnabled)
 862                {
 0863                    this.diagnosticSource.ReportException(exception);
 864                }
 865
 0866                MessagingEventSource.Log.MessagePeekException(this.ClientId, exception);
 0867                throw;
 868            }
 869            finally
 870            {
 0871                this.diagnosticSource.PeekStop(activity, fromSequenceNumber, messageCount, peekTask?.Status, messages);
 872            }
 873
 0874            MessagingEventSource.Log.MessagePeekStop(this.ClientId, messages?.Count ?? 0);
 0875            return messages;
 0876        }
 877
 878        /// <summary>
 879        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 880        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 881        /// </summary>
 882        /// <param name="handler">A <see cref="Func{T1, T2, TResult}"/> that processes messages.</param>
 883        /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is used to notify exceptions.<
 884        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventAr
 885        {
 0886            this.RegisterMessageHandler(handler, new MessageHandlerOptions(exceptionReceivedHandler));
 0887        }
 888
 889        /// <summary>
 890        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 891        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 892        /// </summary>
 893        /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param
 894        /// <param name="messageHandlerOptions">The <see cref="MessageHandlerOptions"/> options used to configure the se
 895        /// <remarks>Enable prefetch to speed up the receive rate.</remarks>
 896        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions message
 897        {
 0898            this.ThrowIfClosed();
 0899            this.OnMessageHandler(messageHandlerOptions, handler);
 0900        }
 901
 902        /// <summary>
 903        /// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
 904        /// </summary>
 905        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 906        {
 0907            this.ThrowIfClosed();
 0908            if (serviceBusPlugin == null)
 909            {
 0910                throw new ArgumentNullException(nameof(serviceBusPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUs
 911            }
 0912            if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPlugin.Name))
 913            {
 0914                throw new ArgumentException(nameof(serviceBusPlugin), Resources.PluginAlreadyRegistered.FormatForUser(se
 915            }
 0916            this.RegisteredPlugins.Add(serviceBusPlugin);
 0917        }
 918
 919        /// <summary>
 920        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 921        /// </summary>
 922        /// <param name="serviceBusPluginName">The <see cref="ServiceBusPlugin.Name"/> of the plugin to be unregistered.
 923        public override void UnregisterPlugin(string serviceBusPluginName)
 924        {
 0925            this.ThrowIfClosed();
 0926            if (this.RegisteredPlugins == null)
 927            {
 0928                return;
 929            }
 0930            if (string.IsNullOrWhiteSpace(serviceBusPluginName))
 931            {
 0932                throw new ArgumentNullException(nameof(serviceBusPluginName), Resources.ArgumentNullOrWhiteSpace.FormatF
 933            }
 0934            if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPluginName))
 935            {
 0936                var plugin = this.RegisteredPlugins.First(p => p.Name == serviceBusPluginName);
 0937                this.RegisteredPlugins.Remove(plugin);
 938            }
 0939        }
 940
 941        internal async Task GetSessionReceiverLinkAsync(TimeSpan serverWaitTime)
 942        {
 0943            var timeoutHelper = new TimeoutHelper(serverWaitTime, true);
 0944            var receivingAmqpLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).Config
 945
 0946            var source = (Source)receivingAmqpLink.Settings.Source;
 0947            if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
 948            {
 0949                receivingAmqpLink.Session.SafeClose();
 0950                throw new ServiceBusException(true, Resources.SessionFilterMissing);
 951            }
 952
 0953            if (string.IsNullOrWhiteSpace(tempSessionId))
 954            {
 0955                receivingAmqpLink.Session.SafeClose();
 0956                throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
 957            }
 958
 0959            receivingAmqpLink.Closed += this.OnSessionReceiverLinkClosed;
 0960            this.SessionIdInternal = tempSessionId;
 0961            this.LockedUntilUtcInternal = receivingAmqpLink.Settings.Properties.TryGetValue<long>(AmqpClientConstants.Lo
 0962                ? new DateTime(lockedUntilUtcTicks, DateTimeKind.Utc)
 0963                : DateTime.MinValue;
 0964        }
 965
 966        internal async Task<AmqpResponseMessage> ExecuteRequestResponseAsync(AmqpRequestMessage amqpRequestMessage)
 967        {
 0968            var amqpMessage = amqpRequestMessage.AmqpMessage;
 0969            if (this.isSessionReceiver)
 970            {
 0971                this.ThrowIfSessionLockLost();
 972            }
 973
 0974            var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
 975
 0976            ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 0977            var ambientTransaction = Transaction.Current;
 0978            if (ambientTransaction != null)
 979            {
 0980                transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBusCon
 981            }
 982
 0983            if (!this.RequestResponseLinkManager.TryGetOpenedObject(out var requestResponseAmqpLink))
 984            {
 0985                MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, 
 0986                requestResponseAmqpLink = await this.RequestResponseLinkManager.GetOrCreateAsync(timeoutHelper.Remaining
 987            }
 988
 0989            var responseAmqpMessage = await Task.Factory.FromAsync(
 0990                (c, s) => requestResponseAmqpLink.BeginRequest(amqpMessage, transactionId, timeoutHelper.RemainingTime()
 0991                (a) => requestResponseAmqpLink.EndRequest(a),
 0992                this).ConfigureAwait(false);
 993
 0994            return AmqpResponseMessage.CreateResponse(responseAmqpMessage);
 0995        }
 996
 997        protected override async Task OnClosingAsync()
 998        {
 0999            this.clientLinkManager.Close();
 01000            lock (this.messageReceivePumpSyncLock)
 1001            {
 01002                if (this.receivePump != null)
 1003                {
 01004                    this.receivePumpCancellationTokenSource.Cancel();
 01005                    this.receivePumpCancellationTokenSource.Dispose();
 01006                    this.receivePump = null;
 1007                }
 01008            }
 01009            await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
 01010            await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
 01011            this.requestResponseLockedMessages.Close();
 01012        }
 1013
 1014        protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
 1015        {
 01016            ReceivingAmqpLink receiveLink = null;
 1017
 01018            if (this.isSessionReceiver)
 1019            {
 01020                this.ThrowIfSessionLockLost();
 1021            }
 1022
 1023            try
 1024            {
 01025                var timeoutHelper = new TimeoutHelper(serverWaitTime, true);
 01026                if(!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
 1027                {
 01028                    MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdIntern
 01029                    receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).Configur
 1030                }
 1031
 01032                IList<Message> brokeredMessages = null;
 01033                this.ThrowIfClosed();
 1034
 01035                IEnumerable<AmqpMessage> amqpMessages = null;
 01036                var hasMessages = await Task.Factory.FromAsync(
 01037                    (c, s) => receiveLink.BeginReceiveRemoteMessages(maxMessageCount, DefaultBatchFlushInterval, timeout
 01038                    a => receiveLink.EndReceiveMessages(a, out amqpMessages),
 01039                    this).ConfigureAwait(false);
 1040                Exception exception;
 01041                if ((exception = receiveLink.GetInnerException()) != null)
 1042                {
 01043                    throw exception;
 1044                }
 1045
 01046                if (hasMessages && amqpMessages != null)
 1047                {
 01048                    foreach (var amqpMessage in amqpMessages)
 1049                    {
 01050                        if (this.ReceiveMode == ReceiveMode.ReceiveAndDelete)
 1051                        {
 01052                            receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome);
 1053                        }
 1054
 01055                        var message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
 01056                        if (brokeredMessages == null)
 1057                        {
 01058                            brokeredMessages = new List<Message>();
 1059                        }
 1060
 01061                        brokeredMessages.Add(message);
 1062                    }
 1063                }
 1064
 01065                return brokeredMessages;
 1066            }
 1067            catch (Exception exception)
 1068            {
 01069                throw AmqpExceptionHelper.GetClientException(exception, receiveLink?.GetTrackingId(), null, receiveLink?
 1070            }
 01071        }
 1072
 1073        protected virtual async Task<IList<Message>> OnPeekAsync(long fromSequenceNumber, int messageCount = 1)
 1074        {
 1075            try
 1076            {
 01077                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 01078                        ManagementConstants.Operations.PeekMessageOperation,
 01079                        this.OperationTimeout,
 01080                        null);
 1081
 01082                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 1083                {
 01084                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 1085                }
 1086
 01087                amqpRequestMessage.Map[ManagementConstants.Properties.FromSequenceNumber] = fromSequenceNumber;
 01088                amqpRequestMessage.Map[ManagementConstants.Properties.MessageCount] = messageCount;
 1089
 01090                if (!string.IsNullOrWhiteSpace(this.SessionIdInternal))
 1091                {
 01092                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 1093                }
 1094
 01095                var messages = new List<Message>();
 1096
 01097                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 01098                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 1099                {
 01100                    Message message = null;
 01101                    var messageList = amqpResponseMessage.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages)
 01102                    foreach (AmqpMap entry in messageList)
 1103                    {
 01104                        var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
 01105                        var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), t
 01106                        message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, true);
 01107                        messages.Add(message);
 1108                    }
 1109
 01110                    if (message != null)
 1111                    {
 01112                        this.LastPeekedSequenceNumber = message.SystemProperties.SequenceNumber;
 1113                    }
 1114
 01115                    return messages;
 1116                }
 1117
 01118                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NoContent ||
 01119                    (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NotFound && Equals(AmqpClientConstants.Mes
 1120                {
 01121                    return messages;
 1122                }
 1123
 01124                throw amqpResponseMessage.ToMessagingContractException();
 1125            }
 1126            catch (Exception exception)
 1127            {
 01128                throw AmqpExceptionHelper.GetClientException(exception);
 1129            }
 01130        }
 1131
 1132        protected virtual async Task<IList<Message>> OnReceiveDeferredMessageAsync(long[] sequenceNumbers)
 1133        {
 01134            var messages = new List<Message>();
 1135            try
 1136            {
 01137                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.ReceiveBySequen
 1138
 01139                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 1140                {
 01141                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 1142                }
 01143                amqpRequestMessage.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers;
 01144                amqpRequestMessage.Map[ManagementConstants.Properties.ReceiverSettleMode] = (uint)(this.ReceiveMode == R
 01145                if (!string.IsNullOrWhiteSpace(this.SessionIdInternal))
 1146                {
 01147                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 1148                }
 1149
 01150                var response = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(false);
 1151
 01152                if (response.StatusCode == AmqpResponseStatusCode.OK)
 1153                {
 01154                    var amqpMapList = response.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages);
 01155                    foreach (var entry in amqpMapList)
 1156                    {
 01157                        var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
 01158                        var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), t
 01159                        var message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
 01160                        if (entry.TryGetValue<Guid>(ManagementConstants.Properties.LockToken, out var lockToken))
 1161                        {
 01162                            message.SystemProperties.LockTokenGuid = lockToken;
 01163                            this.requestResponseLockedMessages.AddOrUpdate(lockToken, message.SystemProperties.LockedUnt
 1164                        }
 1165
 01166                        messages.Add(message);
 1167                    }
 1168                }
 1169                else
 1170                {
 01171                    throw response.ToMessagingContractException();
 1172                }
 01173            }
 1174            catch (Exception exception)
 1175            {
 01176                throw AmqpExceptionHelper.GetClientException(exception);
 1177            }
 1178
 01179            return messages;
 01180        }
 1181
 1182        protected virtual Task OnCompleteAsync(IEnumerable<string> lockTokens)
 1183        {
 01184            var lockTokenGuids = lockTokens.Select(lt => new Guid(lt)).ToArray();
 01185            if (lockTokenGuids.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
 1186            {
 01187                return this.DisposeMessageRequestResponseAsync(lockTokenGuids, DispositionStatus.Completed);
 1188            }
 01189            return this.DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome);
 1190        }
 1191
 1192        protected virtual Task OnAbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 1193        {
 01194            var lockTokens = new[] { new Guid(lockToken) };
 01195            if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
 1196            {
 01197                return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Abandoned, propertiesToModi
 1198            }
 01199            return this.DisposeMessagesAsync(lockTokens, GetAbandonOutcome(propertiesToModify));
 1200        }
 1201
 1202        protected virtual Task OnDeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 1203        {
 01204            var lockTokens = new[] { new Guid(lockToken) };
 01205            if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
 1206            {
 01207                return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Defered, propertiesToModify
 1208            }
 01209            return this.DisposeMessagesAsync(lockTokens, GetDeferOutcome(propertiesToModify));
 1210        }
 1211
 1212        protected virtual Task OnDeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null
 1213        {
 01214            if (deadLetterReason != null && deadLetterReason.Length > Constants.MaxDeadLetterReasonLength)
 1215            {
 01216                throw new ArgumentOutOfRangeException(nameof(deadLetterReason), $"Maximum permitted length is {Constants
 1217            }
 1218
 01219            if (deadLetterErrorDescription != null && deadLetterErrorDescription.Length > Constants.MaxDeadLetterReasonL
 1220            {
 01221                throw new ArgumentOutOfRangeException(nameof(deadLetterErrorDescription), $"Maximum permitted length is 
 1222            }
 1223
 01224            var lockTokens = new[] { new Guid(lockToken) };
 01225            if (lockTokens.Any(lt => this.requestResponseLockedMessages.Contains(lt)))
 1226            {
 01227                return this.DisposeMessageRequestResponseAsync(lockTokens, DispositionStatus.Suspended, propertiesToModi
 1228            }
 1229
 01230            return this.DisposeMessagesAsync(lockTokens, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLe
 1231        }
 1232
 1233        protected virtual async Task<DateTime> OnRenewLockAsync(string lockToken)
 1234        {
 1235            DateTime lockedUntilUtc;
 1236            try
 1237            {
 1238                // Create an AmqpRequest Message to renew  lock
 01239                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewLockOperat
 1240
 01241                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 1242                {
 01243                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 1244                }
 01245                amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) };
 1246
 01247                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 1248
 01249                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 1250                {
 01251                    var lockedUntilUtcTimes = amqpResponseMessage.GetValue<IEnumerable<DateTime>>(ManagementConstants.Pr
 01252                    lockedUntilUtc = lockedUntilUtcTimes.First();
 1253                }
 1254                else
 1255                {
 01256                    throw amqpResponseMessage.ToMessagingContractException();
 1257                }
 01258            }
 1259            catch (Exception exception)
 1260            {
 01261                throw AmqpExceptionHelper.GetClientException(exception);
 1262            }
 1263
 01264            return lockedUntilUtc;
 01265        }
 1266
 1267        /// <summary> </summary>
 1268        protected virtual void OnMessageHandler(
 1269            MessageHandlerOptions registerHandlerOptions,
 1270            Func<Message, CancellationToken, Task> callback)
 1271        {
 01272            MessagingEventSource.Log.RegisterOnMessageHandlerStart(this.ClientId, registerHandlerOptions);
 1273
 01274            lock (this.messageReceivePumpSyncLock)
 1275            {
 01276                if (this.receivePump != null)
 1277                {
 01278                    throw new InvalidOperationException(Resources.MessageHandlerAlreadyRegistered);
 1279                }
 1280
 01281                this.receivePumpCancellationTokenSource = new CancellationTokenSource();
 01282                this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnect
 01283            }
 1284
 1285            try
 1286            {
 01287                this.receivePump.StartPump();
 01288            }
 01289            catch (Exception exception)
 1290            {
 01291                MessagingEventSource.Log.RegisterOnMessageHandlerException(this.ClientId, exception);
 01292                lock (this.messageReceivePumpSyncLock)
 1293                {
 01294                    if (this.receivePump != null)
 1295                    {
 01296                        this.receivePumpCancellationTokenSource.Cancel();
 01297                        this.receivePumpCancellationTokenSource.Dispose();
 01298                        this.receivePump = null;
 1299                    }
 01300                }
 1301
 01302                throw;
 1303            }
 1304
 01305            MessagingEventSource.Log.RegisterOnMessageHandlerStop(this.ClientId);
 01306        }
 1307
 1308        static void CloseSession(ReceivingAmqpLink link)
 1309        {
 01310            link.Session.SafeClose();
 01311        }
 1312
 1313        static void CloseRequestResponseSession(RequestResponseAmqpLink requestResponseAmqpLink)
 1314        {
 01315            requestResponseAmqpLink.Session.SafeClose();
 01316        }
 1317
 1318        async Task<Message> ProcessMessage(Message message)
 1319        {
 01320            var processedMessage = message;
 01321            foreach (var plugin in this.RegisteredPlugins)
 1322            {
 1323                try
 1324                {
 01325                    MessagingEventSource.Log.PluginCallStarted(plugin.Name, message.MessageId);
 01326                    processedMessage = await plugin.AfterMessageReceive(message).ConfigureAwait(false);
 01327                    MessagingEventSource.Log.PluginCallCompleted(plugin.Name, message.MessageId);
 01328                }
 01329                catch (Exception ex)
 1330                {
 01331                    MessagingEventSource.Log.PluginCallFailed(plugin.Name, message.MessageId, ex);
 01332                    if (!plugin.ShouldContinueOnException)
 1333                    {
 01334                        throw;
 1335                    }
 01336                }
 01337            }
 01338            return processedMessage;
 01339        }
 1340
 1341        async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
 1342        {
 01343            if (this.RegisteredPlugins.Count < 1)
 1344            {
 01345                return messageList;
 1346            }
 1347
 01348            var processedMessageList = new List<Message>();
 01349            foreach (var message in messageList)
 1350            {
 01351                var processedMessage = await this.ProcessMessage(message).ConfigureAwait(false);
 01352                processedMessageList.Add(processedMessage);
 1353            }
 1354
 01355            return processedMessageList;
 01356        }
 1357
 1358        async Task DisposeMessagesAsync(IEnumerable<Guid> lockTokens, Outcome outcome)
 1359        {
 01360            if(this.isSessionReceiver)
 1361            {
 01362                this.ThrowIfSessionLockLost();
 1363            }
 1364
 01365            var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
 01366            List<ArraySegment<byte>> deliveryTags = this.ConvertLockTokensToDeliveryTags(lockTokens);
 1367
 01368            ReceivingAmqpLink receiveLink = null;
 1369            try
 1370            {
 01371                ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 01372                var ambientTransaction = Transaction.Current;
 01373                if (ambientTransaction != null)
 1374                {
 01375                    transactionId = await AmqpTransactionManager.Instance.EnlistAsync(ambientTransaction, this.ServiceBu
 1376                }
 1377
 01378                if (!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
 1379                {
 01380                    MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdIntern
 01381                    receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).Configur
 1382                }
 1383
 01384                var disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
 01385                var i = 0;
 01386                foreach (ArraySegment<byte> deliveryTag in deliveryTags)
 1387                {
 01388                    disposeMessageTasks[i++] = Task.Factory.FromAsync(
 01389                        (c, s) => receiveLink.BeginDisposeMessage(deliveryTag, transactionId, outcome, true, timeoutHelp
 01390                        a => receiveLink.EndDisposeMessage(a),
 01391                        this);
 1392                }
 1393
 01394                var outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false);
 01395                Error error = null;
 01396                foreach (var item in outcomes)
 1397                {
 01398                    var disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != n
 01399                    if (disposedOutcome != null)
 1400                    {
 01401                        if (error.Condition.Equals(AmqpErrorCode.NotFound))
 1402                        {
 01403                            if (this.isSessionReceiver)
 1404                            {
 01405                                throw new SessionLockLostException(Resources.SessionLockExpiredOnMessageSession);
 1406                            }
 1407
 01408                            throw new MessageLockLostException(Resources.MessageLockLost);
 1409                        }
 1410
 01411                        throw error.ToMessagingContractException();
 1412                    }
 1413                }
 01414            }
 01415            catch (Exception exception)
 1416            {
 01417                if (exception is OperationCanceledException &&
 01418                    receiveLink != null && receiveLink.State != AmqpObjectState.Opened)
 1419                {
 1420                    // The link state is lost, We need to return a non-retriable error.
 01421                    MessagingEventSource.Log.LinkStateLost(this.ClientId, receiveLink.Name, receiveLink.State, this.isSe
 01422                    if (this.isSessionReceiver)
 1423                    {
 01424                        throw new SessionLockLostException(Resources.SessionLockExpiredOnMessageSession);
 1425                    }
 1426
 01427                    throw new MessageLockLostException(Resources.MessageLockLost);
 1428                }
 1429
 01430                throw AmqpExceptionHelper.GetClientException(exception);
 1431            }
 01432        }
 1433
 1434        async Task DisposeMessageRequestResponseAsync(Guid[] lockTokens, DispositionStatus dispositionStatus, IDictionar
 1435        {
 1436            try
 1437            {
 1438                // Create an AmqpRequest Message to update disposition
 01439                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.UpdateDispositi
 1440
 01441                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 1442                {
 01443                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 1444                }
 01445                amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
 01446                amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().
 1447
 01448                if (deadLetterReason != null)
 1449                {
 01450                    amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterReason] = deadLetterReason;
 1451                }
 1452
 01453                if (deadLetterDescription != null)
 1454                {
 01455                    amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterDescription] = deadLetterDescription
 1456                }
 1457
 01458                if (propertiesToModify != null)
 1459                {
 01460                    var amqpPropertiesToModify = new AmqpMap();
 01461                    foreach (var pair in propertiesToModify)
 1462                    {
 01463                        if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProper
 1464                        {
 01465                            amqpPropertiesToModify[new MapKey(pair.Key)] = amqpObject;
 1466                        }
 1467                        else
 1468                        {
 01469                            throw new NotSupportedException(
 01470                                Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetType()));
 1471                        }
 1472                    }
 1473
 01474                    if (amqpPropertiesToModify.Count > 0)
 1475                    {
 01476                        amqpRequestMessage.Map[ManagementConstants.Properties.PropertiesToModify] = amqpPropertiesToModi
 1477                    }
 1478                }
 1479
 01480                if (!string.IsNullOrWhiteSpace(this.SessionIdInternal))
 1481                {
 01482                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 1483                }
 1484
 01485                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 01486                if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
 1487                {
 01488                    throw amqpResponseMessage.ToMessagingContractException();
 1489                }
 01490            }
 1491            catch (Exception exception)
 1492            {
 01493                throw AmqpExceptionHelper.GetClientException(exception);
 1494            }
 01495        }
 1496
 1497        async Task<ReceivingAmqpLink> CreateLinkAsync(TimeSpan timeout)
 1498        {
 01499            FilterSet filterMap = null;
 1500
 01501            MessagingEventSource.Log.AmqpReceiveLinkCreateStart(this.ClientId, false, this.EntityType, this.Path);
 1502
 01503            if (this.isSessionReceiver)
 1504            {
 01505                filterMap = new FilterSet { { AmqpClientConstants.SessionFilterName, this.SessionIdInternal } };
 1506            }
 1507
 01508            var amqpLinkSettings = new AmqpLinkSettings
 01509            {
 01510                Role = true,
 01511                TotalLinkCredit = (uint)this.PrefetchCount,
 01512                AutoSendFlow = this.PrefetchCount > 0,
 01513                Source = new Source { Address = this.Path, FilterSet = filterMap },
 01514                SettleType = (this.ReceiveMode == ReceiveMode.PeekLock) ? SettleMode.SettleOnDispose : SettleMode.Settle
 01515            };
 1516
 01517            if (this.EntityType != null)
 1518            {
 01519                amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType);
 1520            }
 1521
 01522            amqpLinkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.TotalMilliseconds);
 1523
 01524            var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.Path);
 01525            var claims = new[] { ClaimConstants.Listen };
 01526            var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(
 01527                this.Path,
 01528                this.ServiceBusConnection,
 01529                endpointUri,
 01530                new string[] { endpointUri.AbsoluteUri },
 01531                claims,
 01532                this.CbsTokenProvider,
 01533                amqpLinkSettings,
 01534                this.ClientId);
 1535
 01536            Tuple<AmqpObject, DateTime> linkDetails = await amqpSendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().Conf
 1537
 01538            var receivingAmqpLink = (ReceivingAmqpLink) linkDetails.Item1;
 01539            var activeSendReceiveClientLink = new ActiveSendReceiveClientLink(
 01540                receivingAmqpLink,
 01541                endpointUri,
 01542                new string[] { endpointUri.AbsoluteUri },
 01543                claims,
 01544                linkDetails.Item2);
 1545
 01546            this.clientLinkManager.SetActiveSendReceiveLink(activeSendReceiveClientLink);
 1547
 01548            MessagingEventSource.Log.AmqpReceiveLinkCreateStop(this.ClientId);
 1549
 01550            return receivingAmqpLink;
 01551        }
 1552
 1553        // TODO: Consolidate the link creation paths
 1554        async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan timeout)
 1555        {
 01556            var entityPath = this.Path + '/' + AmqpClientConstants.ManagementAddress;
 1557
 01558            MessagingEventSource.Log.AmqpReceiveLinkCreateStart(this.ClientId, true, this.EntityType, entityPath);
 01559            var amqpLinkSettings = new AmqpLinkSettings();
 01560            amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
 1561
 01562            var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, entityPath);
 01563            string[] claims = { ClaimConstants.Manage, ClaimConstants.Listen };
 01564            var amqpRequestResponseLinkCreator = new AmqpRequestResponseLinkCreator(
 01565                entityPath,
 01566                this.ServiceBusConnection,
 01567                endpointUri,
 01568                new string[] { endpointUri.AbsoluteUri },
 01569                claims,
 01570                this.CbsTokenProvider,
 01571                amqpLinkSettings,
 01572                this.ClientId);
 1573
 01574            var linkDetails = await amqpRequestResponseLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);
 1575
 01576            var requestResponseAmqpLink = (RequestResponseAmqpLink)linkDetails.Item1;
 01577            var activeRequestResponseClientLink = new ActiveRequestResponseLink(
 01578                requestResponseAmqpLink,
 01579                endpointUri,
 01580                new string[] { endpointUri.AbsoluteUri },
 01581                claims,
 01582                linkDetails.Item2);
 01583            this.clientLinkManager.SetActiveRequestResponseLink(activeRequestResponseClientLink);
 1584
 01585            MessagingEventSource.Log.AmqpReceiveLinkCreateStop(this.ClientId);
 01586            return requestResponseAmqpLink;
 01587        }
 1588
 1589        void OnSessionReceiverLinkClosed(object sender, EventArgs e)
 1590        {
 01591            var receivingAmqpLink = (ReceivingAmqpLink)sender;
 01592            if (receivingAmqpLink != null)
 1593            {
 01594                var exception = receivingAmqpLink.GetInnerException();
 01595                if (!(exception is SessionLockLostException))
 1596                {
 01597                    exception = new SessionLockLostException("Session lock lost. Accept a new session", exception);
 1598                }
 1599
 01600                this.LinkException = exception;
 01601                MessagingEventSource.Log.SessionReceiverLinkClosed(this.ClientId, this.SessionIdInternal, this.LinkExcep
 1602            }
 01603        }
 1604
 1605        List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
 1606        {
 01607            return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
 1608        }
 1609
 1610        void ThrowIfNotPeekLockMode()
 1611        {
 01612            if (this.ReceiveMode != ReceiveMode.PeekLock)
 1613            {
 01614                throw Fx.Exception.AsError(new InvalidOperationException("The operation is only supported in 'PeekLock' 
 1615            }
 01616        }
 1617
 1618        void ThrowIfSessionLockLost()
 1619        {
 01620            if (this.LinkException != null)
 1621            {
 01622                throw this.LinkException;
 1623            }
 01624        }
 1625
 1626        Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify)
 1627        {
 01628            return this.GetModifiedOutcome(propertiesToModify, false);
 1629        }
 1630
 1631        Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify)
 1632        {
 01633            return this.GetModifiedOutcome(propertiesToModify, true);
 1634        }
 1635
 1636        Outcome GetModifiedOutcome(IDictionary<string, object> propertiesToModify, bool undeliverableHere)
 1637        {
 01638            Modified modified = new Modified();
 01639            if (undeliverableHere)
 1640            {
 01641                modified.UndeliverableHere = true;
 1642            }
 1643
 01644            if (propertiesToModify != null)
 1645            {
 01646                modified.MessageAnnotations = new Fields();
 01647                foreach (var pair in propertiesToModify)
 1648                {
 01649                    if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, 
 1650                    {
 01651                        modified.MessageAnnotations.Add(pair.Key, amqpObject);
 1652                    }
 1653                    else
 1654                    {
 01655                        throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetT
 1656                    }
 1657                }
 1658            }
 1659
 01660            return modified;
 1661        }
 1662
 1663        Rejected GetRejectedOutcome(IDictionary<string, object> propertiesToModify, string deadLetterReason, string dead
 1664        {
 01665            var rejected = AmqpConstants.RejectedOutcome;
 01666            if (deadLetterReason != null || deadLetterErrorDescription != null || propertiesToModify != null)
 1667            {
 01668                rejected = new Rejected { Error = new Error { Condition = AmqpClientConstants.DeadLetterName, Info = new
 01669                if (deadLetterReason != null)
 1670                {
 01671                    rejected.Error.Info.Add(Message.DeadLetterReasonHeader, deadLetterReason);
 1672                }
 1673
 01674                if (deadLetterErrorDescription != null)
 1675                {
 01676                    rejected.Error.Info.Add(Message.DeadLetterErrorDescriptionHeader, deadLetterErrorDescription);
 1677                }
 1678
 01679                if (propertiesToModify != null)
 1680                {
 01681                    foreach (var pair in propertiesToModify)
 1682                    {
 01683                        if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProper
 1684                        {
 01685                            rejected.Error.Info.Add(pair.Key, amqpObject);
 1686                        }
 1687                        else
 1688                        {
 01689                            throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.
 1690                        }
 1691                    }
 1692                }
 1693            }
 1694
 01695            return rejected;
 1696        }
 1697    }
 1698}

Methods/Properties

.cctor()
.ctor(...)
.ctor(...)
.ctor(...)
.ctor(...)
.ctor(...)
get_RegisteredPlugins()
get_ReceiveMode()
get_PrefetchCount()
set_PrefetchCount(...)
get_LastPeekedSequenceNumber()
set_LastPeekedSequenceNumber(...)
get_Path()
get_OperationTimeout()
set_OperationTimeout(...)
get_ServiceBusConnection()
get_LockedUntilUtcInternal()
get_SessionIdInternal()
get_EntityType()
get_LinkException()
get_CbsTokenProvider()
get_ReceiveLinkManager()
get_RequestResponseLinkManager()
ReceiveAsync()
ReceiveAsync()
ReceiveAsync(...)
ReceiveAsync()
<ReceiveAsync()
ReceiveDeferredMessageAsync()
ReceiveDeferredMessageAsync()
<ReceiveDeferredMessageAsync()
CompleteAsync(...)
CompleteAsync()
AbandonAsync()
DeferAsync()
DeadLetterAsync()
DeadLetterAsync()
RenewLockAsync()
RenewLockAsync()
<RenewLockAsync()
PeekAsync()
PeekAsync(...)
PeekBySequenceNumberAsync()
PeekBySequenceNumberAsync()
<PeekBySequenceNumberAsync()
RegisterMessageHandler(...)
RegisterMessageHandler(...)
RegisterPlugin(...)
UnregisterPlugin(...)
GetSessionReceiverLinkAsync()
ExecuteRequestResponseAsync()
OnClosingAsync()
OnReceiveAsync()
OnPeekAsync()
OnReceiveDeferredMessageAsync()
OnCompleteAsync(...)
OnAbandonAsync(...)
OnDeferAsync(...)
OnDeadLetterAsync(...)
OnRenewLockAsync()
OnMessageHandler(...)
CloseSession(...)
CloseRequestResponseSession(...)
ProcessMessage()
ProcessMessages()
DisposeMessagesAsync()
DisposeMessageRequestResponseAsync()
CreateLinkAsync()
CreateRequestResponseLinkAsync()
OnSessionReceiverLinkClosed(...)
ConvertLockTokensToDeliveryTags(...)
ThrowIfNotPeekLockMode()
ThrowIfSessionLockLost()
GetAbandonOutcome(...)
GetDeferOutcome(...)
GetModifiedOutcome(...)
GetRejectedOutcome(...)