< Summary

Class: Azure.Messaging.ServiceBus.Amqp.AmqpReceiver
Assembly: Azure.Messaging.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpReceiver.cs
Covered lines: 74
Uncovered lines: 445
Coverable lines: 519
Total lines: 1305
Line coverage: 14.2% (74 of 519)
Covered branches: 13
Total branches: 174
Branch coverage: 7.4% (13 of 174)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_IsClosed() | - | 100% | 100%
get_LastPeekedSequenceNumber() | - | 0% | 100%
get_SessionId() | - | 100% | 100%
get_SessionLockedUntil() | - | 0% | 100%
.ctor(...) | - | 90.91% | 100%
OpenManagementLinkAsync() | - | 0% | 100%
OpenReceiverLinkAsync() | - | 57.14% | 0%
CloseLink(...) | - | 0% | 0%
CloseLink(...) | - | 0% | 0%
ReceiveMessagesAsync() | - | 76.92% | 100%
<ReceiveMessagesAsync() | - | 83.33% | 100%
ReceiveMessagesAsyncInternal() | - | 33.33% | 18.75%
CompleteAsync() | - | 0% | 100%
<CompleteAsync() | - | 0% | 100%
CompleteInternalAsync() | - | 0% | 0%
DisposeMessagesAsync() | - | 0% | 0%
ThrowLockLostException() | - | 0% | 0%
DeferAsync() | - | 0% | 100%
<DeferAsync() | - | 0% | 100%
DeferInternalAsync(...) | - | 0% | 0%
AbandonAsync() | - | 0% | 100%
<AbandonAsync() | - | 0% | 100%
AbandonInternalAsync(...) | - | 0% | 0%
DeadLetterAsync() | - | 0% | 100%
<DeadLetterAsync() | - | 0% | 100%
DeadLetterInternalAsync(...) | - | 0% | 0%
GetRejectedOutcome(...) | - | 0% | 0%
DisposeMessageRequestResponseAsync() | - | 0% | 0%
GetAbandonOutcome(...) | - | 0% | 100%
GetDeferOutcome(...) | - | 0% | 100%
ConvertLockTokensToDeliveryTags(...) | - | 0% | 100%
GetModifiedOutcome(...) | - | 0% | 0%
PeekMessagesAsync() | - | 0% | 0%
<PeekMessagesAsync() | - | 0% | 100%
PeekMessagesInternalAsync() | - | 0% | 0%
RenewMessageLockAsync() | - | 0% | 100%
<RenewMessageLockAsync() | - | 0% | 100%
RenewMessageLockInternalAsync() | - | 0% | 0%
ExecuteRequest() | - | 0% | 100%
RenewSessionLockAsync() | - | 0% | 100%
<RenewSessionLockAsync() | - | 0% | 100%
RenewSessionLockInternal() | - | 0% | 0%
GetStateAsync() | - | 0% | 100%
<GetStateAsync() | - | 0% | 100%
GetStateInternal() | - | 0% | 0%
SetStateAsync() | - | 0% | 100%
<SetStateAsync() | - | 0% | 100%
SetStateInternal() | - | 0% | 0%
ReceiveDeferredMessagesAsync() | - | 0% | 100%
<ReceiveDeferredMessagesAsync() | - | 0% | 100%
ReceiveDeferredMessagesAsyncInternal() | - | 0% | 0%
CloseAsync() | - | 58.33% | 50%
OnReceiverLinkClosed(...) | - | 0% | 100%
OnManagementLinkClosed(...) | - | 0% | 100%
UseMinimum(...) | - | 100% | 50%
OpenLinkAsync() | - | 0% | 0%
<OpenLinkAsync() | - | 0% | 100%
HasLinkCommunicationError(...) | - | 100% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpReceiver.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Linq;
 7using System.Runtime.ExceptionServices;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using System.Transactions;
 11using Azure.Core;
 12using Azure.Core.Diagnostics;
 13using Azure.Messaging.ServiceBus.Core;
 14using Azure.Messaging.ServiceBus.Diagnostics;
 15using Azure.Messaging.ServiceBus.Primitives;
 16using Microsoft.Azure.Amqp;
 17using Microsoft.Azure.Amqp.Encoding;
 18using Microsoft.Azure.Amqp.Framing;
 19
 20namespace Azure.Messaging.ServiceBus.Amqp
 21{
 22    /// <summary>
 23    /// A transport client abstraction responsible for brokering operations for AMQP-based connections.
 24    /// It is intended that the public <see cref="ServiceBusReceiver" /> make use of an instance
 25    /// via containment and delegate operations to it.
 26    /// </summary>
 27    ///
 28    /// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportReceiver" />
 29#pragma warning disable CA1001 // Types that own disposable fields should be disposable
 30    internal class AmqpReceiver : TransportReceiver
 31#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 32    {
 33        /// <summary>Indicates whether or not this instance has been closed.</summary>
 34        private bool _closed = false;
 35
 36        /// <summary>
 37        /// Indicates whether or not this receiver has been closed.
 38        /// </summary>
 39        ///
 40        /// <value>
 41        /// <c>true</c> if the receiver is closed; otherwise, <c>false</c>.
 42        /// </value>
 443        public override bool IsClosed => _closed;
 44
 45        /// <summary>
 46        /// The name of the Service Bus entity to which the receiver is bound.
 47        /// </summary>
 48        ///
 49        private readonly string _entityPath;
 50
 51        /// <summary>
 52        /// The policy to use for determining retry behavior for when an operation fails.
 53        /// </summary>
 54        private readonly ServiceBusRetryPolicy _retryPolicy;
 55
 56        /// <summary>
 57        /// Indicates whether or not this is a receiver scoped to a session.
 58        /// </summary>
 59        private readonly bool _isSessionReceiver;
 60
 61        /// <summary>
 62        /// The AMQP connection scope responsible for managing transport constructs for this instance.
 63        /// </summary>
 64        ///
 65        private readonly AmqpConnectionScope _connectionScope;
 66
 67        /// <summary>
 68        /// The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.
 69        /// </summary>
 70        private readonly ReceiveMode _receiveMode;
 71        private readonly string _identifier;
 72        private readonly FaultTolerantAmqpObject<ReceivingAmqpLink> _receiveLink;
 73
 74        private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;
 75
 76        /// <summary>
 77        /// Gets the sequence number of the last peeked message.
 78        /// </summary>
 079        public long LastPeekedSequenceNumber { get; private set; }
 80
 81        /// <summary>
 82        /// The Session Id associated with the receiver.
 83        /// </summary>
 6284        public override string SessionId { get; protected set; }
 85
 086        public override DateTimeOffset SessionLockedUntil { get; protected set; }
 87
 88        /// <summary>
 89        /// A map of locked messages received using the management client.
 90        /// </summary>
 91        private readonly ConcurrentExpiringSet<Guid> _requestResponseLockedMessages;
 92
 93        /// <summary>
 94        /// Initializes a new instance of the <see cref="AmqpReceiver"/> class.
 95        /// </summary>
 96        ///
 97        /// <param name="entityPath">The name of the Service Bus entity from which messages will be consumed.</param>
 98        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 99        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 100        /// <param name="connectionScope">The AMQP connection context for operations.</param>
 101        /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
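 102        /// <param name="identifier">The identifier of this receiver; it is included in diagnostic log events and passed along when opening the management link.</param>
 103        /// <param name="sessionId">The session Id assigned to <see cref="SessionId"/> and supplied when opening the receive link.</param>
 104        /// <param name="isSessionReceiver">Indicates whether or not this is a receiver scoped to a session.</param>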
 105        ///
 106        /// <remarks>
 107        /// As an internal type, this class performs only basic sanity checks against its arguments.  It
 108        /// is assumed that callers are trusted and have performed deep validation.
 109        ///
 110        /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 111        /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 112        /// caller.
 113        /// </remarks>
 34114        public AmqpReceiver(
 34115            string entityPath,
 34116            ReceiveMode receiveMode,
 34117            uint prefetchCount,
 34118            AmqpConnectionScope connectionScope,
 34119            ServiceBusRetryPolicy retryPolicy,
 34120            string identifier,
 34121            string sessionId,
 34122            bool isSessionReceiver)
 123        {
 34124            Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));
 30125            Argument.AssertNotNull(connectionScope, nameof(connectionScope));
 28126            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 127
 26128            _entityPath = entityPath;
 26129            _connectionScope = connectionScope;
 26130            _retryPolicy = retryPolicy;
 26131            _isSessionReceiver = isSessionReceiver;
 26132            _receiveMode = receiveMode;
 26133            _identifier = identifier;
 26134            _requestResponseLockedMessages = new ConcurrentExpiringSet<Guid>();
 26135            SessionId = sessionId;
 136
 26137            _receiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(
 26138                timeout =>
 62139                    OpenReceiverLinkAsync(
 62140                        timeout: timeout,
 62141                        prefetchCount: prefetchCount,
 62142                        receiveMode: receiveMode,
 62143                        sessionId: SessionId,
 62144                        isSessionReceiver: isSessionReceiver),
 0145                link => CloseLink(link));
 146
 26147            _managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
 0148                timeout => OpenManagementLinkAsync(timeout),
 0149                link => CloseLink(link));
 26150        }
 151
 152        private async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
 153            TimeSpan timeout)
 154        {
 0155            RequestResponseAmqpLink link = await _connectionScope.OpenManagementLinkAsync(
 0156                _entityPath,
 0157                _identifier,
 0158                timeout,
 0159                CancellationToken.None).ConfigureAwait(false);
 0160            link.Closed += OnManagementLinkClosed;
 0161            return link;
 0162        }
 163
 164        private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
 165            TimeSpan timeout,
 166            uint prefetchCount,
 167            ReceiveMode receiveMode,
 168            string sessionId,
 169            bool isSessionReceiver)
 170        {
 36171            ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);
 172
 173            try
 174            {
 36175                ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync(
 36176                    entityPath: _entityPath,
 36177                    timeout: timeout,
 36178                    prefetchCount: prefetchCount,
 36179                    receiveMode: receiveMode,
 36180                    sessionId: sessionId,
 36181                    isSessionReceiver: isSessionReceiver,
 36182                    cancellationToken: CancellationToken.None).ConfigureAwait(false);
 0183                if (isSessionReceiver)
 184                {
 185                    // Refresh the session lock value in case we have reconnected a link.
 186                    // SessionId need not be refreshed here as we do not allow reconnecting
 187                    // a receiver instance to a different session.
 0188                    SessionLockedUntil = link.Settings.Properties.TryGetValue<long>(
 0189                        AmqpClientConstants.LockedUntilUtc, out var lockedUntilUtcTicks) ?
 0190                        new DateTime(lockedUntilUtcTicks, DateTimeKind.Utc)
 0191                        : DateTime.MinValue;
 192                }
 0193                ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier);
 0194                link.Closed += OnReceiverLinkClosed;
 0195                return link;
 196            }
 36197            catch (Exception ex)
 198            {
 36199                ServiceBusEventSource.Log.CreateReceiveLinkException(_identifier, ex.ToString());
 36200                throw;
 201            }
 0202        }
 203
 204        private static void CloseLink(ReceivingAmqpLink link)
 205        {
 0206            link.Session?.SafeClose();
 0207            link.SafeClose();
 0208        }
 209
 210        private static void CloseLink(RequestResponseAmqpLink link)
 211        {
 0212            link.Session?.SafeClose();
 0213            link.SafeClose();
 0214        }
 215
 216        /// <summary>
 217        /// Receives a list of <see cref="ServiceBusReceivedMessage" /> from the entity using <see cref="ReceiveMode"/> 
 218        /// </summary>
 219        ///
 220        /// <param name="maxMessages">The maximum number of messages that will be received.</param>
 221        /// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait for the fir
 222        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 223        ///
 224        /// <returns>List of messages received. Returns an empty list if no message is found.</returns>
 225        public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
 226            int maxMessages,
 227            TimeSpan? maxWaitTime,
 228            CancellationToken cancellationToken)
 229        {
 20230            IReadOnlyList<ServiceBusReceivedMessage> messages = null;
 20231            await _retryPolicy.RunOperation(async (timeout) =>
 20232            {
 56233                messages = await ReceiveMessagesAsyncInternal(
 56234                    maxMessages,
 56235                    maxWaitTime,
 56236                    timeout,
 56237                    cancellationToken).ConfigureAwait(false);
 0238            },
 20239            _connectionScope,
 20240            cancellationToken).ConfigureAwait(false);
 241
 0242            return messages;
 0243        }
 244
 245        /// <summary>
 246        /// Receives a list of <see cref="ServiceBusReceivedMessage" /> from the Service Bus entity.
 247        /// </summary>
 248        ///
 249        /// <param name="maxMessages">The maximum number of messages to receive.</param>
 250        /// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait for the fir
 251        /// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.</param>
 252        /// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
 253        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 254        ///
 255        /// <returns>The list of <see cref="ServiceBusMessage" /> from the Service Bus entity this receiver is associate
 256        ///
 257        private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyncInternal(
 258            int maxMessages,
 259            TimeSpan? maxWaitTime,
 260            TimeSpan timeout,
 261            CancellationToken cancellationToken)
 262        {
 36263            var link = default(ReceivingAmqpLink);
 36264            var amqpMessages = default(IEnumerable<AmqpMessage>);
 36265            var receivedMessages = new List<ServiceBusReceivedMessage>();
 266            try
 267            {
 36268                if (!_receiveLink.TryGetOpenedObject(out link))
 269                {
 36270                    link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).Con
 271                }
 0272                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 273
 0274                var messagesReceived = await Task.Factory.FromAsync
 0275                (
 0276                    (callback, state) => link.BeginReceiveRemoteMessages(
 0277                        maxMessages,
 0278                        TimeSpan.FromMilliseconds(20),
 0279                        maxWaitTime ?? timeout,
 0280                        callback,
 0281                        state),
 0282                    (asyncResult) => link.EndReceiveMessages(asyncResult, out amqpMessages),
 0283                    TaskCreationOptions.RunContinuationsAsynchronously
 0284                ).ConfigureAwait(false);
 285
 0286                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 287                // If messages were received, then package them for consumption and
 288                // return them.
 289
 0290                if ((messagesReceived) && (amqpMessages != null))
 291                {
 0292                    foreach (AmqpMessage message in amqpMessages)
 293                    {
 0294                        if (_receiveMode == ReceiveMode.ReceiveAndDelete)
 295                        {
 0296                            link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
 297                        }
 0298                        receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
 0299                        message.Dispose();
 300                    }
 301                }
 302
 0303                return receivedMessages;
 304            }
 305            catch (Exception exception)
 306            {
 36307                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 36308                    exception,
 36309                    link?.GetTrackingId(),
 36310                    null,
 36311                    !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
 36312                .Throw();
 313
 0314                throw; // will never be reached
 315            }
 0316        }
 317
 318        /// <summary>
 319        /// Completes a <see cref="ServiceBusReceivedMessage"/>. This will delete the message from the service.
 320        /// </summary>
 321        ///
 322        /// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
 323        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 324        ///
 325        /// <remarks>
 326        /// This operation can only be performed on a message that was received by this receiver
 327        /// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
 328        /// </remarks>
 329        ///
 330        /// <returns>A task to be resolved on when the operation has completed.</returns>
 331        public override async Task CompleteAsync(
 332            string lockToken,
 333            CancellationToken cancellationToken = default) =>
 0334            await _retryPolicy.RunOperation(
 0335                async (timeout) =>
 0336                await CompleteInternalAsync(
 0337                    lockToken,
 0338                    timeout).ConfigureAwait(false),
 0339                _connectionScope,
 0340                cancellationToken).ConfigureAwait(false);
 341
 342        /// <summary>
 343        /// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the mess
 344        /// </summary>
 345        ///
 346        /// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
 347        /// <param name="timeout">The timeout for the operation, as supplied by the retry policy for the current attempt.</param>
 348        private async Task CompleteInternalAsync(
 349            string lockToken,
 350            TimeSpan timeout)
 351        {
 0352            Guid lockTokenGuid = new Guid(lockToken);
 0353            var lockTokenGuids = new[] { lockTokenGuid };
 0354            if (_requestResponseLockedMessages.Contains(lockTokenGuid))
 355            {
 0356                await DisposeMessageRequestResponseAsync(
 0357                    lockTokenGuids,
 0358                    timeout,
 0359                    DispositionStatus.Completed,
 0360                    SessionId).ConfigureAwait(false);
 0361                return;
 362            }
 0363            await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
 0364        }
 365
 366        /// <summary>
 367        /// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the mess
 368        /// </summary>
 369        ///
 370        /// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding mes
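 371        /// <param name="outcome">The AMQP <see cref="Outcome"/> to apply to each of the messages.</param>
 372        /// <param name="timeout">The timeout used when enlisting in an ambient transaction and when opening the receive link.</param>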
 373        private async Task DisposeMessagesAsync(
 374            IEnumerable<Guid> lockTokens,
 375            Outcome outcome,
 376            TimeSpan timeout)
 377        {
 0378            List<ArraySegment<byte>> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens);
 379
 0380            ReceivingAmqpLink receiveLink = null;
 381            try
 382            {
 0383                ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 0384                Transaction ambientTransaction = Transaction.Current;
 0385                if (ambientTransaction != null)
 386                {
 0387                    transactionId = await AmqpTransactionManager.Instance.EnlistAsync(
 0388                        ambientTransaction,
 0389                        _connectionScope,
 0390                        timeout).ConfigureAwait(false);
 391                }
 392
 0393                if (!_receiveLink.TryGetOpenedObject(out receiveLink))
 394                {
 0395                    receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
 396                }
 397
 0398                var disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
 0399                var i = 0;
 0400                foreach (ArraySegment<byte> deliveryTag in deliveryTags)
 401                {
 402
 0403                    disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true
 404                }
 405
 0406                Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false);
 0407                Error error = null;
 0408                foreach (Outcome item in outcomes)
 409                {
 0410                    Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) 
 0411                    if (disposedOutcome != null)
 412                    {
 0413                        if (error.Condition.Equals(AmqpErrorCode.NotFound))
 414                        {
 0415                            ThrowLockLostException();
 416                        }
 417
 0418                        throw error.ToMessagingContractException();
 419                    }
 420                }
 0421            }
 0422            catch (Exception exception)
 423            {
 0424                if (exception is OperationCanceledException &&
 0425                    receiveLink != null && receiveLink.State != AmqpObjectState.Opened)
 426                {
 427                    // The link state is lost; we need to return a non-retriable error.
 0428                    ServiceBusEventSource.Log.LinkStateLost(
 0429                        _identifier,
 0430                        receiveLink.Name,
 0431                        receiveLink.State.ToString(),
 0432                        _isSessionReceiver,
 0433                        exception.ToString());
 0434                    ThrowLockLostException();
 435                }
 436
 0437                throw;
 438            }
 0439        }
 440
 441        private void ThrowLockLostException()
 442        {
 0443            if (_isSessionReceiver)
 444            {
 0445                throw new ServiceBusException(
 0446                    Resources.SessionLockExpiredOnMessageSession,
 0447                    ServiceBusFailureReason.SessionLockLost);
 448            }
 0449            throw new ServiceBusException(
 0450                Resources.MessageLockLost,
 0451                ServiceBusFailureReason.MessageLockLost);
 452        }
 453
 454        /// <summary> Indicates that the receiver wants to defer the processing for the message.</summary>
 455        ///
 456        /// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to defer.</param>
 457        /// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param
 458        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 459        ///
 460        /// <remarks>
 461        /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>,
 462        /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
 463        /// In order to receive this message again in the future, you will need to save
 464        /// the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>
 465        /// and receive it using <see cref="ReceiveDeferredMessagesAsync"/>.
 466        /// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire.
 467        /// This operation can only be performed on messages that were received by this receiver.
 468        /// </remarks>
 469        ///
 470        /// <returns>A task to be resolved on when the operation has completed.</returns>
 471        public override async Task DeferAsync(
 472            string lockToken,
 473            IDictionary<string, object> propertiesToModify = null,
 474            CancellationToken cancellationToken = default) =>
 0475            await _retryPolicy.RunOperation(
 0476                async (timeout) => await DeferInternalAsync(
 0477                    lockToken,
 0478                    timeout,
 0479                    propertiesToModify).ConfigureAwait(false),
 0480                _connectionScope,
 0481                cancellationToken).ConfigureAwait(false);
 482
 483        /// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
 484        ///
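 485        /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage" /> to defer.</param>
 486        /// <param name="timeout">The timeout for the operation, as supplied by the retry policy for the current attempt.</param>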
 487        /// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param
 488        ///
 489        private Task DeferInternalAsync(
 490            string lockToken,
 491            TimeSpan timeout,
 492            IDictionary<string, object> propertiesToModify = null)
 493        {
 0494            Guid lockTokenGuid = new Guid(lockToken);
 0495            var lockTokenGuids = new[] { lockTokenGuid };
 0496            if (_requestResponseLockedMessages.Contains(lockTokenGuid))
 497            {
 0498                return DisposeMessageRequestResponseAsync(
 0499                    lockTokenGuids,
 0500                    timeout,
 0501                    DispositionStatus.Defered,
 0502                    SessionId,
 0503                    propertiesToModify);
 504            }
 0505            return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout);
 506        }
 507
 508        /// <summary>
 509        /// Abandons a <see cref="ServiceBusReceivedMessage"/>. This will make the message available again for processin
 510        /// </summary>
 511        ///
 512        /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to abandon.</param>
 513        /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para
 514        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 515        ///
 516        /// <remarks>
 517        /// Abandoning a message will increase the delivery count on the message.
 518        /// This operation can only be performed on messages that were received by this receiver
 519        /// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
 520        /// </remarks>
 521        ///
 522        /// <returns>A task to be resolved on when the operation has completed.</returns>
 523        public override async Task AbandonAsync(
 524            string lockToken,
 525            IDictionary<string, object> propertiesToModify = null,
 526            CancellationToken cancellationToken = default) =>
 0527            await _retryPolicy.RunOperation(
 0528                async (timeout) => await AbandonInternalAsync(
 0529                    lockToken,
 0530                    timeout,
 0531                    propertiesToModify).ConfigureAwait(false),
 0532                _connectionScope,
 0533                cancellationToken).ConfigureAwait(false);
 534
 535        /// <summary>
 536        /// Abandons a <see cref="ServiceBusMessage"/> using a lock token. This will make the message available again fo
 537        /// </summary>
 538        ///
 539        /// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
 540        /// <param name="timeout">The timeout for the operation, as supplied by the retry policy for the current attempt.</param>
 541        /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para
 542        private Task AbandonInternalAsync(
 543            string lockToken,
 544            TimeSpan timeout,
 545            IDictionary<string, object> propertiesToModify = null)
 546        {
 0547            Guid lockTokenGuid = new Guid(lockToken);
 0548            var lockTokenGuids = new[] { lockTokenGuid };
 0549            if (_requestResponseLockedMessages.Contains(lockTokenGuid))
 550            {
 0551                return DisposeMessageRequestResponseAsync(
 0552                    lockTokenGuids,
 0553                    timeout,
 0554                    DispositionStatus.Abandoned,
 0555                    SessionId,
 0556                    propertiesToModify);
 557            }
 0558            return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout);
 559        }
 560
 561        /// <summary>
 562        /// Moves a message to the deadletter sub-queue.
 563        /// </summary>
 564        ///
 565        /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to deadletter.</param>
 566        /// <param name="deadLetterReason">The reason for deadlettering the message.</param>
 567        /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
 568        /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
 569        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 570        ///
 571        /// <remarks>
 572        /// In order to receive a message from the deadletter queue, you will need a new
 573        /// <see cref="ServiceBusReceiver"/> with the corresponding path.
 574        /// You can use <c>EntityNameHelper.FormatDeadLetterPath(string)</c> to help with this.
 575        /// This operation can only be performed on messages that were received by this receiver
 576        /// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
 577        /// </remarks>
 578        ///
 579        /// <returns>A task to be resolved on when the operation has completed.</returns>
 580        public override async Task DeadLetterAsync(
 581            string lockToken,
 582            string deadLetterReason,
 583            string deadLetterErrorDescription = default,
 584            IDictionary<string, object> propertiesToModify = default,
 585            CancellationToken cancellationToken = default) =>
 0586            await _retryPolicy.RunOperation(
 0587                async (timeout) => await DeadLetterInternalAsync(
 0588                    lockToken,
 0589                    timeout,
 0590                    propertiesToModify,
 0591                    deadLetterReason,
 0592                    deadLetterErrorDescription).ConfigureAwait(false),
 0593                _connectionScope,
 0594                cancellationToken).ConfigureAwait(false);
 595
 596        /// <summary>
 597        /// Moves a message to the deadletter sub-queue.
            /// Lock tokens tracked in <c>_requestResponseLockedMessages</c> are settled through
            /// <c>DisposeMessageRequestResponseAsync</c> with <c>DispositionStatus.Suspended</c>;
            /// all other lock tokens are settled through <c>DisposeMessagesAsync</c> with a rejected outcome.
 598        /// </summary>
 599        ///
 600        /// <param name="lockToken">The lock token of the corresponding message to deadletter; parsed as a <see cref="Guid"/>.</param>
 601        /// <param name="timeout">The time budget forwarded to whichever disposition call is selected.</param>
 602        /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
 603        /// <param name="deadLetterReason">The reason for deadlettering the message.</param>
 604        /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
            ///
            /// <returns>The task returned by the selected disposition call.</returns>
 605        internal virtual Task DeadLetterInternalAsync(
 606            string lockToken,
 607            TimeSpan timeout,
 608            IDictionary<string, object> propertiesToModify,
 609            string deadLetterReason,
 610            string deadLetterErrorDescription)
 611        {
                // Both strings are length-checked before any request is built.
                // NOTE(review): the description is checked against MaxDeadLetterReasonLength as well -
                // confirm that sharing the reason's limit is intended.
 0612            Argument.AssertNotTooLong(deadLetterReason, Constants.MaxDeadLetterReasonLength, nameof(deadLetterReason));
 0613            Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetter
 614
 0615            Guid lockTokenGuid = new Guid(lockToken);
 0616            var lockTokenGuids = new[] { lockTokenGuid };
                // Tokens recorded in _requestResponseLockedMessages take the request/response path.
 0617            if (_requestResponseLockedMessages.Contains(lockTokenGuid))
 618            {
 0619                return DisposeMessageRequestResponseAsync(
 0620                    lockTokenGuids,
 0621                    timeout,
 0622                    DispositionStatus.Suspended,
 0623                    SessionId,
 0624                    propertiesToModify,
 0625                    deadLetterReason,
 0626                    deadLetterErrorDescription);
 627            }
 628
                // Otherwise the message is settled with a Rejected outcome carrying the deadletter details.
 0629            return DisposeMessagesAsync(
 0630                lockTokenGuids,
 0631                GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
 0632                timeout);
 633        }
 634
 635        private Rejected GetRejectedOutcome(
 636            IDictionary<string, object> propertiesToModify,
 637            string deadLetterReason,
 638            string deadLetterErrorDescription)
 639        {
                // Builds the Rejected outcome used when deadlettering a message.
                // When no reason, description, or properties are supplied, the shared
                // AmqpConstants.RejectedOutcome instance is returned unchanged.
 0640            Rejected rejected = AmqpConstants.RejectedOutcome;
 0641            if (deadLetterReason != null || deadLetterErrorDescription != null || propertiesToModify != null)
 642            {
                    // A fresh outcome is created so the details below are not added to the shared instance.
 0643                rejected = new Rejected { Error = new Error { Condition = AmqpClientConstants.DeadLetterName, Info = new
 0644                if (deadLetterReason != null)
 645                {
 0646                    rejected.Error.Info.Add(ServiceBusReceivedMessage.DeadLetterReasonHeader, deadLetterReason);
 647                }
 648
 0649                if (deadLetterErrorDescription != null)
 650                {
 0651                    rejected.Error.Info.Add(ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader, deadLetterErrorD
 652                }
 653
 0654                if (propertiesToModify != null)
 655                {
                        // Each property value must convert to an AMQP object; the first one that
                        // does not aborts the whole operation with NotSupportedException.
 0656                    foreach (KeyValuePair<string, object> pair in propertiesToModify)
 657                    {
 0658                        if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProper
 659                        {
 0660                            rejected.Error.Info.Add(pair.Key, amqpObject);
 661                        }
 662                        else
 663                        {
                                // NOTE(review): the message is formatted from pair.Key although it is
                                // pair.Value that failed conversion - confirm which one was intended.
 0664                            throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.
 665                        }
 666                    }
 667                }
 668            }
 669
 0670            return rejected;
 671        }
 672
 673        /// <summary>
 674        /// Updates the disposition status of deferred messages.
            /// The update is sent as a management request and an exception is thrown
            /// when the response status is anything other than OK.
 675        /// </summary>
 676        ///
 677        /// <param name="lockTokens">Message lock tokens to update disposition status.</param>
 678        /// <param name="timeout">The time budget passed to the management request execution.</param>
 679        /// <param name="dispositionStatus">The disposition to apply; it is written to the request in string form.</param>
 680        /// <param name="sessionId">Optional session identifier; added to the request only when it is not null or whitespace.</param>
 681        /// <param name="propertiesToModify">Optional message properties to modify; each value must be convertible to an AMQP object.</param>
 682        /// <param name="deadLetterReason">Optional deadletter reason; added to the request only when not null.</param>
 683        /// <param name="deadLetterDescription">Optional deadletter description; added to the request only when not null.</param>
            ///
            /// <exception cref="NotSupportedException">A value in <paramref name="propertiesToModify"/> cannot be converted to an AMQP object.</exception>
 684        private async Task DisposeMessageRequestResponseAsync(
 685            Guid[] lockTokens,
 686            TimeSpan timeout,
 687            DispositionStatus dispositionStatus,
 688            string sessionId = null,
 689            IDictionary<string, object> propertiesToModify = null,
 690            string deadLetterReason = null,
 691            string deadLetterDescription = null)
 692        {
 693            // Create an AmqpRequest Message to update disposition
 0694            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.UpdateDispositionOp
 695
                // Only when the receive link is currently open is the associated link recorded on the request.
 0696            if (_receiveLink.TryGetOpenedObject(out ReceivingAmqpLink receiveLink))
 697            {
 0698                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 699            }
 700
 0701            amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
 0702            amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLo
 703
                // Optional deadletter details are omitted from the request entirely when null.
 0704            if (deadLetterReason != null)
 705            {
 0706                amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterReason] = deadLetterReason;
 707            }
 708
 0709            if (deadLetterDescription != null)
 710            {
 0711                amqpRequestMessage.Map[ManagementConstants.Properties.DeadLetterDescription] = deadLetterDescription;
 712            }
 713
 0714            if (propertiesToModify != null)
 715            {
 0716                var amqpPropertiesToModify = new AmqpMap();
 0717                foreach (KeyValuePair<string, object> pair in propertiesToModify)
 718                {
 0719                    if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, 
 720                    {
 0721                        amqpPropertiesToModify[new MapKey(pair.Key)] = amqpObject;
 722                    }
 723                    else
 724                    {
                            // NOTE(review): the message reports the type of pair.Key (always string for this
                            // dictionary) although pair.Value failed conversion - confirm which was intended.
 0725                        throw new NotSupportedException(
 0726                            Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetType()));
 727                    }
 728                }
 729
                    // An empty dictionary results in no PropertiesToModify entry at all.
 0730                if (amqpPropertiesToModify.Count > 0)
 731                {
 0732                    amqpRequestMessage.Map[ManagementConstants.Properties.PropertiesToModify] = amqpPropertiesToModify;
 733                }
 734            }
 735
 0736            if (!string.IsNullOrWhiteSpace(sessionId))
 737            {
 0738                amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = sessionId;
 739            }
 740
 0741            AmqpResponseMessage amqpResponseMessage = await ExecuteRequest(
 0742                timeout,
 0743                amqpRequestMessage).ConfigureAwait(false);
 744
                // Any status other than OK is surfaced as an exception built from the response.
 0745            if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
 746            {
 0747                throw amqpResponseMessage.ToMessagingContractException();
 748            }
 0749        }
 750
 751        private static Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify) =>
 0752            GetModifiedOutcome(propertiesToModify, false);
 753
 754        private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
 0755            GetModifiedOutcome(propertiesToModify, true);
 756
 757        private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
 758        {
 0759            return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
 760        }
 761
 762        private static Outcome GetModifiedOutcome(
 763            IDictionary<string, object> propertiesToModify,
 764            bool undeliverableHere)
 765        {
                // Builds the Modified outcome shared by the abandon and defer paths.
                // UndeliverableHere is only ever set to true; it is left untouched otherwise.
 0766            Modified modified = new Modified();
 0767            if (undeliverableHere)
 768            {
 0769                modified.UndeliverableHere = true;
 770            }
 771
 0772            if (propertiesToModify != null)
 773            {
                    // A non-null dictionary always yields a MessageAnnotations instance, even when it is empty.
 0774                modified.MessageAnnotations = new Fields();
 0775                foreach (KeyValuePair<string, object> pair in propertiesToModify)
 776                {
 0777                    if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, 
 778                    {
 0779                        modified.MessageAnnotations.Add(pair.Key, amqpObject);
 780                    }
 781                    else
 782                    {
                            // NOTE(review): the message is formatted from pair.Key although it is
                            // pair.Value that failed conversion - confirm which one was intended.
 0783                        throw new NotSupportedException(Resources.InvalidAmqpMessageProperty.FormatForUser(pair.Key.GetT
 784                    }
 785                }
 786            }
 787
 0788            return modified;
 789        }
 790
 791        /// <summary>
 792        /// Fetches a list of active messages without changing the state of the receiver or the message source.
 793        /// </summary>
 794        ///
 795        /// <param name="sequenceNumber">The sequence number from where to read the message.</param>
 796        /// <param name="messageCount">The maximum number of messages that will be fetched.</param>
 797        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 798        ///
 799        /// <remarks>
 800        /// The first call to <see cref="PeekMessagesAsync"/>(long, int, CancellationToken) fetches the first active mes
 801        /// Each subsequent call fetches the subsequent message in the entity.
 802        /// Unlike a received message, peeked message will not have lock token associated with it,
 803        /// and hence it cannot be Completed/Abandoned/Deferred/Deadlettered/Renewed.
 804        /// Also, unlike <see cref="ReceiveMessagesAsync(int, TimeSpan?, CancellationToken)"/>, this method will fetch e
 805        /// </remarks>
 806        /// <returns></returns>
 807        public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
 808            long? sequenceNumber,
 809            int messageCount = 1,
 810            CancellationToken cancellationToken = default)
 811        {
 812
 0813            long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
 0814            IReadOnlyList<ServiceBusReceivedMessage> messages = null;
 815
 0816            await _retryPolicy.RunOperation(
 0817                async (timeout) =>
 0818                messages = await PeekMessagesInternalAsync(
 0819                    seqNumber,
 0820                    messageCount,
 0821                    timeout,
 0822                    cancellationToken)
 0823                .ConfigureAwait(false),
 0824                _connectionScope,
 0825                cancellationToken).ConfigureAwait(false);
 826
 0827            return messages;
 0828        }
 829
 830        /// <summary>
 831        /// Sends a peek request over the management link and converts the returned payloads into messages.
            /// On success, <c>LastPeekedSequenceNumber</c> is advanced to the sequence number of the last message returned.
 832        /// </summary>
 833        /// <param name="sequenceNumber">The sequence number from which to start peeking.</param>
 834        /// <param name="messageCount">The number of messages requested.</param>
 835        /// <param name="timeout">The overall time budget; the remaining portion is recomputed before each awaited step.</param>
 836        /// <param name="cancellationToken">Checked after the management link is obtained and again after the response arrives.</param>
 837        /// <returns>The peeked messages; an empty list when the service reports that there is nothing to return.</returns>
 838        private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInternalAsync(
 839            long sequenceNumber,
 840            int messageCount,
 841            TimeSpan timeout,
 842            CancellationToken cancellationToken)
 843        {
                // Tracks elapsed time so later steps only receive what is left of the timeout.
 0844            var stopWatch = ValueStopwatch.StartNew();
 845
 0846            AmqpRequestMessage amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0847                    ManagementConstants.Operations.PeekMessageOperation,
 0848                    timeout,
 0849                    null);
 850
                // Only when the receive link is currently open is the associated link recorded on the request.
 0851            if (_receiveLink.TryGetOpenedObject(out ReceivingAmqpLink receiveLink))
 852            {
 0853                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 854            }
 855
 0856            amqpRequestMessage.Map[ManagementConstants.Properties.FromSequenceNumber] = sequenceNumber;
 0857            amqpRequestMessage.Map[ManagementConstants.Properties.MessageCount] = messageCount;
 858
 0859            if (!string.IsNullOrWhiteSpace(SessionId))
 860            {
 0861                amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;
 862            }
 863
                // Opening the management link is bounded by the smaller of the session timeout and the remaining budget.
 0864            RequestResponseAmqpLink link = await _managementLink.GetOrCreateAsync(
 0865                UseMinimum(_connectionScope.SessionTimeout,
 0866                timeout.CalculateRemaining(stopWatch.GetElapsedTime())))
 0867                .ConfigureAwait(false);
 0868            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 869
                // The raw response is disposed when this method exits (using declaration).
 0870            using AmqpMessage responseAmqpMessage = await link.RequestAsync(
 0871                amqpRequestMessage.AmqpMessage,
 0872                timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
 0873                .ConfigureAwait(false);
 874
 0875            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 876
 0877            AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(responseAmqpMessage);
 878
 0879            var messages = new List<ServiceBusReceivedMessage>();
 0880            if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 881            {
                    // 'message' ends up holding the last converted entry, which drives LastPeekedSequenceNumber below.
 0882                ServiceBusReceivedMessage message = null;
 0883                IEnumerable<AmqpMap> messageList = amqpResponseMessage.GetListValue<AmqpMap>(ManagementConstants.Propert
 0884                foreach (AmqpMap entry in messageList)
 885                {
 0886                    var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
 0887                    var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true)
 0888                    message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, true);
 0889                    messages.Add(message);
 890                }
 891
                    // Left unchanged when the OK response contained no entries.
 0892                if (message != null)
 893                {
 0894                    LastPeekedSequenceNumber = message.SequenceNumber;
 895                }
 0896                return messages;
 897            }
 898
                // NoContent, and a qualifying NotFound, are treated as "nothing to peek" rather than as errors.
 0899            if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NoContent ||
 0900                (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NotFound && Equals(AmqpClientConstants.Message
 901            {
 0902                return messages;
 903            }
 904
 0905            throw amqpResponseMessage.ToMessagingContractException();
 0906        }
 907
 908        /// <summary>
 909        /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
 910        /// </summary>
 911        /// <returns>New lock token expiry date and time in UTC format.</returns>
 912        ///
 913        /// <param name="lockToken">Lock token associated with the message.</param>
 914        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 915        public override async Task<DateTimeOffset> RenewMessageLockAsync(
 916            string lockToken,
 917            CancellationToken cancellationToken)
 918        {
 0919            DateTimeOffset lockedUntil = DateTimeOffset.MinValue;
 0920            await _retryPolicy.RunOperation(
 0921                async (timeout) =>
 0922                {
 0923                    lockedUntil = await RenewMessageLockInternalAsync(
 0924                        lockToken,
 0925                        timeout).ConfigureAwait(false);
 0926                },
 0927                _connectionScope,
 0928                cancellationToken).ConfigureAwait(false);
 0929            return lockedUntil;
 0930        }
 931
 932        /// <summary>
 933        /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
            /// The renewal is sent as a management request carrying the single lock token.
 934        /// </summary>
 935        ///
 936        /// <returns>New lock token expiry date and time in UTC format.</returns>
 937        ///
 938        /// <param name="lockToken">Lock token associated with the message; parsed as a <see cref="Guid"/>.</param>
 939        /// <param name="timeout">The time budget used to create and execute the management request.</param>
 940        private async Task<DateTimeOffset> RenewMessageLockInternalAsync(
 941            string lockToken,
 942            TimeSpan timeout)
 943        {
 944            DateTimeOffset lockedUntil;
 945
 946            // Create an AmqpRequest Message to renew the lock
 0947            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0948                ManagementConstants.Operations.RenewLockOperation,
 0949                timeout,
 0950                null);
 951
                // Only when the receive link is currently open is the associated link recorded on the request.
 0952            if (_receiveLink.TryGetOpenedObject(out ReceivingAmqpLink receiveLink))
 953            {
 0954                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 955            }
 0956            amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) };
 957
 0958            AmqpResponseMessage amqpResponseMessage = await ExecuteRequest(
 0959                timeout,
 0960                amqpRequestMessage).ConfigureAwait(false);
 961
 0962            if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 963            {
                    // One token was sent, so only the first returned expiry is read.
                    // NOTE(review): no length check before indexing - assumes the service always
                    // returns at least one entry on OK; confirm.
 0964                DateTime[] lockedUntilUtcTimes = amqpResponseMessage.GetValue<DateTime[]>(ManagementConstants.Properties
 0965                lockedUntil = lockedUntilUtcTimes[0];
 966            }
 967            else
 968            {
 0969                throw amqpResponseMessage.ToMessagingContractException();
 970            }
 971
 0972            return lockedUntil;
 0973        }
 974
 975        private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpRequestMessage amqpRequestMessage)
 976        {
 0977            AmqpResponseMessage amqpResponseMessage = await ManagementUtilities.ExecuteRequestResponseAsync(
 0978                _connectionScope,
 0979                _managementLink,
 0980                amqpRequestMessage,
 0981                timeout).ConfigureAwait(false);
 0982            return amqpResponseMessage;
 0983        }
 984
 985        /// <summary>
 986        /// Renews the lock on the session specified by the <see cref="SessionId"/>. The lock will be renewed based on t
 987        /// </summary>
 988        public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
 989        {
 990            DateTimeOffset lockedUntil;
 0991            await _retryPolicy.RunOperation(
 0992                async (timeout) =>
 0993                {
 0994                    lockedUntil = await RenewSessionLockInternal(
 0995                        timeout).ConfigureAwait(false);
 0996                },
 0997                _connectionScope,
 0998                cancellationToken).ConfigureAwait(false);
 0999            SessionLockedUntil = lockedUntil;
 01000        }
 1001
 1002        internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
 1003        {
                 // Renews the lock for the session identified by SessionId through a management request
                 // and returns the expiration value read from the response.
 1004            // Create an AmqpRequest Message to renew the lock
 01005            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewSessionLockOpe
 1006
                 // Only when the receive link is currently open is the associated link recorded on the request.
 01007            if (_receiveLink.TryGetOpenedObject(out var receiveLink))
 1008            {
 01009                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 1010            }
 1011
                 // SessionId is written unconditionally here (no null/whitespace check).
 01012            amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;
 1013
 01014            AmqpResponseMessage amqpResponseMessage = await ExecuteRequest(
 01015                timeout,
 01016                amqpRequestMessage).ConfigureAwait(false);
 1017
 1018            DateTimeOffset lockedUntil;
 01019            if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 1020            {
                     // The response value is read as a DateTime and implicitly converted to DateTimeOffset.
 01021                lockedUntil = amqpResponseMessage.GetValue<DateTime>(ManagementConstants.Properties.Expiration);
 1022            }
 1023            else
 1024            {
 01025                throw amqpResponseMessage.ToMessagingContractException();
 1026            }
 01027            return lockedUntil;
 01028        }
 1029
 1030        /// <summary>
 1031        /// Gets the session state.
 1032        /// </summary>
 1033        ///
 1034        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 1035        ///
 1036        /// <returns>The session state as byte array.</returns>
 1037        public override async Task<byte[]> GetStateAsync(CancellationToken cancellationToken = default)
 1038        {
 01039            byte[] sessionState = null;
 01040            await _retryPolicy.RunOperation(
 01041                async (timeout) =>
 01042                {
 01043                    sessionState = await GetStateInternal(timeout).ConfigureAwait(false);
 01044                },
 01045                _connectionScope,
 01046                cancellationToken).ConfigureAwait(false);
 01047            return sessionState;
 01048        }
 1049
 1050        internal async Task<byte[]> GetStateInternal(TimeSpan timeout)
 1051        {
                 // Requests the state of the session identified by SessionId through a management request.
                 // Returns null when the OK response carries no session state value.
 01052            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.GetSessionStateOper
 1053
                 // Only when the receive link is currently open is the associated link recorded on the request.
 01054            if (_receiveLink.TryGetOpenedObject(out var receiveLink))
 1055            {
 01056                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 1057            }
 1058
 01059            amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;
 1060
 01061            var amqpResponseMessage = await ExecuteRequest(timeout, amqpRequestMessage).ConfigureAwait(false);
 1062
 01063            byte[] sessionState = null;
 01064            if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 1065            {
                     // A null map entry means no state is stored; sessionState stays null in that case.
 01066                if (amqpResponseMessage.Map[ManagementConstants.Properties.SessionState] != null)
 1067                {
 01068                    sessionState = amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.Sessi
 1069                }
 1070            }
 1071            else
 1072            {
 01073                throw amqpResponseMessage.ToMessagingContractException();
 1074            }
 1075
 01076            return sessionState;
 01077        }
 1078
 1079        /// <summary>
 1080        /// Set a custom state on the session which can be later retrieved using <see cref="GetStateAsync"/>
 1081        /// </summary>
 1082        ///
 1083        /// <param name="sessionState">A byte array of session state</param>
 1084        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 1085        ///
 1086        /// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
 1087        ///
 1088        /// <returns>A task to be resolved on when the operation has completed.</returns>
 1089        public override async Task SetStateAsync(
 1090            byte[] sessionState,
 1091            CancellationToken cancellationToken)
 1092        {
 01093            await _retryPolicy.RunOperation(
 01094                async (timeout) =>
 01095                {
 01096                    await SetStateInternal(
 01097                        sessionState,
 01098                        timeout).ConfigureAwait(false);
 01099                },
 01100                _connectionScope,
 01101                cancellationToken).ConfigureAwait(false);
 01102        }
 1103
 1104        internal async Task SetStateInternal(
 1105            byte[] sessionState,
 1106            TimeSpan timeout)
 1107        {
                 // Stores the given state for the session identified by SessionId through a management request.
                 // Throws when the response status is anything other than OK.
 01108            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionStateOper
 1109
                 // Only when the receive link is currently open is the associated link recorded on the request.
 01110            if (_receiveLink.TryGetOpenedObject(out var receiveLink))
 1111            {
 01112                amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName]
 1113            }
 1114
 01115            amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;
 1116
                 // The SessionState key is always present: the bytes wrapped in an ArraySegment, or an explicit null.
 01117            if (sessionState != null)
 1118            {
 01119                var value = new ArraySegment<byte>(sessionState);
 01120                amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = value;
 1121            }
 1122            else
 1123            {
 01124                amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = null;
 1125            }
 1126
 01127            var amqpResponseMessage = await ExecuteRequest(timeout, amqpRequestMessage).ConfigureAwait(false);
 01128            if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
 1129            {
 01130                throw amqpResponseMessage.ToMessagingContractException();
 1131            }
 01132        }
 1133
 1134        /// <summary>
 1135        /// Receives a <see cref="IList{Message}"/> of deferred messages identified by <paramref name="sequenceNumbers"/
 1136        /// </summary>
 1137        /// <param name="sequenceNumbers">A <see cref="IList{SequenceNumber}"/> containing the sequence numbers to recei
 1138        /// <param name="cancellationToken"></param>
 1139        /// <returns>Messages identified by sequence number are returned. Returns null if no messages are found.
 1140        /// Throws if the messages have not been deferred.</returns>
 1141        /// <seealso cref="DeferAsync"/>
 1142        public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
 1143            IList<long> sequenceNumbers,
 1144            CancellationToken cancellationToken = default)
 1145        {
 01146            IReadOnlyList<ServiceBusReceivedMessage> messages = null;
 01147            await _retryPolicy.RunOperation(
 01148                async (timeout) => messages = await ReceiveDeferredMessagesAsyncInternal(
 01149                    sequenceNumbers.ToArray(),
 01150                    timeout).ConfigureAwait(false),
 01151                _connectionScope,
 01152                cancellationToken).ConfigureAwait(false);
 01153            return messages;
 01154        }
 1155
 1156        internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
 1157            long[] sequenceNumbers,
 1158            TimeSpan timeout)
 1159        {
                 // Requests the messages with the given sequence numbers through a management request and
                 // converts each returned payload into a ServiceBusReceivedMessage. Any exception raised
                 // inside the try block is translated by AmqpExceptionHelper before being rethrown.
 01160            var messages = new List<ServiceBusReceivedMessage>();
 1161            try
 1162            {
 01163                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.ReceiveBySequen
 1164
                     // Only when the receive link is currently open is the associated link recorded on the request.
 01165                if (_receiveLink.TryGetOpenedObject(out var receiveLink))
 1166                {
 01167                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 1168                }
 01169                amqpRequestMessage.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers;
                     // The settle mode sent to the service is derived from this receiver's _receiveMode.
 01170                amqpRequestMessage.Map[ManagementConstants.Properties.ReceiverSettleMode] = (uint)(_receiveMode == Recei
 01171                if (!string.IsNullOrWhiteSpace(SessionId))
 1172                {
 01173                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;
 1174                }
 1175
 01176                var response = await ExecuteRequest(
 01177                    timeout,
 01178                    amqpRequestMessage).ConfigureAwait(false);
 1179
 01180                if (response.StatusCode == AmqpResponseStatusCode.OK)
 1181                {
 01182                    var amqpMapList = response.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages);
 01183                    foreach (var entry in amqpMapList)
 1184                    {
 01185                        var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
 01186                        var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), t
 01187                        var message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
                             // When the entry carries a lock token, it is recorded in _requestResponseLockedMessages,
                             // which DeadLetterInternalAsync consults to route settlement through the management link.
 01188                        if (entry.TryGetValue<Guid>(ManagementConstants.Properties.LockToken, out var lockToken))
 1189                        {
 01190                            message.LockTokenGuid = lockToken;
 01191                            _requestResponseLockedMessages.AddOrUpdate(lockToken, message.LockedUntil);
 1192                        }
 1193
 01194                        messages.Add(message);
 1195                    }
 1196                }
 1197                else
 1198                {
 01199                    throw response.ToMessagingContractException();
 1200                }
 01201            }
 1202            catch (Exception exception)
 1203            {
                     // ExceptionDispatchInfo rethrows the translated exception while keeping its captured stack information.
 01204                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(exception))
 01205                    .Throw();
 1206
 01207                throw; // will never be reached
 1208            }
 1209
 01210            return messages;
 01211        }
 1212
 1213        /// <summary>
 1214        /// Closes the connection to the transport receiver instance.
 1215        /// </summary>
 1216        ///
 1217        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 1218        public override async Task CloseAsync(CancellationToken cancellationToken)
 1219        {
 21220            if (_closed)
 1221            {
 01222                return;
 1223            }
 1224
 21225            _closed = true;
 1226
 21227            if (_receiveLink?.TryGetOpenedObject(out var _) == true)
 1228            {
 01229                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 01230                await _receiveLink.CloseAsync().ConfigureAwait(false);
 1231            }
 1232
 21233            if (_managementLink?.TryGetOpenedObject(out var _) == true)
 1234            {
 01235                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 01236                await _managementLink.CloseAsync().ConfigureAwait(false);
 1237            }
 1238
 21239            _receiveLink?.Dispose();
 21240            _managementLink?.Dispose();
 1241
 21242        }
 1243
 1244        private void OnReceiverLinkClosed(object receiver, EventArgs e) =>
 01245            ServiceBusEventSource.Log.ReceiveLinkClosed(
 01246                _identifier,
 01247                SessionId,
 01248                receiver);
 1249
 1250        private void OnManagementLinkClosed(object managementLink, EventArgs e) =>
 01251            ServiceBusEventSource.Log.ManagementLinkClosed(
 01252                _identifier,
 01253                managementLink);
 1254
 1255        /// <summary>
 1256        /// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 1257        /// </summary>
 1258        ///
 1259        /// <param name="firstOption">The first option to consider.</param>
 1260        /// <param name="secondOption">The second option to consider.</param>
 1261        ///
 1262        /// <returns>The smaller of the two specified intervals.</returns>
 1263        private static TimeSpan UseMinimum(
 1264            TimeSpan firstOption,
 1265            TimeSpan secondOption) =>
 361266            (firstOption < secondOption) ? firstOption : secondOption;
 1267
 1268        /// <summary>
 1269        /// Opens an AMQP link for use with receiver operations.
 1270        /// </summary>
 1271        ///
 1272        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 1273        ///
 1274        /// <returns>A task to be resolved on when the operation has completed.</returns>
 1275        public override async Task OpenLinkAsync(CancellationToken cancellationToken)
 1276        {
 01277            ReceivingAmqpLink link = null;
 01278            await _retryPolicy.RunOperation(
 01279               async (timeout) =>
 01280               link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
 01281               _connectionScope,
 01282               cancellationToken).ConfigureAwait(false);
 1283
 01284            if (_isSessionReceiver)
 1285            {
 01286                var source = (Source)link.Settings.Source;
 01287                if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
 1288                {
 01289                    link.Session.SafeClose();
 01290                    throw new ServiceBusException(true, Resources.SessionFilterMissing);
 1291                }
 1292
 01293                if (string.IsNullOrWhiteSpace(tempSessionId))
 1294                {
 01295                    link.Session.SafeClose();
 01296                    throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
 1297                }
 01298                SessionId = tempSessionId;
 1299            }
 01300        }
 1301
 1302        private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
 361303            !_closed && (link?.IsClosing() ?? false);
 1304    }
 1305}