< Summary

Class: Azure.Messaging.EventHubs.Primitives.EventProcessor`1
Assembly: Azure.Messaging.EventHubs
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Primitives\EventProcessor{TPartition}.cs
Covered lines: 376
Uncovered lines: 33
Coverable lines: 409
Total lines: 1619
Line coverage: 91.9% (376 of 409)
Covered branches: 144
Total branches: 162
Branch coverage: 88.8% (144 of 162)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 85.96% | 88.46%
get_FullyQualifiedNamespace() | - | 100% | 100%
get_EventHubName() | - | 100% | 100%
get_ConsumerGroup() | - | 100% | 100%
get_Identifier() | - | 100% | 100%
get_IsRunning() | - | 80% | 75%
set_IsRunning(...) | - | 0% | 100%
get_Status() | - | 81.25% | 85.71%
get_Logger() | - | 0% | 100%
get_RetryPolicy() | - | 100% | 100%
get_ActivePartitionProcessors() | - | 0% | 100%
get_ConnectionFactory() | - | 100% | 100%
get_LoadBalancer() | - | 100% | 100%
get_Options() | - | 100% | 100%
get_EventBatchMaximumCount() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor() | - | 0% | 100%
StartProcessingAsync() | - | 100% | 100%
StartProcessing(...) | - | 100% | 100%
StopProcessingAsync() | - | 100% | 100%
StopProcessing(...) | - | 100% | 100%
Equals(...) | - | 0% | 100%
GetHashCode() | - | 0% | 100%
ToString() | - | 100% | 100%
CreateConsumer(...) | - | 100% | 100%
CreateStorageManager(...) | - | 100% | 100%
CreatePartitionLoadBalancer(...) | - | 100% | 100%
ProcessEventBatchAsync() | - | 100% | 100%
CreatePartitionProcessor(...) | - | 95.77% | 88.89%
<CreatePartitionProcessor() | - | 94.92% | 87.5%
CreateConnection() | - | 100% | 100%
OnInitializingPartitionAsync(...) | - | 100% | 100%
OnPartitionProcessingStoppedAsync(...) | - | 100% | 100%
ReadLastEnqueuedEventProperties(...) | - | 100% | 100%
StartProcessingInternalAsync() | - | 88.89% | 80%
StopProcessingInternalAsync() | - | 91.67% | 87.5%
RunProcessingAsync() | - | 84.62% | 62.5%
PerformLoadBalancingAsync() | - | 100% | 100%
<PerformLoadBalancingAsync() | - | 100% | 100%
TryStartProcessingPartitionAsync() | - | 96.55% | 75%
TryStopProcessingPartitionAsync() | - | 80% | 100%
InvokeOnProcessingErrorAsync(...) | - | 100% | 100%
get_Processor() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
ListOwnershipAsync() | - | 100% | 100%
ClaimOwnershipAsync() | - | 100% | 100%
ListCheckpointsAsync() | - | 100% | 100%
UpdateCheckpointAsync(...) | - | 100% | 100%
.ctor(...) | - | 100% | 100%
Dispose() | - | 100% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Primitives\EventProcessor{TPartition}.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.ComponentModel;
 8using System.Diagnostics.CodeAnalysis;
 9using System.Globalization;
 10using System.Linq;
 11using System.Runtime.ExceptionServices;
 12using System.Threading;
 13using System.Threading.Tasks;
 14using Azure.Core;
 15using Azure.Core.Diagnostics;
 16using Azure.Core.Pipeline;
 17using Azure.Messaging.EventHubs.Consumer;
 18using Azure.Messaging.EventHubs.Core;
 19using Azure.Messaging.EventHubs.Diagnostics;
 20using Azure.Messaging.EventHubs.Processor;
 21
 22namespace Azure.Messaging.EventHubs.Primitives
 23{
 24    /// <summary>
 25    ///   Provides a base for creating a custom processor for consuming events across all partitions of a given Event Hu
 26    ///   within the scope of a specific consumer group.  The processor is capable of collaborating with other instances
 27    ///   the same Event Hub and consumer group pairing to share work by using a common storage platform to communicate.
 28    ///   tolerance is also built-in, allowing the processor to be resilient in the face of errors.
 29    /// </summary>
 30    ///
 31    /// <typeparam name="TPartition">The context of the partition for which an operation is being performed.</typeparam>
 32    ///
 33    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
 34    public abstract class EventProcessor<TPartition> where TPartition : EventProcessorPartition, new()
 35    {
 36        /// <summary>The maximum number of failed consumers to allow when processing a partition; failed consumers are t
 37        private const int MaximumFailedConsumerCount = 1;
 38
 39        /// <summary>The primitive for synchronizing access when starting and stopping the processor.</summary>
 040        private readonly SemaphoreSlim ProcessorRunningGuard = new SemaphoreSlim(1, 1);
 41
 42        /// <summary>Indicates whether or not this event processor is currently running.  Used only for mocking purposes
 43        private bool? _isRunningOverride;
 44
 45        /// <summary>Indicates the current state of the processor; used for mocking and manual status updates.</summary>
 46        private EventProcessorStatus? _statusOverride;
 47
 48        /// <summary>The task responsible for managing the operations of the processor when it is running.</summary>
 49        private Task _runningProcessorTask;
 50
 51        /// <summary>A <see cref="CancellationTokenSource"/> instance to signal the request to cancel the current runnin
 52        private CancellationTokenSource _runningProcessorCancellationSource;
 53
 54        /// <summary>
 55        ///   The fully qualified Event Hubs namespace that the processor is associated with.  This is likely
 56        ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 57        /// </summary>
 58        ///
 303012259        public string FullyQualifiedNamespace { get; }
 60
 61        /// <summary>
 62        ///   The name of the Event Hub that the processor is connected to, specific to the
 63        ///   Event Hubs namespace that contains it.
 64        /// </summary>
 65        ///
 303103866        public string EventHubName { get; }
 67
 68        /// <summary>
 69        ///   The name of the consumer group this event processor is associated with.  Events will be
 70        ///   read only in the context of this group.
 71        /// </summary>
 72        ///
 118073        public string ConsumerGroup { get; }
 74
 75        /// <summary>
 76        ///   A unique name used to identify this event processor.
 77        /// </summary>
 78        ///
 111279        public string Identifier { get; }
 80
 81        /// <summary>
 82        ///   Indicates whether or not this event processor is currently running.
 83        /// </summary>
 84        ///
 85        public bool IsRunning
 86        {
 87            get
 88            {
 6489                var running = _isRunningOverride;
 90
 6491                if (running.HasValue)
 92                {
 093                    return running.Value;
 94                }
 95
 6496                var status = Status;
 6497                return ((status == EventProcessorStatus.Running) || (status == EventProcessorStatus.Stopping));
 98            }
 99
 0100            protected set => _isRunningOverride = value;
 101        }
 102
 103        /// <summary>
 104        ///   Indicates the current state of the processor.
 105        /// </summary>
 106        ///
 107        internal EventProcessorStatus Status
 108        {
 109            get
 110            {
 111                EventProcessorStatus? statusOverride;
 112
 113                // If there is no active processor task, ensure that it is not
 114                // in the process of starting by attempting to acquire the semaphore.
 115                //
 116                // If the semaphore could not be acquired, then there is an active start/stop
 117                // operation in progress indicating that the processor is not yet running or
 118                // will not be running.
 119
 196120                if (_runningProcessorTask == null)
 121                {
 122                    try
 123                    {
 60124                        if (!ProcessorRunningGuard.Wait(100))
 125                        {
 0126                            return (_statusOverride ?? EventProcessorStatus.NotRunning);
 127                        }
 128
 60129                        statusOverride = _statusOverride;
 60130                    }
 131                    finally
 132                    {
 60133                        ProcessorRunningGuard.Release();
 60134                    }
 135                }
 136                else
 137                {
 136138                    statusOverride = _statusOverride;
 139                }
 140
 196141                if (statusOverride.HasValue)
 142                {
 0143                    return statusOverride.Value;
 144                }
 145
 196146                if ((_runningProcessorTask?.IsFaulted) ?? (false))
 147                {
 24148                    return EventProcessorStatus.Faulted;
 149                }
 150
 172151                if ((!_runningProcessorTask?.IsCompleted) ?? (false))
 152                {
 112153                    return EventProcessorStatus.Running;
 154                }
 155
 60156                return EventProcessorStatus.NotRunning;
 0157            }
 158        }
 159
 160        /// <summary>
 161        ///   The instance of <see cref="EventHubsEventSource" /> which can be mocked for testing.
 162        /// </summary>
 163        ///
 0164        internal EventHubsEventSource Logger { get; set; } = EventHubsEventSource.Log;
 165
 166        /// <summary>
 167        ///   The active policy which governs retry attempts for the processor.
 168        /// </summary>
 169        ///
 144170        protected EventHubsRetryPolicy RetryPolicy { get; }
 171
 172        /// <summary>
 173        ///   The set of currently active partition processing tasks issued by this event processor and their associated
 174        ///   token sources that can be used to cancel the operation.  Partition identifiers are used as keys.
 175        /// </summary>
 176        ///
 0177        private ConcurrentDictionary<string, PartitionProcessor> ActivePartitionProcessors { get; } = new ConcurrentDict
 178
 179        /// <summary>
 180        ///   A factory used to create new <see cref="EventHubConnection" /> instances.
 181        /// </summary>
 182        ///
 10183        private Func<EventHubConnection> ConnectionFactory { get; }
 184
 185        /// <summary>
 186        ///   Responsible for ownership claim for load balancing.
 187        /// </summary>
 188        ///
 490189        private PartitionLoadBalancer LoadBalancer { get; }
 190
 191        /// <summary>
 192        ///   The set of options to use with the <see cref="EventProcessor{TPartition}" />  instance.
 193        /// </summary>
 194        ///
 6088524195        private EventProcessorOptions Options { get; }
 196
 197        /// <summary>
 198        ///   The desired number of events to include in a batch to be processed.  This size is the maximum count in a b
 199        /// </summary>
 200        ///
 3044156201        private int EventBatchMaximumCount { get; }
 202
 203        /// <summary>
 204        ///   Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class.
 205        /// </summary>
 206        ///
 207        /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed.  Th
 208        /// <param name="consumerGroup">The name of the consumer group the processor is associated with.  Events are rea
 209        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 210        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 211        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 212        /// <param name="options">The set of options to use for the processor.</param>
 213        /// <param name="loadBalancer">The load balancer to use for coordinating processing with other event processor i
 214        ///
 222215        internal EventProcessor(int eventBatchMaximumCount,
 222216                                string consumerGroup,
 222217                                string fullyQualifiedNamespace,
 222218                                string eventHubName,
 222219                                TokenCredential credential,
 222220                                EventProcessorOptions options = default,
 222221                                PartitionLoadBalancer loadBalancer = default)
 222        {
 222223            Argument.AssertInRange(eventBatchMaximumCount, 1, int.MaxValue, nameof(eventBatchMaximumCount));
 214224            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 210225            Argument.AssertWellFormedEventHubsNamespace(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
 204226            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 200227            Argument.AssertNotNull(credential, nameof(credential));
 228
 198229            options = options?.Clone() ?? new EventProcessorOptions();
 230
 202231            ConnectionFactory = () => new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, options.
 198232            FullyQualifiedNamespace = fullyQualifiedNamespace;
 198233            EventHubName = eventHubName;
 198234            ConsumerGroup = consumerGroup;
 198235            Identifier = string.IsNullOrEmpty(options.Identifier) ? Guid.NewGuid().ToString() : options.Identifier;
 198236            RetryPolicy = options.RetryOptions.ToRetryPolicy();
 198237            Options = options;
 198238            EventBatchMaximumCount = eventBatchMaximumCount;
 239
 240#pragma warning disable CA2214 // Do not call overridable methods in constructors.  The virtual methods are internal and
 198241            LoadBalancer = loadBalancer ?? CreatePartitionLoadBalancer(CreateStorageManager(this), Identifier, ConsumerG
 242#pragma warning restore CA2214 // Do not call overridable methods in constructors.
 198243        }
 244
 245        /// <summary>
 246        ///   Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class.
 247        /// </summary>
 248        ///
 249        /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed.  Th
 250        /// <param name="consumerGroup">The name of the consumer group the processor is associated with.  Events are rea
 251        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 252        /// <param name="options">The set of options to use for the processor.</param>
 253        ///
 254        /// <remarks>
 255        ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t
 256        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 257        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 258        ///
 259        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 260        ///   Event Hub will result in a connection string that contains the name.
 261        /// </remarks>
 262        ///
 263        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 264        ///
 265        protected EventProcessor(int eventBatchMaximumCount,
 266                                 string consumerGroup,
 267                                 string connectionString,
 38268                                 EventProcessorOptions options = default) : this(eventBatchMaximumCount, consumerGroup, 
 269        {
 10270        }
 271
 272        /// <summary>
 273        ///   Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class.
 274        /// </summary>
 275        ///
 276        /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed.  Th
 277        /// <param name="consumerGroup">The name of the consumer group the processor is associated with.  Events are rea
 278        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 279        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 280        /// <param name="options">The set of options to use for the processor.</param>
 281        ///
 282        /// <remarks>
 283        ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even
 284        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 285        ///   passed only once, either as part of the connection string or separately.
 286        /// </remarks>
 287        ///
 288        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 289        ///
 48290        protected EventProcessor(int eventBatchMaximumCount,
 48291                                 string consumerGroup,
 48292                                 string connectionString,
 48293                                 string eventHubName,
 48294                                 EventProcessorOptions options = default)
 295        {
 48296            Argument.AssertInRange(eventBatchMaximumCount, 1, int.MaxValue, nameof(eventBatchMaximumCount));
 40297            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 36298            Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
 299
 28300            options = options?.Clone() ?? new EventProcessorOptions();
 301
 28302            var connectionStringProperties = ConnectionStringParser.Parse(connectionString);
 28303            connectionStringProperties.Validate(eventHubName, nameof(connectionString));
 304
 20305            ConnectionFactory = () => new EventHubConnection(connectionString, eventHubName, options.ConnectionOptions);
 14306            FullyQualifiedNamespace = connectionStringProperties.Endpoint.Host;
 14307            EventHubName = string.IsNullOrEmpty(eventHubName) ? connectionStringProperties.EventHubName : eventHubName;
 14308            ConsumerGroup = consumerGroup;
 14309            Identifier = string.IsNullOrEmpty(options.Identifier) ? Guid.NewGuid().ToString() : options.Identifier;
 14310            RetryPolicy = options.RetryOptions.ToRetryPolicy();
 14311            Options = options;
 14312            EventBatchMaximumCount = eventBatchMaximumCount;
 313
 314#pragma warning disable CA2214 // Do not call overridable methods in constructors.  The virtual methods are internal and
 14315            LoadBalancer = CreatePartitionLoadBalancer(CreateStorageManager(this), Identifier, ConsumerGroup, FullyQuali
 316#pragma warning restore CA2214 // Do not call overridable methods in constructors
 14317        }
 318
 319        /// <summary>
 320        ///   Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class.
 321        /// </summary>
 322        ///
 323        /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed.  Th
 324        /// <param name="consumerGroup">The name of the consumer group the processor is associated with.  Events are rea
 325        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 326        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 327        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 328        /// <param name="options">The set of options to use for the processor.</param>
 329        ///
 330        protected EventProcessor(int eventBatchMaximumCount,
 331                                 string consumerGroup,
 332                                 string fullyQualifiedNamespace,
 333                                 string eventHubName,
 334                                 TokenCredential credential,
 172335                                 EventProcessorOptions options = default) : this(eventBatchMaximumCount, consumerGroup, 
 336        {
 148337        }
 338
 339        /// <summary>
 340        ///   Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class.
 341        /// </summary>
 342        ///
 0343        protected EventProcessor()
 344        {
 0345        }
 346
 347        /// <summary>
 348        ///   Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events.  Should this method be c
 349        ///   is running, no action is taken.
 350        /// </summary>
 351        ///
 352        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 353        ///
 354        public virtual async Task StartProcessingAsync(CancellationToken cancellationToken = default) =>
 106355            await StartProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);
 356
 357        /// <summary>
 358        ///   Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events.  Should this method be c
 359        ///   is running, no action is taken.
 360        /// </summary>
 361        ///
 362        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 363        ///
 364        public virtual void StartProcessing(CancellationToken cancellationToken = default) =>
 14365            StartProcessingInternalAsync(false, cancellationToken).EnsureCompleted();
 366
 367        /// <summary>
 368        ///   Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events.  Should this method be ca
 369        ///   is not running, no action is taken.
 370        /// </summary>
 371        ///
 372        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 373        ///
 374        public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default) =>
 98375            await StopProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);
 376
 377        /// <summary>
 378        ///   Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events.  Should this method be ca
 379        ///   is not running, no action is taken.
 380        /// </summary>
 381        ///
 382        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 383        ///
 384        public virtual void StopProcessing(CancellationToken cancellationToken = default) =>
 30385            StopProcessingInternalAsync(false, cancellationToken).EnsureCompleted();
 386
 387        /// <summary>
 388        ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
 389        /// </summary>
 390        ///
 391        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
 392        ///
 393        /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>
 394        ///
 395        [EditorBrowsable(EditorBrowsableState.Never)]
 0396        public override bool Equals(object obj) => base.Equals(obj);
 397
 398        /// <summary>
 399        ///   Returns a hash code for this instance.
 400        /// </summary>
 401        ///
 402        /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha
 403        ///
 404        [EditorBrowsable(EditorBrowsableState.Never)]
 0405        public override int GetHashCode() => base.GetHashCode();
 406
 407        /// <summary>
 408        ///   Converts the instance to string representation.
 409        /// </summary>
 410        ///
 411        /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 412        ///
 413        [EditorBrowsable(EditorBrowsableState.Never)]
 8414        public override string ToString() => $"Event Processor<{ typeof(TPartition).Name }>: { Identifier }";
 415
 416        /// <summary>
 417        ///   Creates a <see cref="TransportConsumer" /> to use for processing.
 418        /// </summary>
 419        ///
 420        /// <param name="consumerGroup">The consumer group to associate with the consumer.</param>
 421        /// <param name="partitionId">The partition to associate with the consumer.</param>
 422        /// <param name="eventPosition">The position in the event stream where the consumer should begin reading.</param
 423        /// <param name="connection">The connection to use for the consumer.</param>
 424        /// <param name="options">The options to use for configuring the consumer.</param>
 425        ///
 426        /// <returns>A <see cref="TransportConsumer" /> with the requested configuration.</returns>
 427        ///
 428        internal virtual TransportConsumer CreateConsumer(string consumerGroup,
 429                                                          string partitionId,
 430                                                          EventPosition eventPosition,
 431                                                          EventHubConnection connection,
 432                                                          EventProcessorOptions options) =>
 2433            connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, options.RetryOptions.ToRetryPo
 434
 435        /// <summary>
 436        ///   Creates a <see cref="StorageManager" /> to use for interacting with durable storage.
 437        /// </summary>
 438        ///
 439        /// <param name="instance">The <see cref="EventProcessor{TPartition}" /> instance to associate with the storage 
 440        ///
 441        /// <returns>A <see cref="StorageManager" /> with the requested configuration.</returns>
 442        ///
 166443        internal virtual StorageManager CreateStorageManager(EventProcessor<TPartition> instance) => new DelegatingStora
 444
 445        /// <summary>
 446        ///   Creates a <see cref="PartitionLoadBalancer"/> for managing partition ownership for the event processor.
 447        /// </summary>
 448        ///
 449        /// <param name="storageManager">Responsible for managing persistence of the partition ownership data.</param>
 450        /// <param name="identifier">The identifier of the event processor associated with the load balancer.</param>
 451        /// <param name="consumerGroup">The name of the consumer group this load balancer is associated with.</param>
 452        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace that the processor is associa
 453        /// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
 454        /// <param name="ownershipExpiration">The minimum amount of time for an ownership to be considered expired witho
 455        ///
 456        internal virtual PartitionLoadBalancer CreatePartitionLoadBalancer(StorageManager storageManager,
 457                                                                           string identifier,
 458                                                                           string consumerGroup,
 459                                                                           string fullyQualifiedNamespace,
 460                                                                           string eventHubName,
 461                                                                           TimeSpan ownershipExpiration) =>
 162462            new PartitionLoadBalancer(storageManager, identifier, consumerGroup, fullyQualifiedNamespace, eventHubName, 
 463
 464        /// <summary>
 465        ///   Performs the tasks needed to process a batch of events.
 466        /// </summary>
 467        ///
 468        /// <param name="partition">The Event Hub partition whose processing should be started.</param>
 469        /// <param name="eventBatch">The batch of events to process.</param>
 470        /// <param name="dispatchEmptyBatches"><c>true</c> if empty batches should be dispatched to the handler; otherwi
 471        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 472        ///
 473        internal virtual async Task ProcessEventBatchAsync(TPartition partition,
 474                                                           IReadOnlyList<EventData> eventBatch,
 475                                                           bool dispatchEmptyBatches,
 476                                                           CancellationToken cancellationToken)
 477        {
 3029992478            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 479
 480            // If there were no events in the batch and empty batches should not be emitted,
 481            // take no further action.
 482
 3029962483            if (((eventBatch == null) || (eventBatch.Count <= 0)) && (!dispatchEmptyBatches))
 484            {
 2485                return;
 486            }
 487
 488            // Create the diagnostics scope used for distributed tracing and instrument the events in the batch.
 489
 3029960490            using var diagnosticScope = EventDataInstrumentation.ScopeFactory.CreateScope(DiagnosticProperty.EventProces
 3029960491            diagnosticScope.AddAttribute(DiagnosticProperty.KindAttribute, DiagnosticProperty.ConsumerKind);
 3029960492            diagnosticScope.AddAttribute(DiagnosticProperty.EventHubAttribute, EventHubName);
 3029960493            diagnosticScope.AddAttribute(DiagnosticProperty.EndpointAttribute, FullyQualifiedNamespace);
 494
 3029960495            if ((diagnosticScope.IsEnabled) && (eventBatch.Any()))
 496            {
 20497                foreach (var eventData in eventBatch)
 498                {
 6499                    if (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out string diagnosticId))
 500                    {
 6501                        var attributes = new Dictionary<string, string>(1)
 6502                        {
 6503                            { DiagnosticProperty.EnqueuedTimeAttribute, eventData.EnqueuedTime.ToUnixTimeMilliseconds().
 6504                        };
 505
 6506                        diagnosticScope.AddLink(diagnosticId, attributes);
 507                    }
 508                }
 509            }
 510
 3029960511            diagnosticScope.Start();
 512
 513            // Dispatch the batch to the handler for processing.  Exceptions in the handler code are intended to be
 514            // unhandled by the processor; explicitly signal that the exception was observed in developer-provided
 515            // code.
 516
 517            try
 518            {
 3029960519                await OnProcessingEventBatchAsync(eventBatch, partition, cancellationToken).ConfigureAwait(false);
 3029958520            }
 2521            catch (Exception ex)
 522            {
 2523                diagnosticScope.Failed(ex);
 2524                throw new DeveloperCodeException(ex);
 525            }
 3029960526        }
 527
 528        /// <summary>
 529        ///   Creates the infrastructure for tracking the processing of a partition and begins processing the
 530        ///   partition in the background until cancellation is requested.
 531        /// </summary>
 532        ///
 533        /// <param name="partition">The Event Hub partition whose processing should be started.</param>
 534        /// <param name="startingPosition">The position within the event stream that processing should begin.</param>
 535        /// <param name="cancellationSource">A <see cref="CancellationTokenSource"/> instance to signal the request to c
 536        ///
 537        /// <returns>The <see cref="PartitionProcessor" /> encapsulating the processing task, its cancellation token, an
 538        ///
 539        /// <remarks>
 540        ///   This method makes liberal use of class methods and state in addition to the received parameters.
 541        /// </remarks>
 542        ///
 543        internal virtual PartitionProcessor CreatePartitionProcessor(TPartition partition,
 544                                                                     EventPosition startingPosition,
 545                                                                     CancellationTokenSource cancellationSource)
 546        {
                // Surface a cancellation that was requested before any background work is scheduled.

 80547            cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();

                // The active consumer is shared by both local functions below: the processing routine assigns it
                // each time a consumer is created, and the reader of last enqueued event properties inspects it.

 78548            var consumer = default(TransportConsumer);
 549
 550            // If the tracking of the last enqueued event properties was requested, then read the
 551            // properties from the active consumer, which can change during processing in the event of
 552            // error scenarios.
 553
 554            LastEnqueuedEventProperties readLastEnquedEventInformation()
 555            {
 556                // This is not an expected scenario; the guard exists to prevent a race condition that is
 557                // unlikely, but possible, when partition processing is being stopped or consumer creation
 558                // outright failed.
 559
 8560                if ((consumer == null) || (consumer.IsClosed))
 561                {
                        // The condition argument is hard-coded to true so that the assertion always fails on this
                        // path; presumably it throws with the supplied message - confirm against Argument.AssertNotClosed.

 2562                    Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable);
 563                }
 564
 6565                return new LastEnqueuedEventProperties(consumer.LastReceivedEvent);
 566            }
 567
 568            // Define the routine to handle processing for the partition.
 569
 570            async Task performProcessing()
 571            {
 78572                cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();
 573
 72574                var connection = default(EventHubConnection);
 72575                var retryDelay = default(TimeSpan?);
 72576                var capturedException = default(Exception);
 72577                var eventBatch = default(IReadOnlyList<EventData>);
 72578                var lastEvent = default(EventData);

                    // Two independent counters: failedAttemptCount is handed to the retry policy for retries made with
                    // the same consumer, while failedConsumerCount bounds how many consumers may fail in a row.  Both
                    // are reset whenever a batch is processed successfully.

 72579                var failedAttemptCount = 0;
 72580                var failedConsumerCount = 0;
 581
 582                // Create the connection to be used for spawning consumers; if the creation
 583                // fails, then consider the processing task to be failed.  The main processing
 584                // loop will take responsibility for attempting to restart or relinquishing ownership.
 585
 586                try
 587                {
 72588                    connection = CreateConnection();
 70589                }
 2590                catch (Exception ex)
 591                {
 592                    // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibil
 593                    // for observing or surfacing exceptions that may occur in the handler.
 594
 2595                    _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.Non
 2596                    Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, Consu
 597
 2598                    throw;
 599                }
 600
                    // Binding the configured awaitable with "await using" has the connection disposed asynchronously
                    // when this routine exits, whether it exits normally or by exception.

 70601                await using var connectionAwaiter = connection.ConfigureAwait(false);
 602
 603                // Continue processing the partition until cancellation is signaled or until the count of failed consume
 604                // Consumers which have been consistently unable to receive and process events will be considered invalid and
 605
 122606                while ((!cancellationSource.IsCancellationRequested) && (failedConsumerCount <= MaximumFailedConsumerCou
 607                {
 608                    try
 609                    {
 90610                        consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, startingPosition, connection, Op
 611
 612                        // Allow the core dispatching loop to apply an additional set of retries over any provided by th
 613                        // itself, as a processor should be as resilient as possible and retain partition ownership if p
 614                        // able to make forward progress.  If the retries are exhausted or a non-retriable exception occ
 615                        // consumer will be considered invalid and potentially refreshed.
 616
 3044180617                        while (!cancellationSource.IsCancellationRequested)
 618                        {
 619                            try
 620                            {
 3044156621                                eventBatch = await consumer.ReceiveAsync(EventBatchMaximumCount, Options.MaximumWaitTime
 3044126622                                await ProcessEventBatchAsync(partition, eventBatch, Options.MaximumWaitTime.HasValue, ca
 623
 624                                // If the batch was successfully processed, capture the last event as the current starti
 625                                // event that the consumer becomes invalid and needs to be replaced.
 626
 3044086627                                lastEvent = (eventBatch != null && eventBatch.Count > 0) ? eventBatch[eventBatch.Count -
 628
                                    // NOTE(review): long.MinValue looks like the sentinel for an event without a populated
                                    // offset, and the "false" argument presumably makes the new position exclusive of the
                                    // last processed event - confirm both against EventData and EventPosition.FromOffset.

 3044086629                                if ((lastEvent != null) && (lastEvent.Offset != long.MinValue))
 630                                {
 2631                                    startingPosition = EventPosition.FromOffset(lastEvent.Offset, false);
 632                                }
 633
 634                                // If event batches are successfully processed, then the need for forward progress is
 635                                // satisfied, and the failure counts should reset.
 636
 3044086637                                failedAttemptCount = 0;
 3044086638                                failedConsumerCount = 0;
 3044086639                            }
 30640                            catch (TaskCanceledException) when (cancellationSource.IsCancellationRequested)
 641                            {
 642                                // Do not log; this is an expected scenario when partition processing is asked to stop.
 643
 28644                                throw;
 645                            }
 42646                            catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>())
 647                            {
 648                                // The error handler is invoked as a fire-and-forget task; the processor does not assume
 649                                // for observing or surfacing exceptions that may occur in the handler.
 650
 36651                                _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, Cancellat
 652
 36653                                Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHu
 36654                                retryDelay = RetryPolicy.CalculateRetryDelay(ex, ++failedAttemptCount);
 655
 36656                                if (!retryDelay.HasValue)
 657                                {
 658                                    // If the exception should not be retried, then allow it to pass to the outer loop; 
 659                                    // to prevent being stuck in a corrupt state where the consumer is unable to read ev
 660
 32661                                    throw;
 662                                }
 663
                                    // The delay observes the partition's cancellation token, so a stop request made while
                                    // waiting interrupts the delay rather than waiting it out.

 4664                                await Task.Delay(retryDelay.Value, cancellationSource.Token).ConfigureAwait(false);
 665                            }
 666                        }
 24667                    }
 32668                    catch (OperationCanceledException ex)
 669                    {
                            // Normalize every form of cancellation to TaskCanceledException.  Because TaskCanceledException
                            // derives from OperationCanceledException, the exception rethrown by the inner loop is also
                            // caught and wrapped here.

 32670                        throw new TaskCanceledException(ex.Message, ex);
 671                    }
 6672                    catch (DeveloperCodeException ex)
 673                    {
 674                        // Record that an exception was observed in developer-provided code, but consider it fatal and t
 675                        // steps to handle or dispatch it.
 676
 6677                        var message = string.Format(CultureInfo.InvariantCulture, Resources.DeveloperCodeExceptionMessag
 6678                        Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, C
 679
                            // Unwrap and rethrow the developer's original exception with its stack trace preserved.

 6680                        ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
 0681                    }
 28682                    catch (Exception ex) when (ex.IsFatalException())
 683                    {
                            // Exceptions classified as fatal by IsFatalException are rethrown untouched and do not count
                            // against the consumer; the finally block below still runs.

 0684                        throw;
 685                    }
 28686                    catch (Exception ex)
 687                    {
                            // Any other failure invalidates the current consumer; count it and remember the exception
                            // so that it can be surfaced if the outer loop gives up.

 28688                        ++failedConsumerCount;
 28689                        capturedException = ex;
 28690                    }
 691                    finally
 692                    {
                            // NOTE(review): the consumer variable is not cleared between iterations, so if creating a
                            // replacement consumer throws, the previously closed consumer is still referenced here and
                            // is closed a second time - confirm that closing twice is harmless.

 693                        try
 694                        {
 90695                            if (consumer != null)
 696                            {
 86697                                await consumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
 698                            }
 88699                        }
 2700                        catch (Exception ex)
 701                        {
 2702                            Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubNam
 703
 704                            // Do not bubble the exception, as the consumer is being refreshed; failure to close this co
 2705                        }
 706                    }
 707                }
 708
 709                // If there was an exception captured, then surface it.  Otherwise signal that cancellation took place.
 710
 32711                if (capturedException != null)
 712                {
 18713                    ExceptionDispatchInfo.Capture(capturedException).Throw();
 714                }
 715
 14716                throw new TaskCanceledException();
 0717            }
 718
 719            // Start processing in the background and return the processor
 720            // metadata.
 721
 78722            return new PartitionProcessor
 78723            (
 78724                Task.Run(performProcessing),
 78725                partition,
 78726                readLastEnquedEventInformation,
 78727                cancellationSource
 78728            );
 729        }
 730
 731        /// <summary>
 732        ///   Creates an <see cref="EventHubConnection" /> to use for communicating with the Event Hubs service.
 733        /// </summary>
 734        ///
 735        /// <returns>The requested <see cref="EventHubConnection" />.</returns>
 736        ///
            /// <remarks>
            ///   The default implementation invokes <c>ConnectionFactory</c> and returns the connection that it produces.
            ///
            ///   When called while starting to process a partition, an exception thrown by this method is dispatched to
            ///   the processing error handler, logged, and then rethrown, which fails the processing task for that partition.
            /// </remarks>
            ///
 10737        protected internal virtual EventHubConnection CreateConnection() => ConnectionFactory();
 738
 739        /// <summary>
 740        ///   Produces a list of the available checkpoints for the Event Hub and consumer group associated with the
 741        ///   event processor instance, so that processing for a given set of partitions can be properly initialized.
 742        /// </summary>
 743        ///
 744        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 745        ///
 746        /// <returns>The set of checkpoints for the processor to take into account when initializing partitions.</return
 747        ///
 748        /// <remarks>
 749        ///   Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStarti
 750        ///   be used to initialize the partition for processing.
 751        ///
 752        ///   In the event that a custom starting point is desired for a single partition, or each partition should star
 753        ///   it is recommended that this method express that intent by returning checkpoints for those partitions with 
 754        ///   starting location set.
 755        /// </remarks>
 756        ///
 757        protected abstract Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellati
 758
 759        /// <summary>
 760        ///   Produces a list of the ownership assignments for partitions between each of the cooperating event processo
 761        ///   instances for a given Event Hub and consumer group pairing.  This method is used when load balancing to al
 762        ///   the processor to discover other active collaborators and to make decisions about how to best balance work
 763        ///   between them.
 764        /// </summary>
 765        ///
 766        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 767        ///
 768        /// <returns>The set of ownership records to take into account when making load balancing decisions.</returns>
 769        ///
 770        protected abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken canc
 771
 772        /// <summary>
 773        ///   Attempts to claim ownership of the specified partitions for processing.  This method is used by
 774        ///   load balancing to allow event processor instances to distribute the responsibility for processing
 775        ///   partitions for a given Event Hub and consumer group pairing amongst the active event processors.
 776        /// </summary>
 777        ///
 778        /// <param name="desiredOwnership">The set of partition ownership desired by the event processor instance; this 
 779        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 780        ///
 781        /// <returns>The set of ownership records for the partitions that were successfully claimed; this is expected to
 782        ///
 783        protected abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProc
 784                                                                                                   CancellationToken can
 785
 786        /// <summary>
 787        ///   Performs the tasks needed to process a batch of events for a given partition as they are read from the Eve
 788        /// </summary>
 789        ///
 790        /// <param name="events">The batch of events to be processed.</param>
 791        /// <param name="partition">The context of the partition from which the events were read.</param>
 792        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 793        ///
 794        /// <remarks>
 795        ///   <para>The number of events in the <paramref name="events"/> batch may vary.  The batch will contain a numb
 796        ///   requested when the processor was created, depending on the availability of events in the partition within 
 797        ///   interval.
 798        ///
 799        ///   If there are enough events available in the Event Hub partition to fill a batch of the requested size, the
 800        ///   immediately.  If there were not a sufficient number of events available in the partition to populate a ful
 801        ///   to reach the requested batch size until the <see cref="EventProcessorOptions.MaximumWaitTime"/> has elapse
 802        ///   available by the end of that period.
 803        ///
 804        ///   If a <see cref="EventProcessorOptions.MaximumWaitTime"/> was not requested, indicated by setting the optio
 805        ///   partition until a full batch of the requested size could be populated and will not dispatch any partial ba
 806        ///
 807        ///   <para>Should an exception occur within the code for this method, the event processor will allow it to bubb
 808        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai
 809        ///
 810        ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r
 811        ///   a deadlock scenario, especially if using the synchronous form of the call.</para>
 812        /// </remarks>
 813        ///
 814        protected abstract Task OnProcessingEventBatchAsync(IEnumerable<EventData> events,
 815                                                            TPartition partition,
 816                                                            CancellationToken cancellationToken);
 817
 818        /// <summary>
 819        ///   Performs the tasks needed when an unexpected exception occurs within the operation of the
 820        ///   event processor infrastructure.
 821        /// </summary>
 822        ///
 823        /// <param name="exception">The exception that occurred during operation of the event processor.</param>
 824        /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</
 825        /// <param name="operationDescription">A short textual description of the operation during which the exception o
 826        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 827        ///
 828        /// <remarks>
 829        ///   This error handler is invoked when there is an exception observed within the event processor itself; it is
 830        ///   code that has been implemented to process events or other overrides and extension points that are not crit
 831        ///   The event processor will make every effort to recover from exceptions and continue processing.  Should an 
 832        ///   from be encountered, the processor will attempt to forfeit ownership of all partitions that it was process
 833        ///
 834        ///   The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be able to
 835        ///   exception was fatal or whether its state was corrupted, this method has responsibility for making the dete
 836        ///   should be terminated or restarted.  The method may do so by calling Stop on the processor instance and the
 837        ///
 838        ///   It is recommended that, for production scenarios, the decision be made by considering observations made by
 839        ///   when initializing processing for a partition, and the method invoked when processing for a partition is st
 840        ///   data from their monitoring platforms in this decision as well.
 841        ///
 842        ///   As with event processing, should an exception occur in the code for the error handler, the event processor
 843        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai
 844        /// </remarks>
 845        ///
 846        protected abstract Task OnProcessingErrorAsync(Exception exception,
 847                                                       TPartition partition,
 848                                                       string operationDescription,
 849                                                       CancellationToken cancellationToken);
 850
 851        /// <summary>
 852        ///   Performs the tasks to initialize a partition, and its associated context, for event processing.
 853        /// </summary>
 854        ///
 855        /// <param name="partition">The context of the partition being initialized.  Only the well-known members of the 
 856        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 857        ///
            /// <returns>A task for the initialization work; the default implementation does nothing and returns <see cref="Task.CompletedTask" />.</returns>
            ///
 858        /// <remarks>
 859        ///   It is not recommended that the state of the processor be managed directly from within this method; request
 860        ///   a deadlock scenario, especially if using the synchronous form of the call.
 861        /// </remarks>
 862        ///
 863        protected virtual Task OnInitializingPartitionAsync(TPartition partition,
 42864                                                            CancellationToken cancellationToken) => Task.CompletedTask;
 865
 866        /// <summary>
 867        ///   Performs the tasks needed when processing for a partition is being stopped.  This commonly occurs when the
 868        ///   is claimed by another event processor instance or when the current event processor instance is shutting do
 869        /// </summary>
 870        ///
 871        /// <param name="partition">The context of the partition for which processing is being stopped.</param>
 872        /// <param name="reason">The reason that processing is being stopped for the partition.</param>
 873        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 874        ///
 875        /// <remarks>
 876        ///   It is not recommended that the state of the processor be managed directly from within this method; request
 877        ///   a deadlock scenario, especially if using the synchronous form of the call.
 878        /// </remarks>
 879        ///
 880        protected virtual Task OnPartitionProcessingStoppedAsync(TPartition partition,
 881                                                                 ProcessingStoppedReason reason,
 42882                                                                 CancellationToken cancellationToken) => Task.CompletedT
 883
 884        /// <summary>
 885        ///   A set of information about the last enqueued event of a partition, as observed by the associated EventHubs
 886        ///   associated with this context as events are received from the Event Hubs service.  This is only available i
 887        ///   created with <see cref="ReadEventOptions.TrackLastEnqueuedEventProperties" /> set.
 888        /// </summary>
 889        ///
 890        /// <param name="partitionId">The identifier of the Event Hub partition to read the properties from.</param>
 891        ///
 892        /// <returns>The set of properties for the last event that was enqueued to the partition.</returns>
 893        ///
 894        /// <remarks>
 895        ///   When information about the partition's last enqueued event is being tracked, each event received from the 
 896        ///   service will carry metadata about the partition that it otherwise would not. This results in a small amoun
 897        ///   additional network bandwidth consumption that is generally a favorable trade-off when considered
 898        ///   against periodically making requests for partition properties using an Event Hub client.
 899        /// </remarks>
 900        ///
 901        /// <exception cref="InvalidOperationException">Occurs when this method is invoked without <see cref="EventProce
 902        ///
 903        protected virtual LastEnqueuedEventProperties ReadLastEnqueuedEventProperties(string partitionId)
 904        {
                // Only partitions with an entry in the set of active partition processors can be read.  When there is
                // no entry, the assertion is invoked with its condition hard-coded to true so that it always fails;
                // presumably it throws with the supplied message - confirm against Argument.AssertNotClosed.

 4905            if (!ActivePartitionProcessors.TryGetValue(partitionId, out var processor))
 906            {
 2907                Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable);
 908            }
 909
                // Delegate to the partition processor that was found, which performs the actual read.

 2910            return processor.ReadLastEnqueuedEventProperties();
 911        }
 912
 913        /// <summary>
 914        ///   Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events. Should this method be ca
 915        /// </summary>
 916        ///
 917        /// <param name="async">When <c>true</c>, the method will be executed asynchronously; otherwise, it will execute
 918        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 919        ///
 920        private async Task StartProcessingInternalAsync(bool async,
 921                                                        CancellationToken cancellationToken)
 922        {
 120923            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 116924            Logger.EventProcessorStart(Identifier, EventHubName, ConsumerGroup);
 925
                // Records whether this call actually acquired the semaphore, so that the finally block only
                // releases it when it is held.

 116926            var releaseGuard = false;
 927
 928            try
 929            {
 930                // Acquire the semaphore used to synchronize processor starts and stops, respecting
 931                // the async flag.  When this is held, the state of the processor is stable.
 932
 116933                if (async)
 934                {
 104935                    await ProcessorRunningGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
 936                }
 937                else
 938                {
 12939                    ProcessorRunningGuard.Wait(cancellationToken);
 940                }
 941
                    // The status override is set only while the semaphore is held; presumably it is what the Status
                    // property reports during the transition - confirm against the Status getter.

 112942                releaseGuard = true;
 112943                _statusOverride = EventProcessorStatus.Starting;
 944
 945                // If the processor is already running, then it was started before the
 946                // semaphore was acquired; there is no work to be done.
 947
 112948                if (_runningProcessorTask != null)
 949                {
                        // The finally block still runs on this early return, clearing the status override and
                        // releasing the semaphore.

 8950                    return;
 951                }
 952
 953                // There should be no cancellation source, but guard against leaking resources in the
 954                // event of a processing crash or other exception.
 955
 104956                _runningProcessorCancellationSource?.Cancel();
 104957                _runningProcessorCancellationSource?.Dispose();
 104958                _runningProcessorCancellationSource = new CancellationTokenSource();
 959
 960                // Start processing events.
 961
                    // The task returned by RunProcessingAsync is deliberately not awaited here; it is retained so that
                    // stopping the processor can await it later.

 104962                ActivePartitionProcessors.Clear();
 104963                _runningProcessorTask = RunProcessingAsync(_runningProcessorCancellationSource.Token);
 104964            }
 4965            catch (OperationCanceledException ex)
 966            {
                    // Only the message of the original cancellation exception is logged; callers receive a new
                    // TaskCanceledException that does not carry the original as an inner exception.

 4967                Logger.EventProcessorStartError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 4968                throw new TaskCanceledException();
 969            }
 0970            catch (Exception ex)
 971            {
 0972                Logger.EventProcessorStartError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 0973                throw;
 974            }
 975            finally
 976            {
                    // NOTE(review): the status override is cleared even when the semaphore was never acquired
                    // (releaseGuard is false), which could clear an override set by a concurrent start or stop that
                    // does hold it - confirm whether that is intended.

 116977                _statusOverride = null;
 116978                Logger.EventProcessorStartComplete(Identifier, EventHubName, ConsumerGroup);
 979
 980                // If the cancellation token was signaled during the attempt to acquire the
 981                // semaphore, it cannot be safely released; ensure that it is held.
 982
 116983                if (releaseGuard)
 984                {
 112985                    ProcessorRunningGuard.Release();
 986                }
 987            }
 112988        }
 989
 990        /// <summary>
 991        ///   Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events. Should this method be cal
 992        /// </summary>
 993        ///
 994        /// <param name="async">When <c>true</c>, the method will be executed asynchronously; otherwise, it will execute
 995        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 996        ///
 997        private async Task StopProcessingInternalAsync(bool async,
 998                                                       CancellationToken cancellationToken)
 999        {
 1281000            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1201001            Logger.EventProcessorStop(Identifier, EventHubName, ConsumerGroup);
 1002
                 // processingException holds a failure observed when awaiting the processing task, so that cleanup
                 // can finish before it is surfaced; releaseGuard records whether this call acquired the semaphore.

 1201003            var processingException = default(Exception);
 1201004            var releaseGuard = false;
 1005
 1006            try
 1007            {
 1008                // Acquire the semaphore used to synchronize processor starts and stops, respecting
 1009                // the async flag.  When this is held, the state of the processor is stable.
 1010
 1201011                if (async)
 1012                {
 921013                    await ProcessorRunningGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
 1014                }
 1015                else
 1016                {
 281017                    ProcessorRunningGuard.Wait(cancellationToken);
 1018                }
 1019
 1161020                releaseGuard = true;
 1161021                _statusOverride = EventProcessorStatus.Stopping;
 1022
 1023                // If the processor is not running, then it was never started or has been stopped
 1024                // before the semaphore was acquired; there is no work to be done.
 1025
 1161026                if (_runningProcessorTask == null)
 1027                {
 161028                    return;
 1029                }
 1030
 1031                // Request cancellation of the running processor task.
 1032
 1001033                _runningProcessorCancellationSource?.Cancel();
 1001034                _runningProcessorCancellationSource?.Dispose();
 1001035                _runningProcessorCancellationSource = null;
 1036
 1037                // Allow processing to complete.  If there was a processing or load balancing error,
 1038                // awaiting the task is where it will be surfaced.  Be sure to preserve it so
 1039                // that it can be surfaced.
 1040
 1041                try
 1042                {
 1001043                    if (async)
 1044                    {
 821045                        await _runningProcessorTask.ConfigureAwait(false);
 1046                    }
 1047                    else
 1048                    {
 1049#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 181050                        _runningProcessorTask.GetAwaiter().GetResult();
 1051#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 1052                    }
 01053                }
 841054                catch (TaskCanceledException)
 1055                {
 1056                    // This is expected; no action is needed.
 841057                }
 161058                catch (Exception ex)
 1059                {
 1060                    // Preserve the exception to surface once the tasks needed to fully stop are complete;
 1061                    // logging and invoking of the error handler will have already taken place.
 1062
 161063                    processingException = ex;
 161064                }
 1065
 1066                // With the processing task having completed, perform the necessary cleanup of partition processing task
 1067                // and surrender ownership.
 1068
                     // ToArray forces the projection to run immediately, so the stop of every active partition has been
                     // started before any waiting begins.

 1001069                var stopPartitionProcessingTasks = ActivePartitionProcessors.Keys
 1381070                    .Select(partitionId => TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.Shutdown
 1001071                    .ToArray();
 1072
                     // Ownership is relinquished with CancellationToken.None on both paths, so a cancellation of the
                     // caller's token does not interrupt that step once it has begun.

 1001073                if (async)
 1074                {
 821075                    await Task.WhenAll(stopPartitionProcessingTasks).ConfigureAwait(false);
 821076                    await LoadBalancer.RelinquishOwnershipAsync(CancellationToken.None).ConfigureAwait(false);
 1077                }
 1078                else
 1079                {
                         // Synchronous path: block the calling thread until every stop task completes.  Task.WaitAll
                         // reports failures wrapped in an AggregateException.

 181080                    Task.WaitAll(stopPartitionProcessingTasks);
 1081
 1082#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 181083                    LoadBalancer.RelinquishOwnershipAsync(CancellationToken.None).GetAwaiter().GetResult();
 1084#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 1085                }
 1086
 1001087                ActivePartitionProcessors.Clear();
 1088
 1089                // Dispose of the processing task and reset processing state to
 1090                // allow the processor to be restarted when this method completes.
 1091
 1001092                _runningProcessorTask.Dispose();
 1001093                _runningProcessorTask = null;
 1001094            }
 41095            catch (OperationCanceledException ex)
 1096            {
 41097                Logger.EventProcessorStopError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 41098                throw new TaskCanceledException();
 1099            }
 01100            catch (Exception ex)
 1101            {
                     // NOTE(review): when the failure comes from the cleanup steps above, the running processor task
                     // is left assigned and any captured processingException is never surfaced, because this rethrow
                     // bypasses the check that follows the finally block - confirm that this is acceptable.

 01102                Logger.EventProcessorStopError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 01103                throw;
 1104            }
 1105            finally
 1106            {
 1201107                _statusOverride = null;
 1201108                Logger.EventProcessorStopComplete(Identifier, EventHubName, ConsumerGroup);
 1109
 1110                // If the cancellation token was signaled during the attempt to acquire the
 1111                // semaphore, it cannot be safely released; ensure that it is held.
 1112
 1201113                if (releaseGuard)
 1114                {
 1161115                    ProcessorRunningGuard.Release();
 1116                }
 1117            }
 1118
 1119            // Surface any exception that was captured when the processing task was
 1120            // initially awaited.
 1121
 1001122            if (processingException != default)
 1123            {
 161124                ExceptionDispatchInfo.Capture(processingException).Throw();
 1125            }
 1001126        }
 1127
 1128        /// <summary>
 1129        ///   Performs the tasks needed to execute processing for this <see cref="EventProcessor{TPartition}" /> instanc
 1130        ///   load balancing between associated processors.
 1131        /// </summary>
 1132        ///
 1133        /// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal the request to cancel 
 1134        ///
 1135        private async Task RunProcessingAsync(CancellationToken cancellationToken)
 1136        {
 1041137            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1138
 1139            try
 1140            {
 1041141                var connection = CreateConnection();
 881142                await using var connectionAwaiter = connection.ConfigureAwait(false);
 1143
 1144                ValueStopwatch cycleDuration;
 881145                var partitionIds = default(string[]);
 1146
 1081147                while (!cancellationToken.IsCancellationRequested)
 1148                {
 1081149                    cycleDuration = ValueStopwatch.StartNew();
 1150
 1151                    try
 1152                    {
 1081153                        partitionIds = await connection.GetPartitionIdsAsync(RetryPolicy, cancellationToken).ConfigureAw
 941154                    }
 141155                    catch (Exception ex) when (ex.IsNotType<TaskCanceledException>())
 1156                    {
 1157                        // Logging for exceptions with the service operation is the responsibility of the connection.
 1158
 141159                        _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationGetPartitionIds, CancellationToken
 141160                        partitionIds = default;
 141161                    }
 1162
 1081163                    var remainingTimeUntilNextCycle = await PerformLoadBalancingAsync(cycleDuration, partitionIds, cance
 1164
 1081165                    if (remainingTimeUntilNextCycle != TimeSpan.Zero)
 1166                    {
 1061167                        await Task.Delay(remainingTimeUntilNextCycle, cancellationToken).ConfigureAwait(false);
 1168                    }
 1169                }
 1170
 1171                // Cancellation has been requested; throw the corresponding exception to maintain consistent behavior.
 1172
 01173                throw new TaskCanceledException();
 01174            }
 841175            catch (OperationCanceledException ex)
 1176            {
 841177                throw new TaskCanceledException(ex.Message, ex);
 1178            }
 161179            catch (Exception ex) when (ex.IsFatalException())
 1180            {
 01181                throw;
 1182            }
 161183            catch (Exception ex)
 1184            {
 1185                // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
 1186                // for observing or surfacing exceptions that may occur in the handler.
 1187
 161188                _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationEventProcessingLoop, CancellationToken.Non
 161189                Logger.EventProcessorTaskError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 1190
 161191                throw;
 1192            }
 01193        }
 1194
 1195        /// <summary>
 1196        ///   Performs the tasks needed for a single cycle of load balancing.
 1197        /// </summary>
 1198        ///
 1199        /// <param name="cycleDuration">Responsible for tracking the duration of this cycle.  It is expected that caller
 1200        /// <param name="partitionIds">The valid set of partition identifiers for the Event Hub to which the processor i
 1201        /// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal the request to cancel 
 1202        ///
 1203        /// <returns>The interval that callers should delay before invoking the next load balancing cycle; <see cref="Ti
 1204        ///
 1205        private async Task<TimeSpan> PerformLoadBalancingAsync(ValueStopwatch cycleDuration,
 1206                                                               string[] partitionIds,
 1207                                                               CancellationToken cancellationToken)
 1208        {
 1081209            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1210
 1211            // Perform the tasks needed for the load balancing cycle, including managing partition ownership in response
 1212            // lost, or faulted partition processors.
 1213            //
 1214            // Ensure that any errors observed, whether fatal or non-fatal, are surfaced to the exception handler in
 1215            // a fire-and-forget manner.  The processor does not assume responsibility for observing or surfacing except
 1216            // that may occur in the handler, so the associated task is intentionally unobserved.
 1217
 1081218            var claimedOwnership = default(EventProcessorPartitionOwnership);
 1219
 1081220            if ((partitionIds != default) && (partitionIds.Length > 0))
 1221            {
 1222                try
 1223                {
 581224                    claimedOwnership = await LoadBalancer.RunLoadBalancingAsync(partitionIds, cancellationToken).Configu
 501225                }
 1226                catch (EventHubsException ex)
 61227                    when (Resources.OperationClaimOwnership.Equals(ex.GetFailureOperation(), StringComparison.InvariantC
 1228                {
 1229                    // If a specific partition was associated with the failure, it is carried by a well-known data membe
 1230
 61231                    var partitionId = ex.GetFailureData<string>();
 1232
 61233                    var partition = (partitionId ?? string.Empty) switch
 61234                    {
 141235                        string id when (id.Length == 0) => null,
 121236                        string _ when (ActivePartitionProcessors.TryGetValue(partitionId, out var partitionProcessor)) =
 81237                        _ => new TPartition { PartitionId = partitionId }
 61238                    };
 1239
 61240                    _ = InvokeOnProcessingErrorAsync(ex.InnerException ?? ex, partition, Resources.OperationClaimOwnersh
 61241                    Logger.EventProcessorClaimOwnershipError(Identifier, EventHubName, ConsumerGroup, partitionId, ((ex.
 61242                }
 21243                catch (Exception ex) when (ex.IsNotType<TaskCanceledException>())
 1244                {
 21245                    _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationLoadBalancing, CancellationToken.None)
 21246                    Logger.EventProcessorLoadBalancingError(Identifier, EventHubName, ConsumerGroup, ex.Message);
 21247                }
 1248
 1249                // If a partition was claimed, begin processing it if not already being processed.
 1250
 581251                if ((claimedOwnership != default) && (!ActivePartitionProcessors.ContainsKey(claimedOwnership.PartitionI
 1252                {
 381253                    await TryStartProcessingPartitionAsync(claimedOwnership.PartitionId, cancellationToken).ConfigureAwa
 1254                }
 1255            }
 1256
 1257            // Ownership of some previously claimed partitions may have expired or have been stolen; stop the proc
 1258            // which are no longer owned.
 1259
 1081260            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1261
 1081262            await Task.WhenAll(ActivePartitionProcessors.Keys
 1081263                .Except(LoadBalancer.OwnedPartitionIds)
 1121264                .Select(partitionId => TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.OwnershipLos
 1081265                .ConfigureAwait(false);
 1266
 1267            // The remaining processing tasks should be running.  To ensure that is the case, validate the status of the
 1268            // and restart processing if it has failed.  It is also possible that task creation failed when the processi
 1269            // in which case there would be no task; create them.
 1270
 1081271            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1272
 1081273            await Task.WhenAll(LoadBalancer.OwnedPartitionIds
 1081274                .Select(async partitionId =>
 1081275                {
 1621276                    if (!ActivePartitionProcessors.TryGetValue(partitionId, out var partitionProcessor) || partitionProc
 1081277                    {
 1141278                        await TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.OwnershipLost, cancel
 1141279                        await TryStartProcessingPartitionAsync(partitionId, cancellationToken).ConfigureAwait(false);
 1081280                    }
 1621281                }))
 1081282                .ConfigureAwait(false);
 1283
 1284            // If load balancing is greedy and there was a partition claimed, then signal that there should be no delay 
 1285            // invoking the next load balancing cycle.
 1286
 1081287            if ((Options.LoadBalancingStrategy == LoadBalancingStrategy.Greedy) && (!LoadBalancer.IsBalanced))
 1288            {
 21289                return TimeSpan.Zero;
 1290            }
 1291
 1292            // Wait the remaining time, if any, to start the next cycle.
 1293
 1061294            return LoadBalancer.LoadBalanceInterval.CalculateRemaining(cycleDuration.GetElapsedTime());
 1081295        }
 1296
 1297        /// <summary>
 1298        ///   Attempts to begin processing the requested partition in the background and update tracking state
 1299        ///   so that processing can be stopped.
 1300        /// </summary>
 1301        ///
 1302        /// <param name="partitionId">The identifier of the Event Hub partition whose processing should be started.</par
 1303        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 1304        ///
 1305        /// <returns><c>true</c> if processing was successfully started; otherwise, <c>false</c>.</returns>
 1306        ///
 1307        /// <remarks>
 1308        ///   Exceptions encountered in this method will be logged and will result in the error handler being
 1309        ///   invoked.  They will not be surfaced to callers.  This is intended to be a safe operation consumed
 1310        ///   as part of the load balancing cycle, which is failure-tolerant.
 1311        /// </remarks>
 1312        ///
 1313        private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,
 1314                                                                  CancellationToken cancellationToken)
 1315        {
 441316            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 441317            Logger.EventProcessorPartitionProcessingStart(partitionId, Identifier, EventHubName, ConsumerGroup);
 1318
 441319            var partition = new TPartition { PartitionId = partitionId };
 441320            var operationDescription = Resources.OperationClaimOwnership;
 441321            var startingPosition = Options.DefaultStartingPosition;
 441322            var cancellationSource = default(CancellationTokenSource);
 1323
 1324            try
 1325            {
 1326                // Initialize the partition context; the handler is responsible for initializing any custom fields of the 
 1327
 441328                await OnInitializingPartitionAsync(partition, cancellationToken).ConfigureAwait(false);
 1329
 1330                // Query the available checkpoints for the partition.
 1331
 441332                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 441333                operationDescription = Resources.OperationListCheckpoints;
 1334
 441335                var checkpoints = await ListCheckpointsAsync(cancellationToken).ConfigureAwait(false);
 441336                operationDescription = Resources.OperationClaimOwnership;
 1337
 1338                // Determine the starting position for processing the partition.
 1339
 901340                foreach (var checkpoint in checkpoints)
 1341                {
 21342                    if (checkpoint.PartitionId == partitionId)
 1343                    {
 21344                        startingPosition = checkpoint.StartingPosition;
 21345                        break;
 1346                    }
 1347                }
 1348
 1349                // Create and register the partition processor.  Ownership of the cancellationSource is transferred
 1350                // to the processor upon creation, including the responsibility for disposal.
 1351
 441352                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1353
 441354                cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 441355                var processor = CreatePartitionProcessor(partition, startingPosition, cancellationSource);
 1356
 01357                ActivePartitionProcessors.AddOrUpdate(partitionId, processor, (key, value) => processor);
 401358                cancellationSource = null;
 1359
 401360                return true;
 1361            }
 41362            catch (Exception ex)
 1363            {
 1364                // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
 1365                // for observing or surfacing exceptions that may occur in the handler.
 1366
 41367                _ = InvokeOnProcessingErrorAsync(ex, partition, operationDescription, CancellationToken.None);
 41368                Logger.EventProcessorPartitionProcessingStartError(partitionId, Identifier, EventHubName, ConsumerGroup,
 1369
 41370                cancellationSource?.Cancel();
 41371                cancellationSource?.Dispose();
 41372                return false;
 1373            }
 1374            finally
 1375            {
 441376                Logger.EventProcessorPartitionProcessingStartComplete(partitionId, Identifier, EventHubName, ConsumerGro
 1377            }
 441378        }
 1379
 1380        /// <summary>
 1381        ///   Attempts to stop processing the requested partition.
 1382        /// </summary>
 1383        ///
 1384        /// <param name="partitionId">The identifier of the Event Hub partition whose processing should be stopped.</par
 1385        /// <param name="reason">The reason why the processing is being stopped.</param>
 1386        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 1387        ///
 1388        /// <returns><c>true</c> if the <paramref name="partitionId"/> was owned and was being processed; otherwise, <c>
 1389        ///
 1390        /// <remarks>
 1391        ///   Exceptions encountered when stopping processing for an owned partition will be logged and will result in t
 1392        ///   being invoked.  They will not be surfaced to callers.  This is intended to be a safe operation consumed
 1393        ///   as part of the load balancing cycle, which is failure-tolerant.
 1394        /// </remarks>
 1395        ///
 1396        private async Task<bool> TryStopProcessingPartitionAsync(string partitionId,
 1397                                                                 ProcessingStoppedReason reason,
 1398                                                                 CancellationToken cancellationToken)
 1399        {
 481400            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 481401            Logger.EventProcessorPartitionProcessingStop(partitionId, Identifier, EventHubName, ConsumerGroup);
 1402
 481403            var partition = default(TPartition);
 1404
 1405            try
 1406            {
 1407                // If the partition processor is not being tracked or could not be retrieved from the tracking items,
 1408                // then it cannot be stopped.
 1409
 481410                if (!ActivePartitionProcessors.TryRemove(partitionId, out var partitionProcessor))
 1411                {
 41412                    return false;
 1413                }
 1414
 1415                // Attempt to stop the processor; any exceptions should be treated as a problem with processing, not
 1416                // associated with the attempt to stop.
 1417
 441418                partition = partitionProcessor.Partition;
 1419
 1420                try
 1421                {
 441422                    partitionProcessor.CancellationSource.Cancel();
 441423                    await partitionProcessor.ProcessingTask.ConfigureAwait(false);
 41424                }
 381425                catch (TaskCanceledException)
 1426                {
 1427                    // This is expected; no action is needed.
 381428                }
 21429                catch
 1430                {
 1431                    // The processing task is in a failed state; any logging and dispatching
 1432                    // to the error handler happened in the processing task before the exception
 1433                    // was thrown.  All that remains is to override the reason for stopping.
 1434
 21435                    reason = ProcessingStoppedReason.OwnershipLost;
 21436                }
 1437
 441438                partitionProcessor.Dispose();
 441439                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 1440
 1441                // Notify the handler of the now-closed partition, awaiting completion to allow for a more deterministic
 1442                // for developers where the initialize and stop handlers will fire in a deterministic order and not inte
 1443                //
 1444                // Because the processor does not assume responsibility for observing or surfacing exceptions that may o
 1445                // errors are logged but the error handler is not invoked nor does an exception in the handler constitut
 1446                // processing the partition.  This also aims to prevent an infinite loop scenario where StopProcessing i
 1447                // error handler, which calls the partition stopped handler, which has an exception that again calls the
 1448
 1449                try
 1450                {
 441451                    await OnPartitionProcessingStoppedAsync(partition, reason, cancellationToken).ConfigureAwait(false);
 421452                }
 01453                catch (TaskCanceledException)
 1454                {
 1455                    // This is expected; no action is needed.
 01456                }
 21457                catch (Exception ex)
 1458                {
 21459                    Logger.EventProcessorPartitionProcessingStopError(partitionId, Identifier, EventHubName, ConsumerGro
 21460                }
 1461
 441462                return true;
 1463            }
 01464            catch (Exception ex) when (ex.IsNotType<TaskCanceledException>())
 1465            {
 1466                // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
 1467                // for observing or surfacing exceptions that may occur in the handler.
 1468
 01469                _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationSurrenderOwnership, CancellationToken
 01470                Logger.EventProcessorPartitionProcessingStopError(partitionId, Identifier, EventHubName, ConsumerGroup, 
 1471
 01472                return false;
 1473            }
 1474            finally
 1475            {
 481476                Logger.EventProcessorPartitionProcessingStopComplete(partitionId, Identifier, EventHubName, ConsumerGrou
 1477            }
 481478        }
 1479
 1480        /// <summary>
 1481        ///   Performs the tasks needed to invoke the <see cref="OnProcessingErrorAsync" /> method in the background,
 1482        ///   as it is intended to be a fire-and-forget operation.
 1483        /// </summary>
 1484        ///
 1485        /// <param name="exception">The exception that occurred during operation of the event processor.</param>
 1486        /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</
 1487        /// <param name="operationDescription">A short textual description of the operation during which the exception o
 1488        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 1489        ///
 1490        private Task InvokeOnProcessingErrorAsync(Exception exception,
 1491                                                  TPartition partition,
 1492                                                  string operationDescription,
 1601493                                                  CancellationToken cancellationToken) => Task.Run(() => OnProcessingErr
 1494
 1495        /// <summary>
 1496        ///   A virtual <see cref="StorageManager" /> instance that delegates calls to the
 1497        ///   <see cref="EventProcessor{TPartition}" /> to which it is associated.
 1498        /// </summary>
 1499        ///
 1500        private class DelegatingStorageManager : StorageManager
 1501        {
 1502            /// <summary>
 1503            ///   The <see cref="EventProcessor{TPartition}" /> that the storage manager is associated with.
 1504            /// </summary>
 1505            ///
 581506            private EventProcessor<TPartition> Processor { get; }
 1507
 1508            /// <summary>
 1509            ///   Initializes a new instance of the <see cref="DelegatingStorageManager"/> class.
 1510            /// </summary>
 1511            ///
 1512            /// <param name="processor">The <see cref="EventProcessor{TPartition}" /> to associate the storage manager w
 1513            ///
 3321514            public DelegatingStorageManager(EventProcessor<TPartition> processor) => Processor = processor;
 1515
 1516            /// <summary>
 1517            ///   Retrieves a complete ownership list from the data store.
 1518            /// </summary>
 1519            ///
 1520            /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associa
 1521            /// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with.  This i
 1522            /// <param name="consumerGroup">The name of the consumer group the ownership are associated with. This is ig
 1523            /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc
 1524            ///
 1525            /// <returns>An enumerable containing all the existing ownership for the associated Event Hub and consumer g
 1526            ///
 1527            public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQua
 1528                                                                                                         string eventHub
 1529                                                                                                         string consumer
 21530                                                                                                         CancellationTok
 1531            /// <summary>
 1532            ///   Attempts to claim ownership of partitions for processing.
 1533            /// </summary>
 1534            ///
 1535            /// <param name="partitionOwnership">An enumerable containing all the ownership to claim.</param>
 1536            /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc
 1537            ///
 1538            /// <returns>An enumerable containing the successfully claimed ownership.</returns>
 1539            ///
 1540            public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<Ev
 541541                                                                                                          CancellationTo
 1542
 1543            /// <summary>
 1544            ///   Retrieves a list of all the checkpoints in a data store for a given namespace, Event Hub and consumer 
 1545            /// </summary>
 1546            ///
 1547            /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associa
 1548            /// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with. This is
 1549            /// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with. This is 
 1550            /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc
 1551            ///
 1552            /// <returns>An enumerable containing all the existing checkpoints for the associated Event Hub and consumer
 1553            ///
 1554            public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(string fullyQualified
 1555                                                                                                   string eventHubName,
 1556                                                                                                   string consumerGroup,
 21557                                                                                                   CancellationToken can
 1558
 1559            /// <summary>
 1560            ///   This method is not implemented for this type.
 1561            /// </summary>
 1562            ///
 1563            /// <param name="checkpoint">The checkpoint containing the information to be stored.</param>
 1564            /// <param name="eventData">The event to use as the basis for the checkpoint's starting position.</param>
 1565            /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc
 1566            ///
 1567            /// <exception cref="NotImplementedException">The method is not implemented for this type.</exception>
 1568            ///
 1569            public override Task UpdateCheckpointAsync(EventProcessorCheckpoint checkpoint,
 1570                                                       EventData eventData,
 21571                                                       CancellationToken cancellationToken) => throw new NotImplementedE
 1572        }
 1573
 1574        /// <summary>
 1575        ///   The set of information needed to track and manage the active processing
 1576        ///   of a partition.
 1577        /// </summary>
 1578        ///
 1579        internal class PartitionProcessor : IDisposable
 1580        {
 1581            /// <summary>The task that is performing the processing.</summary>
 1582            public readonly Task ProcessingTask;
 1583
 1584            /// <summary>The partition that is being processed.</summary>
 1585            public readonly TPartition Partition;
 1586
 1587            /// <summary>The source token that can be used to cancel the processing for the associated <see cref="Proces
 1588            public readonly CancellationTokenSource CancellationSource;
 1589
 1590            /// <summary>A function that can be used to read the information about the last enqueued event of the partit
 1591            public readonly Func<LastEnqueuedEventProperties> ReadLastEnqueuedEventProperties;
 1592
 1593            /// <summary>
 1594            ///   Initializes a new instance of the <see cref="PartitionProcessor"/> class.
 1595            /// </summary>
 1596            ///
 1597            /// <param name="processingTask">The task that is performing the processing.</param>
 1598            /// <param name="partition">The partition that is being processed.</param>
 1599            /// <param name="readLastEnqueuedEventProperties">A function that can be used to read the information about 
 1600            /// <param name="cancellationSource">The source token that can be used to cancel the processing.</param>
 1601            ///
 921602            public PartitionProcessor(Task processingTask,
 921603                                      TPartition partition,
 921604                                      Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties,
 1841605                                      CancellationTokenSource cancellationSource) => (ProcessingTask, Partition, ReadLas
 1606
 1607            /// <summary>
 1608            ///   Performs tasks needed to clean-up the disposable resources used by the processor.  This method does
 1609            ///   not assume responsibility for signaling the cancellation source or awaiting the <see cref="ProcessingT
 1610            /// </summary>
 1611            ///
 1612            public void Dispose()
 1613            {
 441614                CancellationSource?.Dispose();
 441615                ProcessingTask?.Dispose();
 441616            }
 1617        }
 1618    }
 1619}