< Summary

Class: Azure.Messaging.ServiceBus.ServiceBusProcessor
Assembly: Azure.Messaging.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\ServiceBusProcessor.cs
Covered lines: 197
Uncovered lines: 38
Coverable lines: 235
Total lines: 694
Line coverage: 83.8% (197 of 235)
Covered branches: 56
Total branches: 78
Branch coverage: 71.7% (56 of 78)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_MaxConcurrentAcceptSessionsSemaphore() | - | 0% | 100%
.ctor(...) | - | 93.44% | 70%
get_RunningTaskTokenSource() | - | 100% | 100%
get_ActiveReceiveTask() | - | 100% | 100%
get_FullyQualifiedNamespace() | - | 100% | 100%
get_EntityPath() | - | 100% | 100%
get_Identifier() | - | 100% | 100%
get_ReceiveMode() | - | 100% | 100%
get_IsSessionProcessor() | - | 100% | 100%
get_PrefetchCount() | - | 100% | 100%
get_IsProcessing() | - | 0% | 100%
get_MaxConcurrentCalls() | - | 100% | 100%
get_MaxReceiveWaitTime() | - | 100% | 100%
get_AutoComplete() | - | 100% | 100%
get_MaxAutoLockRenewalDuration() | - | 100% | 100%
get_Logger() | - | 0% | 100%
get_MaxConcurrentSessions() | - | 100% | 100%
get_MaxConcurrentCallsPerSession() | - | 100% | 100%
.ctor() | - | 0% | 100%
Equals(...) | - | 0% | 100%
GetHashCode() | - | 0% | 100%
ToString() | - | 0% | 100%
add_ProcessMessageAsync(...) | - | 100% | 100%
remove_ProcessMessageAsync(...) | - | 100% | 100%
add_ProcessSessionMessageAsync(...) | - | 100% | 100%
remove_ProcessSessionMessageAsync(...) | - | 100% | 100%
add_ProcessErrorAsync(...) | - | 100% | 100%
remove_ProcessErrorAsync(...) | - | 100% | 100%
add_SessionInitializingAsync(...) | - | 100% | 100%
remove_SessionInitializingAsync(...) | - | 100% | 100%
add_SessionClosingAsync(...) | - | 100% | 100%
remove_SessionClosingAsync(...) | - | 100% | 100%
StartProcessingAsync() | - | 95.65% | 62.5%
InitializeReceiverManagers() | - | 37.14% | 10%
ValidateErrorHandler() | - | 100% | 100%
ValidateMessageHandler() | - | 100% | 100%
StopProcessingAsync() | - | 96.43% | 100%
RunReceiveTaskAsync() | - | 80% | 60%
ReceiveAndProcessMessagesAsync() | - | 100% | 100%
EnsureNotRunningAndInvoke(...) | - | 77.78% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Processor\ServiceBusProcessor.cs

# (hit count) | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.ComponentModel;
 7using System.Diagnostics.CodeAnalysis;
 8using System.Globalization;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core;
 12using Azure.Messaging.ServiceBus.Core;
 13using Azure.Messaging.ServiceBus.Diagnostics;
 14using Azure.Messaging.ServiceBus.Plugins;
 15
 16namespace Azure.Messaging.ServiceBus
 17{
 18    /// <summary>
 19    /// The <see cref="ServiceBusProcessor"/> provides an abstraction around a set of <see cref="ServiceBusReceiver"/> t
 20    /// allows using an event based model for processing received <see cref="ServiceBusReceivedMessage" />. It is constr
 21    /// <see cref="ServiceBusClient.CreateProcessor(string, ServiceBusProcessorOptions)"/>.
 22    /// The event handler is specified with the <see cref="ProcessMessageAsync"/>
 23    /// event. The error handler is specified with the <see cref="ProcessErrorAsync"/> event.
 24    /// To start processing after the handlers have been specified, call <see cref="StartProcessingAsync"/>.
 25    /// </summary>
 26#pragma warning disable CA1001 // Types that own disposable fields should be disposable
 27    public class ServiceBusProcessor
 28#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 29    {
 30        private Func<ProcessMessageEventArgs, Task> _processMessageAsync;
 31
 32        private Func<ProcessSessionMessageEventArgs, Task> _processSessionMessageAsync;
 33
 34        private Func<ProcessErrorEventArgs, Task> _processErrorAsync = default;
 35
 36        private Func<ProcessSessionEventArgs, Task> _sessionInitializingAsync;
 37
 38        private Func<ProcessSessionEventArgs, Task> _sessionClosingAsync;
 39
 40        private SemaphoreSlim MessageHandlerSemaphore;
 41
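 42        /// <summary>
 43        /// The semaphore passed to each <see cref="SessionReceiverManager"/>; its capacity is the smaller of the maximum number of concurrent handler calls and twice the processor count.
 44        /// </summary>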
 045        private SemaphoreSlim MaxConcurrentAcceptSessionsSemaphore { get; set; }
 46
 47        /// <summary>The primitive for synchronizing access during start and close operations.</summary>
 048        private readonly SemaphoreSlim ProcessingStartStopSemaphore = new SemaphoreSlim(1, 1);
 49
 2850        private CancellationTokenSource RunningTaskTokenSource { get; set; }
 51
 18052        private Task ActiveReceiveTask { get; set; }
 53
 54        /// <summary>
 55        /// The fully qualified Service Bus namespace that the receiver is associated with.  This is likely
 56        /// to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 57        /// </summary>
 4258        public string FullyQualifiedNamespace => _connection.FullyQualifiedNamespace;
 59
 60        /// <summary>
 61        /// The path of the Service Bus entity that the processor is connected to, specific to the
 62        /// Service Bus namespace that contains it.
 63        /// </summary>
 14064        public string EntityPath { get; private set; }
 65
 66        /// <summary>
 67        /// Gets the ID to identify this client. This can be used to correlate logs and exceptions.
 68        /// </summary>
 69        /// <remarks>Every new client has a unique ID.</remarks>
 11070        internal string Identifier { get; private set; }
 71
 72        /// <summary>
 73        /// The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.
 74        /// </summary>
 475        public ReceiveMode ReceiveMode { get; }
 76
 77        /// <summary>
 78        /// Indicates whether the processor is configured to process session entities.
 79        /// </summary>
 1880        internal bool IsSessionProcessor { get; }
 81
 82        /// <summary>
 83        /// The number of messages that will be eagerly requested from Queues or Subscriptions
 84        /// during processing. This is intended to help maximize throughput by allowing the
 85        /// processor to receive from a local cache rather than waiting on a service request.
 86        /// </summary>
 487        public int PrefetchCount { get; }
 88
 89        /// <summary>
 90        /// Indicates whether or not this <see cref="ServiceBusProcessor"/> is currently processing messages.
 91        /// </summary>
 92        ///
 93        /// <value>
 94        /// <c>true</c> if the client is processing messages; otherwise, <c>false</c>.
 95        /// </value>
 096        public bool IsProcessing => ActiveReceiveTask != null;
 97
 98        private readonly ServiceBusProcessorOptions _options;
 99
 100        /// <summary>
 101        ///   The active connection to the Azure Service Bus service, enabling client communications for metadata
 102        ///   about the associated Service Bus entity and access to transport-aware consumers.
 103        /// </summary>
 104        private readonly ServiceBusConnection _connection;
 105
 106        /// <summary>Gets the maximum number of concurrent calls to the
 107        /// <see cref="ProcessMessageAsync"/> event handler the processor should initiate.
 108        /// </summary>
 109        ///
 110        /// <value>The maximum number of concurrent calls to the event handler.</value>
 26111        public int MaxConcurrentCalls { get; }
 112
 113        /// <summary>
 114        /// The maximum amount of time to wait for each Receive call using the processor's underlying receiver. If not s
 115        /// </summary>
 2116        public TimeSpan? MaxReceiveWaitTime { get; }
 117
 118        /// <summary>Gets a value that indicates whether the <see cref="ServiceBusProcessor"/> should automatica
 119        /// complete messages after the event handler has completed processing. If the event handler
 120        /// triggers an exception, the message will not be automatically completed.</summary>
 121        ///
 122        /// <value>true to complete the message processing automatically on successful execution of the operation; other
 4123        public bool AutoComplete { get; }
 124
 125        /// <summary>
 126        /// Gets the maximum duration within which the lock will be renewed automatically. This
 127        /// value should be greater than the longest message lock duration; for example, the LockDuration Property.
 128        /// </summary>
 129        ///
 130        /// <value>The maximum duration during which locks are automatically renewed.</value>
 131        ///
 132        /// <remarks>Message lock renewal can continue for some time in the background
 133        /// after a message is completed, which may temporarily result in a few false MessageLockLostExceptions.</remarks>
 4134        public TimeSpan MaxAutoLockRenewalDuration { get; }
 135
 136        /// <summary>
 137        ///   The instance of <see cref="ServiceBusEventSource" /> which can be mocked for testing.
 138        /// </summary>
 139        ///
 0140        internal ServiceBusEventSource Logger { get; set; } = ServiceBusEventSource.Log;
 16141        internal int MaxConcurrentSessions { get; }
 16142        internal int MaxConcurrentCallsPerSession { get; }
 143
 144        private readonly string[] _sessionIds;
 145        private readonly EntityScopeFactory _scopeFactory;
 146        private readonly IList<ServiceBusPlugin> _plugins;
 0147        private readonly IList<ReceiverManager> _receiverManagers = new List<ReceiverManager>();
 148
 149        /// <summary>
 150        ///   Initializes a new instance of the <see cref="ServiceBusProcessor"/> class.
 151        /// </summary>
 152        ///
 153        /// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with th
 154        /// <param name="entityPath">The queue name or subscription path to process messages from.</param>
 155        /// <param name="isSessionEntity">Whether or not the processor is associated with a session entity.</param>
 156        /// <param name="plugins">The set of plugins to apply to incoming messages.</param>
 157        /// <param name="options">The set of options to use when configuring the processor.</param>
 158        /// <param name="sessionIds">An optional set of session Ids to limit processing to.
 159        /// Only applies if isSessionEntity is true.</param>
 160        /// <param name="maxConcurrentSessions">The max number of sessions that can be processed concurrently.
 161        /// Only applies if isSessionEntity is true.</param>
 162        /// <param name="maxConcurrentCallsPerSession">The max number of concurrent calls per session.
 163        /// Only applies if isSessionEntity is true.</param>
 34164        internal ServiceBusProcessor(
 34165            ServiceBusConnection connection,
 34166            string entityPath,
 34167            bool isSessionEntity,
 34168            IList<ServiceBusPlugin> plugins,
 34169            ServiceBusProcessorOptions options,
 34170            string[] sessionIds = default,
 34171            int maxConcurrentSessions = default,
 34172            int maxConcurrentCallsPerSession = default)
 173        {
 34174            Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath));
 34175            Argument.AssertNotNull(connection, nameof(connection));
 34176            Argument.AssertNotNull(connection.RetryOptions, nameof(connection.RetryOptions));
 34177            connection.ThrowIfClosed();
 178
 34179            _options = options?.Clone() ?? new ServiceBusProcessorOptions();
 34180            _connection = connection;
 34181            EntityPath = entityPath;
 34182            Identifier = DiagnosticUtilities.GenerateIdentifier(EntityPath);
 183
 34184            ReceiveMode = _options.ReceiveMode;
 34185            PrefetchCount = _options.PrefetchCount;
 34186            MaxAutoLockRenewalDuration = _options.MaxAutoLockRenewalDuration;
 34187            MaxConcurrentCalls = _options.MaxConcurrentCalls;
 34188            MaxConcurrentSessions = maxConcurrentSessions;
 34189            MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession;
 34190            _sessionIds = sessionIds ?? Array.Empty<string>();
 191
 34192            int maxCalls = isSessionEntity ?
 34193                (_sessionIds.Length > 0 ?
 34194                    Math.Min(_sessionIds.Length, MaxConcurrentSessions) :
 34195                    MaxConcurrentSessions) * MaxConcurrentCallsPerSession :
 34196                MaxConcurrentCalls;
 197
 34198            MessageHandlerSemaphore = new SemaphoreSlim(
 34199                maxCalls,
 34200                maxCalls);
 34201            var maxAcceptSessions = Math.Min(maxCalls, 2 * Environment.ProcessorCount);
 34202            MaxConcurrentAcceptSessionsSemaphore = new SemaphoreSlim(
 34203                maxAcceptSessions,
 34204                maxAcceptSessions);
 205
 34206            MaxReceiveWaitTime = _options.MaxReceiveWaitTime;
 34207            AutoComplete = _options.AutoComplete;
 208
 34209            EntityPath = entityPath;
 34210            IsSessionProcessor = isSessionEntity;
 34211            _scopeFactory = new EntityScopeFactory(EntityPath, FullyQualifiedNamespace);
 34212            _plugins = plugins;
 34213        }
 214
 215        /// <summary>
 216        /// Initializes a new instance of the <see cref="ServiceBusProcessor"/> class for mocking.
 217        /// </summary>
 0218        protected ServiceBusProcessor()
 219        {
 0220        }
 221
 222        /// <summary>
 223        ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
 224        /// </summary>
 225        ///
 226        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
 227        ///
 228        /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>
 229        [EditorBrowsable(EditorBrowsableState.Never)]
 0230        public override bool Equals(object obj) => base.Equals(obj);
 231
 232        /// <summary>
 233        /// Returns a hash code for this instance.
 234        /// </summary>
 235        ///
 236        /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha
 237        ///
 238        [EditorBrowsable(EditorBrowsableState.Never)]
 0239        public override int GetHashCode() => base.GetHashCode();
 240
 241        /// <summary>
 242        /// Converts the instance to string representation.
 243        /// </summary>
 244        ///
 245        /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 246        ///
 247        [EditorBrowsable(EditorBrowsableState.Never)]
 0248        public override string ToString() => base.ToString();
 249
 250        /// <summary>
 251        /// The event responsible for processing messages received from the Queue or Subscription.
 252        /// Implementation is mandatory.
 253        /// </summary>
 254        ///
 255        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 256        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 257        public event Func<ProcessMessageEventArgs, Task> ProcessMessageAsync
 258        {
 259            add
 260            {
 18261                Argument.AssertNotNull(value, nameof(ProcessMessageAsync));
 262
 16263                if (_processMessageAsync != default)
 264                {
 265#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
 2266                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 267#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
 268                }
 28269                EnsureNotRunningAndInvoke(() => _processMessageAsync = value);
 270
 14271            }
 272
 273            remove
 274            {
 6275                Argument.AssertNotNull(value, nameof(ProcessMessageAsync));
 276
 6277                if (_processMessageAsync != value)
 278                {
 279#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
 4280                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 281#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
 282                }
 283
 4284                EnsureNotRunningAndInvoke(() => _processMessageAsync = default);
 2285            }
 286        }
 287
 288        /// <summary>
 289        /// The event responsible for processing messages received from the Queue or Subscription. Implementation
 290        /// is mandatory.
 291        /// </summary>
 292        ///
 293        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 294        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 295        internal event Func<ProcessSessionMessageEventArgs, Task> ProcessSessionMessageAsync
 296        {
 297            add
 298            {
 14299                Argument.AssertNotNull(value, nameof(ProcessMessageAsync));
 300
 12301                if (_processSessionMessageAsync != default)
 302                {
 2303                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 304                }
 20305                EnsureNotRunningAndInvoke(() => _processSessionMessageAsync = value);
 306
 10307            }
 308
 309            remove
 310            {
 6311                Argument.AssertNotNull(value, nameof(ProcessMessageAsync));
 312
 6313                if (_processSessionMessageAsync != value)
 314                {
 4315                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 316                }
 317
 4318                EnsureNotRunningAndInvoke(() => _processSessionMessageAsync = default);
 2319            }
 320        }
 321
 322        /// <summary>
 323        /// The event responsible for processing unhandled exceptions thrown while this processor is running.
 324        /// Implementation is mandatory.
 325        /// </summary>
 326        ///
 327        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 328        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 329        public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync
 330        {
 331            add
 332            {
 28333                Argument.AssertNotNull(value, nameof(ProcessErrorAsync));
 334
 24335                if (_processErrorAsync != default)
 336                {
 337#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
 4338                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 339#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
 340                }
 341
 40342                EnsureNotRunningAndInvoke(() => _processErrorAsync = value);
 20343            }
 344
 345            remove
 346            {
 12347                Argument.AssertNotNull(value, nameof(ProcessErrorAsync));
 348
 12349                if (_processErrorAsync != value)
 350                {
 351#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
 8352                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 353#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
 354                }
 355
 8356                EnsureNotRunningAndInvoke(() => _processErrorAsync = default);
 4357            }
 358        }
 359
 360        /// <summary>
 361        /// Optional event that can be set to be notified when a new session is about to be processed.
 362        /// </summary>
 363        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 364        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 365        internal event Func<ProcessSessionEventArgs, Task> SessionInitializingAsync
 366        {
 367            add
 368            {
 12369                Argument.AssertNotNull(value, nameof(SessionInitializingAsync));
 370
 10371                if (_sessionInitializingAsync != default)
 372                {
 2373                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 374                }
 16375                EnsureNotRunningAndInvoke(() => _sessionInitializingAsync = value);
 376
 8377            }
 378
 379            remove
 380            {
 6381                Argument.AssertNotNull(value, nameof(SessionInitializingAsync));
 6382                if (_sessionInitializingAsync != value)
 383                {
 4384                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 385                }
 4386                EnsureNotRunningAndInvoke(() => _sessionInitializingAsync = default);
 2387            }
 388        }
 389
 390        /// <summary>
 391        /// Optional event that can be set to be notified when a session is about to be closed for processing.
 392        /// This means that the most recent <see cref="ServiceBusReceiver.ReceiveMessageAsync"/> call timed out so there
 393        /// available to be received for the session.
 394        /// </summary>
 395        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 396        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 397        internal event Func<ProcessSessionEventArgs, Task> SessionClosingAsync
 398        {
 399            add
 400            {
 12401                Argument.AssertNotNull(value, nameof(SessionClosingAsync));
 402
 10403                if (_sessionClosingAsync != default)
 404                {
 2405                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 406                }
 16407                EnsureNotRunningAndInvoke(() => _sessionClosingAsync = value);
 408
 8409            }
 410
 411            remove
 412            {
 6413                Argument.AssertNotNull(value, nameof(SessionClosingAsync));
 6414                if (_sessionClosingAsync != value)
 415                {
 4416                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 417                }
 4418                EnsureNotRunningAndInvoke(() => _sessionClosingAsync = default);
 2419            }
 420        }
 421
 422        /// <summary>
 423        /// Signals the <see cref="ServiceBusProcessor" /> to begin processing messages. Should this method be called wh
 424        /// is running, an <see cref="InvalidOperationException"/> is thrown.
 425        /// </summary>
 426        ///
 427        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 428        public virtual async Task StartProcessingAsync(
 429            CancellationToken cancellationToken = default)
 430        {
 14431            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 14432            if (ActiveReceiveTask == null)
 433            {
 14434                Logger.StartProcessingStart(Identifier);
 14435                bool releaseGuard = false;
 436
 437                try
 438                {
 14439                    await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 14440                    releaseGuard = true;
 14441                    ValidateMessageHandler();
 8442                    ValidateErrorHandler();
 4443                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 444
 4445                    InitializeReceiverManagers();
 446
 447                    // We expect the token source to be null, but we are playing safe.
 448
 4449                    RunningTaskTokenSource?.Cancel();
 4450                    RunningTaskTokenSource?.Dispose();
 4451                    RunningTaskTokenSource = new CancellationTokenSource();
 452
 453                    // Start the main running task.
 4454                    ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token);
 4455                }
 10456                catch (Exception exception)
 457                {
 10458                    Logger.StartProcessingException(Identifier, exception.ToString());
 10459                    throw;
 460                }
 461                finally
 462                {
 14463                    if (releaseGuard)
 464                    {
 14465                        ProcessingStartStopSemaphore.Release();
 466                    }
 467                }
 4468                Logger.StartProcessingComplete(Identifier);
 469            }
 470            else
 471            {
 0472                throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
 473            }
 4474        }
 475
 476        private void InitializeReceiverManagers()
 477        {
 4478            if (IsSessionProcessor)
 479            {
 0480                var numReceivers = _sessionIds.Length > 0 ? _sessionIds.Length : MaxConcurrentSessions;
 0481                for (int i = 0; i < numReceivers; i++)
 482                {
 0483                    var sessionId = _sessionIds.Length > 0 ? _sessionIds[i] : null;
 484                    // If the user has listed named sessions, and they
 485                    // have MaxConcurrentSessions greater or equal to the number
 486                    // of sessions, we can leave the sessions open at all times
 487                    // instead of cycling through them as receive calls time out.
 0488                    bool keepOpenOnReceiveTimeout = _sessionIds.Length > 0 &&
 0489                        MaxConcurrentSessions >= _sessionIds.Length;
 490
 0491                    _receiverManagers.Add(
 0492                        new SessionReceiverManager(
 0493                            _connection,
 0494                            FullyQualifiedNamespace,
 0495                            EntityPath,
 0496                            Identifier,
 0497                            sessionId,
 0498                            _options,
 0499                            _sessionInitializingAsync,
 0500                            _sessionClosingAsync,
 0501                            _processSessionMessageAsync,
 0502                            _processErrorAsync,
 0503                            MaxConcurrentAcceptSessionsSemaphore,
 0504                            _scopeFactory,
 0505                            _plugins,
 0506                            MaxConcurrentCallsPerSession,
 0507                            keepOpenOnReceiveTimeout));
 508                }
 509            }
 510            else
 511            {
 4512                _receiverManagers.Add(
 4513                    new ReceiverManager(
 4514                        _connection,
 4515                        FullyQualifiedNamespace,
 4516                        EntityPath,
 4517                        Identifier,
 4518                        _options,
 4519                        _processMessageAsync,
 4520                        _processErrorAsync,
 4521                        _scopeFactory,
 4522                        _plugins));
 523            }
 4524        }
 525
 526        private void ValidateErrorHandler()
 527        {
 8528            if (_processErrorAsync == null)
 529            {
 4530                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartMessa
 531            }
 4532        }
 533
 534        private void ValidateMessageHandler()
 535        {
 14536            if (IsSessionProcessor)
 537            {
 4538                if (_processSessionMessageAsync == null)
 539                {
 2540                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartM
 541                }
 542            }
 10543            else if (_processMessageAsync == null)
 544            {
 4545                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartMessa
 546            }
 8547        }
 548
 549        /// <summary>
 550        /// Signals the <see cref="ServiceBusProcessor" /> to stop processing messages. Should this method be called while
 551        /// is not running, no action is taken.
 552        /// </summary>
 553        ///
 554        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 555        public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default)
 556        {
 6557            bool releaseGuard = false;
 558            try
 559            {
 6560                if (ActiveReceiveTask != null)
 561                {
 6562                    Logger.StopProcessingStart(Identifier);
 6563                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 564
 4565                    await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 4566                    releaseGuard = true;
 567
 4568                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 569
 570                    // Cancel the current running task.
 571
 4572                    RunningTaskTokenSource.Cancel();
 4573                    RunningTaskTokenSource.Dispose();
 4574                    RunningTaskTokenSource = null;
 575
 576                    // Now that a cancellation request has been issued, wait for the running task to finish.  In case so
 577                    // unexpected happened and it stopped working midway, this is the moment we expect to catch an excep
 578                    try
 579                    {
 4580                        await ActiveReceiveTask.ConfigureAwait(false);
 0581                    }
 4582                    catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
 583                    {
 584                        // Nothing to do here.  These exceptions are expected.
 4585                    }
 586
 4587                    ActiveReceiveTask.Dispose();
 4588                    ActiveReceiveTask = null;
 589
 16590                    foreach (ReceiverManager receiverManager in _receiverManagers)
 591                    {
 4592                        await receiverManager.CloseReceiverIfNeeded(
 4593                            cancellationToken)
 4594                            .ConfigureAwait(false);
 595                    }
 596                }
 4597            }
 2598            catch (Exception exception)
 599            {
 2600                Logger.StopProcessingException(Identifier, exception.ToString());
 2601                throw;
 602            }
 603            finally
 604            {
 6605                if (releaseGuard)
 606                {
 4607                    ProcessingStartStopSemaphore.Release();
 608                }
 609            }
 4610            Logger.StopProcessingComplete(Identifier);
 4611        }
 612
 613        /// <summary>
 614        /// Runs the Receive task in as many threads as are
 615        /// specified in the <see cref="MaxConcurrentCalls"/> property.
 616        /// </summary>
 617        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 618        private async Task RunReceiveTaskAsync(
 619            CancellationToken cancellationToken)
 620        {
 4621            List<Task> tasks = new List<Task>();
 622            try
 623            {
 8624                while (!cancellationToken.IsCancellationRequested)
 625                {
 28626                    foreach (ReceiverManager receiverManager in _receiverManagers)
 627                    {
 8628                        await MessageHandlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 629                        // hold onto all the tasks that we are starting so that when cancellation is requested,
 630                        // we can await them to make sure we surface any unexpected exceptions, i.e. exceptions
 631                        // other than TaskCanceledExceptions
 4632                        tasks.Add(ReceiveAndProcessMessagesAsync(receiverManager, cancellationToken));
 4633                    }
 4634                    if (tasks.Count > MaxConcurrentCalls)
 635                    {
 0636                        tasks.RemoveAll(t => t.IsCompleted);
 637                    }
 638                }
 639            }
 640            finally
 641            {
 4642                await Task.WhenAll(tasks).ConfigureAwait(false);
 643            }
 0644        }
 645
 646        private async Task ReceiveAndProcessMessagesAsync(
 647            ReceiverManager receiverManager,
 648            CancellationToken cancellationToken)
 649        {
 650            try
 651            {
 4652                await receiverManager.ReceiveAndProcessMessagesAsync(cancellationToken).ConfigureAwait(false);
 4653            }
 654            finally
 655            {
 4656                MessageHandlerSemaphore.Release();
 657            }
 4658        }
 659
 660        /// <summary>
 661        /// Invokes a specified action only if this <see cref="ServiceBusProcessor" /> instance is not running.
 662        /// </summary>
 663        ///
 664        /// <param name="action">The action to invoke.</param>
 665        ///
 666        /// <exception cref="InvalidOperationException">Occurs when this method is invoked while the event processor is 
 667        internal void EnsureNotRunningAndInvoke(Action action)
 668        {
 72669            if (ActiveReceiveTask == null)
 670            {
 671                try
 672                {
 72673                    ProcessingStartStopSemaphore.Wait();
 72674                    if (ActiveReceiveTask == null)
 675                    {
 72676                        action?.Invoke();
 677                    }
 678                    else
 679                    {
 0680                        throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
 681                    }
 682                }
 683                finally
 684                {
 72685                    ProcessingStartStopSemaphore.Release();
 72686                }
 687            }
 688            else
 689            {
 0690                throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
 691            }
 72692        }
 693    }
 694}