< Summary

Class:Azure.Messaging.ServiceBus.SessionReceiverManager
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\SessionReceiverManager.cs
Covered lines:0
Uncovered lines:156
Coverable lines:156
Total lines:365
Line coverage:0% (0 of 156)
Covered branches:0
Total branches:68
Branch coverage:0% (0 of 68)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%100%
get_Receiver()-0%100%
EnsureCanProcess()-0%0%
WaitSemaphore()-0%100%
CreateAndInitializeSessionReceiver()-0%0%
CreateReceiver()-0%0%
CloseReceiverIfNeeded()-0%0%
CloseReceiver()-0%0%
ReceiveAndProcessMessagesAsync()-0%0%
RenewSessionLock()-0%0%
OnMessageHandler()-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\SessionReceiverManager.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using Azure.Messaging.ServiceBus.Diagnostics;
 9using Azure.Messaging.ServiceBus.Plugins;
 10
 11namespace Azure.Messaging.ServiceBus
 12{
 13    /// <summary>
 14    /// Represents a thread-safe abstraction around a single session receiver that threads
 15    /// spawned by the ServiceBusProcessor use to receive and process messages. Depending on how the
 16    /// MaxConcurrentCalls and SessionIds options are configured, there may be multiple threads using the same
 17    /// SessionReceiverManager (i.e. if MaxConcurrentCalls is greater than the number of specified sessions).
 18    /// The manager will delegate to the user provided callbacks and handle automatic locking of sessions.
 19    /// The receiver instance will only be closed when no other threads are using it, or when the user has
 20    /// called StopProcessingAsync.
 21    /// </summary>
 22#pragma warning disable CA1001 // Types that own disposable fields should be disposable.
 23    // Doesn't own _concurrentAcceptSessionsSemaphore
 24    internal class SessionReceiverManager : ReceiverManager
 25#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 26    {
 27        private int _threadCount = 0;
 28        private readonly Func<ProcessSessionEventArgs, Task> _sessionInitHandler;
 29        private readonly Func<ProcessSessionEventArgs, Task> _sessionCloseHandler;
 30        private readonly Func<ProcessSessionMessageEventArgs, Task> _messageHandler;
 31        private readonly SemaphoreSlim _concurrentAcceptSessionsSemaphore;
 32        private readonly int _maxCallsPerSession;
 33        private readonly ServiceBusSessionReceiverOptions _sessionReceiverOptions;
 34        private readonly bool _keepOpenOnReceiveTimeout;
 35        private ServiceBusSessionReceiver _receiver;
 36        private CancellationTokenSource _sessionLockRenewalCancellationSource;
 37        private Task _sessionLockRenewalTask;
 038        private CancellationTokenSource _sessionCancellationSource = new CancellationTokenSource();
 039        protected override ServiceBusReceiver Receiver => _receiver;
 40
 041        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
 42
 43        public SessionReceiverManager(
 44            ServiceBusConnection connection,
 45            string fullyQualifiedNamespace,
 46            string entityPath,
 47            string identifier,
 48            string sessionId,
 49            ServiceBusProcessorOptions processorOptions,
 50            Func<ProcessSessionEventArgs, Task> sessionInitHandler,
 51            Func<ProcessSessionEventArgs, Task> sessionCloseHandler,
 52            Func<ProcessSessionMessageEventArgs, Task> messageHandler,
 53            Func<ProcessErrorEventArgs, Task> errorHandler,
 54            SemaphoreSlim concurrentAcceptSessionsSemaphore,
 55            EntityScopeFactory scopeFactory,
 56            IList<ServiceBusPlugin> plugins,
 57            int maxCallsPerSession,
 58            bool keepOpenOnReceiveTimeout)
 059            : base(connection, fullyQualifiedNamespace, entityPath, identifier, processorOptions, default, errorHandler,
 060                  scopeFactory, plugins)
 61        {
 062            _sessionInitHandler = sessionInitHandler;
 063            _sessionCloseHandler = sessionCloseHandler;
 064            _messageHandler = messageHandler;
 065            _concurrentAcceptSessionsSemaphore = concurrentAcceptSessionsSemaphore;
 066            _maxCallsPerSession = maxCallsPerSession;
 067            _sessionReceiverOptions = new ServiceBusSessionReceiverOptions
 068            {
 069                ReceiveMode = _processorOptions.ReceiveMode,
 070                PrefetchCount = _processorOptions.PrefetchCount,
 071                SessionId = sessionId
 072            };
 073            _keepOpenOnReceiveTimeout = keepOpenOnReceiveTimeout;
 074        }
 75
 76        private async Task<bool> EnsureCanProcess(CancellationToken cancellationToken)
 77        {
 078            bool releaseSemaphore = false;
 79            try
 80            {
 081                await WaitSemaphore(cancellationToken).ConfigureAwait(false);
 082                releaseSemaphore = true;
 083                if (_threadCount >= _maxCallsPerSession)
 84                {
 085                    return false;
 86                }
 087                if (_receiver == null)
 88                {
 089                    await CreateAndInitializeSessionReceiver(cancellationToken).ConfigureAwait(false);
 90                }
 091                _threadCount++;
 092                return true;
 93            }
 94            finally
 95            {
 096                if (releaseSemaphore)
 97                {
 098                    _semaphore.Release();
 99                }
 100            }
 0101        }
 102
 103        private async Task WaitSemaphore(CancellationToken cancellationToken)
 104        {
 105            try
 106            {
 0107                await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 0108            }
 0109            catch (OperationCanceledException)
 110            {
 111                // propagate as TCE so can be handled by
 112                // caller
 0113                throw new TaskCanceledException();
 114            }
 0115        }
 116
 117        private async Task CreateAndInitializeSessionReceiver(
 118            CancellationToken processorCancellationToken)
 119        {
 0120            await CreateReceiver(processorCancellationToken).ConfigureAwait(false);
 0121            _sessionCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(processorCancellationToken);
 122
 0123            if (AutoRenewLock)
 124            {
 0125                _sessionLockRenewalTask = RenewSessionLock();
 126            }
 127
 0128            if (_sessionInitHandler != null)
 129            {
 0130                var args = new ProcessSessionEventArgs(_receiver, processorCancellationToken);
 0131                await _sessionInitHandler(args).ConfigureAwait(false);
 132            }
 0133        }
 134
 135        private async Task CreateReceiver(CancellationToken processorCancellationToken)
 136        {
 0137            bool releaseSemaphore = false;
 138            try
 139            {
 0140                await _concurrentAcceptSessionsSemaphore.WaitAsync(processorCancellationToken).ConfigureAwait(false);
 141                // only attempt to release semaphore if WaitAsync is successful,
 142                // otherwise SemaphoreFullException can occur.
 0143                releaseSemaphore = true;
 0144                _receiver = await ServiceBusSessionReceiver.CreateSessionReceiverAsync(
 0145                    entityPath: _entityPath,
 0146                    connection: _connection,
 0147                    plugins: _plugins,
 0148                    options: _sessionReceiverOptions,
 0149                    cancellationToken: processorCancellationToken).ConfigureAwait(false);
 0150            }
 0151            catch (OperationCanceledException)
 152            {
 153                // propagate as TCE so it will be handled by the outer catch block
 0154                throw new TaskCanceledException();
 155            }
 156            finally
 157            {
 0158                if (releaseSemaphore)
 159                {
 0160                    _concurrentAcceptSessionsSemaphore.Release();
 161                }
 162            }
 0163        }
 164
 165        public override async Task CloseReceiverIfNeeded(
 166            CancellationToken processorCancellationToken)
 167        {
 0168            bool releaseSemaphore = false;
 169            try
 170            {
 171                // Intentionally not including processor cancellation token as
 172                // we need to ensure that we at least attempt to close the receiver if needed.
 0173                await WaitSemaphore(CancellationToken.None).ConfigureAwait(false);
 0174                releaseSemaphore = true;
 0175                if (_receiver == null)
 176                {
 0177                    return;
 178                }
 0179                _threadCount--;
 0180                if (_threadCount == 0)
 181                {
 0182                    if (!_keepOpenOnReceiveTimeout ||
 0183                        !AutoRenewLock ||
 0184                        _sessionLockRenewalCancellationSource.IsCancellationRequested)
 185                    {
 0186                        await CloseReceiver(processorCancellationToken).ConfigureAwait(false);
 187                    }
 188                }
 0189            }
 190            finally
 191            {
 0192                if (releaseSemaphore)
 193                {
 0194                    _semaphore.Release();
 195                }
 196            }
 0197        }
 198
 199        private async Task CloseReceiver(CancellationToken cancellationToken)
 200        {
 0201            if (_receiver == null || _receiver.IsDisposed)
 202            {
 0203                return;
 204            }
 205            try
 206            {
 0207                if (_sessionCloseHandler != null)
 208                {
 0209                    var args = new ProcessSessionEventArgs(_receiver, cancellationToken);
 0210                    await _sessionCloseHandler(args).ConfigureAwait(false);
 211                }
 0212            }
 0213            catch (Exception exception)
 214            {
 0215                await RaiseExceptionReceived(
 0216                    new ProcessErrorEventArgs(
 0217                        exception,
 0218                        ServiceBusErrorSource.CloseMessageSession,
 0219                        _fullyQualifiedNamespace,
 0220                        _entityPath))
 0221                    .ConfigureAwait(false);
 222            }
 223            finally
 224            {
 225                // cancel the automatic session lock renewal
 0226                await CancelTask(_sessionLockRenewalCancellationSource, _sessionLockRenewalTask).ConfigureAwait(false);
 227
 228                // Always at least attempt to dispose. If this fails, it won't be retried.
 0229                await _receiver.DisposeAsync().ConfigureAwait(false);
 0230                _receiver = null;
 231            }
 0232        }
 233
 234        public override async Task ReceiveAndProcessMessagesAsync(CancellationToken processorCancellationToken)
 235        {
 0236            ServiceBusErrorSource errorSource = ServiceBusErrorSource.AcceptMessageSession;
 0237            bool canProcess = false;
 238            try
 239            {
 240                try
 241                {
 0242                    canProcess = await EnsureCanProcess(processorCancellationToken).ConfigureAwait(false);
 0243                    if (!canProcess)
 244                    {
 0245                        return;
 246                    }
 0247                }
 248                catch (ServiceBusException ex)
 0249                when (ex.Reason == ServiceBusFailureReason.ServiceTimeout)
 250                {
 251                    // these exceptions are expected when no messages are available
 252                    // so simply return and allow this to be tried again on next thread
 0253                    return;
 254                }
 255                // loop within the context of this thread
 0256                while (!_sessionCancellationSource.Token.IsCancellationRequested)
 257                {
 0258                    errorSource = ServiceBusErrorSource.Receive;
 0259                    ServiceBusReceivedMessage message = await _receiver.ReceiveMessageAsync(
 0260                        _maxReceiveWaitTime,
 0261                        _sessionCancellationSource.Token).ConfigureAwait(false);
 0262                    if (message == null)
 263                    {
 264                        // Break out of the loop to allow a new session to
 265                        // be processed.
 266                        break;
 267                    }
 0268                    await ProcessOneMessageWithinScopeAsync(
 0269                        message,
 0270                        DiagnosticProperty.ProcessSessionMessageActivityName,
 0271                        _sessionCancellationSource.Token).ConfigureAwait(false);
 272                }
 0273            }
 274            catch (Exception ex)
 0275            when (!(ex is TaskCanceledException) ||
 0276            // If the user manually throws a TCE, then we should log it.
 0277            (!_sessionCancellationSource.IsCancellationRequested &&
 0278            // Even though the _sessionCancellationSource is linked to processorCancellationToken,
 0279            // we need to check both here in case the processor token gets cancelled before the
 0280            // session token is linked.
 0281            !processorCancellationToken.IsCancellationRequested))
 282            {
 0283                if (ex is ServiceBusException sbException && sbException.ProcessorErrorSource.HasValue)
 284                {
 0285                    errorSource = sbException.ProcessorErrorSource.Value;
 286
 287                    // Signal cancellation so user event handlers can stop whatever processing they are doing
 288                    // as soon as we know the session lock has been lost. Note, we don't have analogous handling
 289                    // for message locks in ReceiverManager, because there is only ever one thread processing a
 290                    // single message at one time, so cancelling the token there would serve no purpose.
 0291                    if (sbException.Reason == ServiceBusFailureReason.SessionLockLost)
 292                    {
 0293                        _sessionCancellationSource.Cancel();
 294                    }
 295                }
 0296                await RaiseExceptionReceived(
 0297                    new ProcessErrorEventArgs(
 0298                        ex,
 0299                        errorSource,
 0300                        _fullyQualifiedNamespace,
 0301                        _entityPath))
 0302                    .ConfigureAwait(false);
 303            }
 304            finally
 305            {
 0306                if (canProcess)
 307                {
 0308                    await CloseReceiverIfNeeded(
 0309                        processorCancellationToken).ConfigureAwait(false);
 310                }
 311            }
 0312        }
 313
 314        /// <summary>
 315        ///
 316        /// </summary>
 317        /// <returns></returns>
 318        private async Task RenewSessionLock()
 319        {
 0320            _sessionLockRenewalCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(
 0321                _sessionCancellationSource.Token);
 0322            _sessionLockRenewalCancellationSource.CancelAfter(_processorOptions.MaxAutoLockRenewalDuration);
 0323            CancellationToken sessionLockRenewalCancellationToken = _sessionLockRenewalCancellationSource.Token;
 0324            while (!sessionLockRenewalCancellationToken.IsCancellationRequested)
 325            {
 326                try
 327                {
 0328                    ServiceBusEventSource.Log.ProcessorRenewSessionLockStart(_identifier, _receiver.SessionId);
 0329                    TimeSpan delay = CalculateRenewDelay(_receiver.SessionLockedUntil);
 330
 0331                    await Task.Delay(delay, sessionLockRenewalCancellationToken).ConfigureAwait(false);
 0332                    if (_receiver.IsDisposed)
 333                    {
 0334                        break;
 335                    }
 0336                    await _receiver.RenewSessionLockAsync(sessionLockRenewalCancellationToken).ConfigureAwait(false);
 0337                    ServiceBusEventSource.Log.ProcessorRenewSessionLockComplete(_identifier);
 0338                }
 339
 0340                catch (Exception ex) when (!(ex is TaskCanceledException))
 341                {
 0342                    ServiceBusEventSource.Log.ProcessorRenewSessionLockException(_identifier, ex.ToString());
 0343                    await HandleRenewLockException(ex, sessionLockRenewalCancellationToken).ConfigureAwait(false);
 344
 345                    // if the error was not transient, break out of the loop
 0346                    if (!(ex as ServiceBusException)?.IsTransient == true)
 347                    {
 348                        break;
 349                    }
 350                }
 351            }
 0352        }
 353
 354        protected override async Task OnMessageHandler(
 355            ServiceBusReceivedMessage message,
 356            CancellationToken cancellationToken)
 357        {
 0358            var args = new ProcessSessionMessageEventArgs(
 0359                message,
 0360                _receiver,
 0361                cancellationToken);
 0362            await _messageHandler(args).ConfigureAwait(false);
 0363        }
 364    }
 365}