< Summary

Class: Azure.Messaging.ServiceBus.ReceiverManager
Assembly: Azure.Messaging.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\ReceiverManager.cs
Covered lines: 85
Uncovered lines: 96
Coverable lines: 181
Total lines: 384
Line coverage: 46.9% (85 of 181)
Covered branches: 22
Total branches: 62
Branch coverage: 35.4% (22 of 62)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_Receiver() | - | 100% | 100%
get_AutoRenewLock() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
CloseReceiverIfNeeded() | - | 100% | 100%
ReceiveAndProcessMessagesAsync() | - | 54.17% | 50%
ProcessOneMessageWithinScopeAsync() | - | 75% | 100%
ProcessOneMessage() | - | 30.19% | 53.33%
OnMessageHandler() | - | 100% | 100%
RenewMessageLock() | - | 0% | 0%
CancelTask() | - | 42.86% | 50%
ThrowIfSessionLockLost(...) | - | 0% | 0%
HandleRenewLockException() | - | 0% | 0%
RaiseExceptionReceived() | - | 0% | 100%
CalculateRenewDelay(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\ReceiverManager.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections;
 6using System.Collections.Generic;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Core.Pipeline;
 10using Azure.Messaging.ServiceBus.Diagnostics;
 11using Azure.Messaging.ServiceBus.Plugins;
 12
 13namespace Azure.Messaging.ServiceBus
 14{
 15    /// <summary>
 16    /// Represents a single receiver instance that multiple threads spawned by the ServiceBusProcessor
 17    /// may be using to receive and process messages. The manager will delegate to the user provided
 18    /// callbacks and handle automatic locking of messages.
 19    /// </summary>
 20    internal class ReceiverManager
 21    {
 3222        protected virtual ServiceBusReceiver Receiver { get; set; }
 23        protected readonly ServiceBusConnection _connection;
 24        protected readonly string _fullyQualifiedNamespace;
 25        protected readonly string _entityPath;
 26        protected readonly string _identifier;
 27        protected readonly TimeSpan? _maxReceiveWaitTime;
 28        private readonly ServiceBusReceiverOptions _receiverOptions;
 29        protected readonly ServiceBusProcessorOptions _processorOptions;
 30        private readonly Func<ProcessErrorEventArgs, Task> _errorHandler;
 31        private readonly Func<ProcessMessageEventArgs, Task> _messageHandler;
 32        protected readonly EntityScopeFactory _scopeFactory;
 33        protected readonly IList<ServiceBusPlugin> _plugins;
 34
 435        protected bool AutoRenewLock => _processorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero;
 36
 437        public ReceiverManager(
 438            ServiceBusConnection connection,
 439            string fullyQualifiedNamespace,
 440            string entityPath,
 441            string identifier,
 442            ServiceBusProcessorOptions processorOptions,
 443            Func<ProcessMessageEventArgs, Task> messageHandler,
 444            Func<ProcessErrorEventArgs, Task> errorHandler,
 445            EntityScopeFactory scopeFactory,
 446            IList<ServiceBusPlugin> plugins)
 47        {
 448            _connection = connection;
 449            _fullyQualifiedNamespace = fullyQualifiedNamespace;
 450            _entityPath = entityPath;
 451            _processorOptions = processorOptions;
 452            _receiverOptions = new ServiceBusReceiverOptions
 453            {
 454                ReceiveMode = _processorOptions.ReceiveMode,
 455                PrefetchCount = _processorOptions.PrefetchCount
 456            };
 457            _maxReceiveWaitTime = _processorOptions.MaxReceiveWaitTime;
 458            _identifier = identifier;
 459            _plugins = plugins;
 460            Receiver = new ServiceBusReceiver(
 461                connection: _connection,
 462                entityPath: _entityPath,
 463                isSessionEntity: false,
 464                plugins: _plugins,
 465                options: _receiverOptions);
 466            _errorHandler = errorHandler;
 467            _messageHandler = messageHandler;
 468            _scopeFactory = scopeFactory;
 469        }
 70
 71        public virtual async Task CloseReceiverIfNeeded(
 72            CancellationToken cancellationToken)
 73        {
 74            try
 75            {
 476                await Receiver.DisposeAsync().ConfigureAwait(false);
 477            }
 78            finally
 79            {
 480                Receiver = null;
 81            }
 482        }
 83
 84        public virtual async Task ReceiveAndProcessMessagesAsync(CancellationToken cancellationToken)
 85        {
 486            ServiceBusErrorSource errorSource = ServiceBusErrorSource.Receive;
 87            try
 88            {
 89                // loop within the context of this thread
 890                while (!cancellationToken.IsCancellationRequested)
 91                {
 492                    errorSource = ServiceBusErrorSource.Receive;
 493                    ServiceBusReceivedMessage message = await Receiver.ReceiveMessageAsync(
 494                        _maxReceiveWaitTime,
 495                        cancellationToken).ConfigureAwait(false);
 496                    if (message == null)
 97                    {
 98                        continue;
 99                    }
 4100                    await ProcessOneMessageWithinScopeAsync(
 4101                        message,
 4102                        DiagnosticProperty.ProcessMessageActivityName,
 4103                        cancellationToken).ConfigureAwait(false);
 104                }
 4105            }
 106            catch (Exception ex)
 107            // If the user manually throws a TCE, then we should log it.
 0108            when (!(ex is TaskCanceledException) ||
 0109                  !cancellationToken.IsCancellationRequested)
 110            {
 0111                if (ex is ServiceBusException sbException && sbException.ProcessorErrorSource.HasValue)
 112                {
 0113                    errorSource = sbException.ProcessorErrorSource.Value;
 114                }
 0115                await RaiseExceptionReceived(
 0116                    new ProcessErrorEventArgs(
 0117                        ex,
 0118                        errorSource,
 0119                        _fullyQualifiedNamespace,
 0120                        _entityPath))
 0121                    .ConfigureAwait(false);
 122            }
 4123        }
 124
 125        protected async Task ProcessOneMessageWithinScopeAsync(ServiceBusReceivedMessage message, string activityName, C
 126        {
 4127            using DiagnosticScope scope = _scopeFactory.CreateScope(activityName);
 4128            scope.Start();
 4129            scope.SetMessageData(new ServiceBusReceivedMessage[] { message });
 130            try
 131            {
 4132                await ProcessOneMessage(
 4133                    message,
 4134                    cancellationToken)
 4135                    .ConfigureAwait(false);
 4136            }
 0137            catch (Exception ex)
 138            {
 0139                scope.Failed(ex);
 0140                throw;
 141            }
 4142        }
 143
 144        /// <summary>
 145        ///
 146        /// </summary>
 147        /// <param name="message"></param>
 148        /// <param name="cancellationToken"></param>
 149        /// <returns></returns>
 150        private async Task ProcessOneMessage(
 151            ServiceBusReceivedMessage message,
 152            CancellationToken cancellationToken)
 153        {
 4154            ServiceBusErrorSource errorSource = ServiceBusErrorSource.Receive;
 4155            CancellationTokenSource renewLockCancellationTokenSource = null;
 4156            Task renewLock = null;
 157
 158            try
 159            {
 4160                if (!Receiver.IsSessionReceiver &&
 4161                    Receiver.ReceiveMode == ReceiveMode.PeekLock &&
 4162                    AutoRenewLock)
 163                {
 0164                    renewLockCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken
 0165                    renewLock = RenewMessageLock(
 0166                        message,
 0167                        renewLockCancellationTokenSource);
 168                }
 169
 4170                errorSource = ServiceBusErrorSource.UserCallback;
 171
 4172                await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
 173
 4174                if (Receiver.ReceiveMode == ReceiveMode.PeekLock &&
 4175                    _processorOptions.AutoComplete &&
 4176                    !message.IsSettled)
 177                {
 0178                    errorSource = ServiceBusErrorSource.Complete;
 179                    // don't pass the processor cancellation token
 180                    // as we want in flight autocompletion to be able
 181                    // to finish
 0182                    await Receiver.CompleteMessageAsync(
 0183                        message.LockToken,
 0184                        CancellationToken.None)
 0185                        .ConfigureAwait(false);
 186                }
 187
 4188                await CancelTask(renewLockCancellationTokenSource, renewLock).ConfigureAwait(false);
 4189            }
 190            catch (Exception ex)
 191            // This prevents exceptions relating to processing a message from bubbling up all
 192            // the way to the main thread when calling StopProcessingAsync, which we don't want
 193            // as it isn't actionable.
 0194            when (!(ex is TaskCanceledException) || !cancellationToken.IsCancellationRequested)
 195            {
 0196                ThrowIfSessionLockLost(ex, errorSource);
 0197                await RaiseExceptionReceived(
 0198                    new ProcessErrorEventArgs(
 0199                        ex,
 0200                        errorSource,
 0201                        _fullyQualifiedNamespace,
 0202                        _entityPath))
 0203                    .ConfigureAwait(false);
 204
 205                // if the user settled the message, or if the message or session lock was lost,
 206                // do not attempt to abandon the message
 0207                ServiceBusFailureReason? failureReason = (ex as ServiceBusException)?.Reason;
 0208                if (!message.IsSettled &&
 0209                    _receiverOptions.ReceiveMode == ReceiveMode.PeekLock &&
 0210                    failureReason != ServiceBusFailureReason.SessionLockLost &&
 0211                    failureReason != ServiceBusFailureReason.MessageLockLost)
 212                {
 213                    try
 214                    {
 215                        // don't pass the processor cancellation token
 216                        // as we want in flight abandon to be able
 217                        // to finish even if user stopped processing
 0218                        await Receiver.AbandonMessageAsync(
 0219                            message.LockToken,
 0220                            cancellationToken: CancellationToken.None)
 0221                            .ConfigureAwait(false);
 0222                    }
 0223                    catch (Exception exception)
 224                    {
 0225                        ThrowIfSessionLockLost(exception, ServiceBusErrorSource.Abandon);
 0226                        await RaiseExceptionReceived(
 0227                            new ProcessErrorEventArgs(
 0228                                exception,
 0229                                ServiceBusErrorSource.Abandon,
 0230                                _fullyQualifiedNamespace,
 0231                                _entityPath))
 0232                        .ConfigureAwait(false);
 233                    }
 234                }
 235            }
 236            finally
 237            {
 4238                renewLockCancellationTokenSource?.Cancel();
 4239                renewLockCancellationTokenSource?.Dispose();
 240            }
 4241        }
 242
 243        protected virtual async Task OnMessageHandler(ServiceBusReceivedMessage message, CancellationToken processorCanc
 244        {
 4245            var args = new ProcessMessageEventArgs(
 4246                message,
 4247                Receiver,
 4248                processorCancellationToken);
 4249            await _messageHandler(args).ConfigureAwait(false);
 4250        }
 251
 252        /// <summary>
 253        ///
 254        /// </summary>
 255        /// <param name="message"></param>
 256        /// <param name="cancellationTokenSource"></param>
 257        /// <returns></returns>
 258        private async Task RenewMessageLock(
 259            ServiceBusReceivedMessage message,
 260            CancellationTokenSource cancellationTokenSource)
 261        {
 0262            cancellationTokenSource.CancelAfter(_processorOptions.MaxAutoLockRenewalDuration);
 0263            CancellationToken cancellationToken = cancellationTokenSource.Token;
 0264            while (!cancellationToken.IsCancellationRequested)
 265            {
 266                try
 267                {
 0268                    ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(_identifier, 1, message.LockToken);
 0269                    TimeSpan delay = CalculateRenewDelay(message.LockedUntil);
 270
 0271                    await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
 0272                    if (Receiver.IsDisposed)
 273                    {
 0274                        break;
 275                    }
 276
 0277                    await Receiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
 0278                    ServiceBusEventSource.Log.ProcessorRenewMessageLockComplete(_identifier);
 0279                }
 0280                catch (Exception ex) when (!(ex is TaskCanceledException))
 281                {
 0282                    ServiceBusEventSource.Log.ProcessorRenewMessageLockException(_identifier, ex.ToString());
 0283                    await HandleRenewLockException(ex, cancellationToken).ConfigureAwait(false);
 284
 285                    // if the error was not transient, break out of the loop
 0286                    if (!(ex as ServiceBusException)?.IsTransient == true)
 287                    {
 288                        break;
 289                    }
 290                }
 291            }
 0292        }
 293
 294        /// <summary>
 295        /// Cancels the specified cancellation source and awaits the specified task.
 296        /// </summary>
 297        /// <param name="cancellationSource">CancellationTokenSource to cancel</param>
 298        /// <param name="task">Associated task to await</param>
 299        protected static async Task CancelTask(
 300            CancellationTokenSource cancellationSource,
 301            Task task)
 302        {
 303            try
 304            {
 4305                if (cancellationSource != null)
 306                {
 0307                    cancellationSource.Cancel();
 0308                    await task.ConfigureAwait(false);
 309                }
 4310            }
 0311            catch (Exception ex) when (ex is TaskCanceledException)
 312            {
 313                // Nothing to do here.  These exceptions are expected.
 0314            }
 4315        }
 316
 317        private static void ThrowIfSessionLockLost(
 318            Exception exception,
 319            ServiceBusErrorSource errorSource)
 320        {
 321            // we need to propagate this in order to dispose the session receiver
 322            // in the same place where we are creating them.
 0323            var sbException = exception as ServiceBusException;
 0324            if (sbException?.Reason == ServiceBusFailureReason.SessionLockLost)
 325            {
 0326                sbException.ProcessorErrorSource = errorSource;
 0327                throw sbException;
 328            }
 0329        }
 330
 331        protected async Task HandleRenewLockException(Exception ex, CancellationToken cancellationToken)
 332        {
 333            // ObjectDisposedException should only happen here because the CancellationToken was disposed at which point
 334            // this renew exception is not relevant anymore. Lets not bother user with this exception.
 0335            if (!(ex is ObjectDisposedException) && !cancellationToken.IsCancellationRequested)
 336            {
 0337                await RaiseExceptionReceived(
 0338                    new ProcessErrorEventArgs(
 0339                        ex,
 0340                        ServiceBusErrorSource.RenewLock,
 0341                        _fullyQualifiedNamespace,
 0342                        _entityPath)).ConfigureAwait(false);
 343            }
 0344        }
 345
 346        /// <summary>
 347        ///
 348        /// </summary>
 349        /// <param name="eventArgs"></param>
 350        /// <returns></returns>
 351        protected async Task RaiseExceptionReceived(ProcessErrorEventArgs eventArgs)
 352        {
 353            try
 354            {
 0355                await _errorHandler(eventArgs).ConfigureAwait(false);
 0356            }
 0357            catch (Exception exception)
 358            {
 359                // don't bubble up exceptions raised from customer exception handler
 0360                ServiceBusEventSource.Log.ProcessorErrorHandlerThrewException(exception.ToString());
 0361            }
 0362        }
 363
 364        /// <summary>
 365        ///
 366        /// </summary>
 367        /// <param name="lockedUntil"></param>
 368        /// <returns></returns>
 369        protected static TimeSpan CalculateRenewDelay(DateTimeOffset lockedUntil)
 370        {
 0371            var remainingTime = lockedUntil - DateTimeOffset.UtcNow;
 372
 0373            if (remainingTime < TimeSpan.FromMilliseconds(400))
 374            {
 0375                return TimeSpan.Zero;
 376            }
 377
 0378            var buffer = TimeSpan.FromTicks(Math.Min(remainingTime.Ticks / 2, Constants.MaximumRenewBufferDuration.Ticks
 0379            var renewAfter = remainingTime - buffer;
 380
 0381            return renewAfter;
 382        }
 383    }
 384}