< Summary

Class:Azure.Messaging.EventHubs.EventProcessorClient
Assembly:Azure.Messaging.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\src\EventProcessorClient.cs
Covered lines:183
Uncovered lines:22
Coverable lines:205
Total lines:1101
Line coverage:89.2% (183 of 205)
Covered branches:61
Total branches:64
Branch coverage:95.3% (61 of 64)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-100%100%
.ctor(...)-86.89%100%
add_PartitionInitializingAsync(...)-100%100%
remove_PartitionInitializingAsync(...)-100%100%
add_PartitionClosingAsync(...)-100%100%
remove_PartitionClosingAsync(...)-100%100%
add_ProcessEventAsync(...)-100%100%
remove_ProcessEventAsync(...)-100%100%
add_ProcessErrorAsync(...)-100%100%
remove_ProcessErrorAsync(...)-100%100%
get_FullyQualifiedNamespace()-100%100%
get_EventHubName()-100%100%
get_ConsumerGroup()-100%100%
get_IsRunning()-100%100%
set_IsRunning(...)-0%100%
get_Identifier()-100%100%
get_Logger()-0%100%
get_StorageManager()-100%100%
.ctor(...)-50%100%
.ctor(...)-100%100%
.ctor(...)-0%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor()-0%100%
StartProcessingAsync()-86.67%100%
StartProcessing(...)-87.5%100%
StopProcessingAsync(...)-100%100%
StopProcessing(...)-100%100%
Equals(...)-0%100%
GetHashCode()-0%100%
ToString()-0%100%
UpdateCheckpointAsync(...)-100%100%
CreateConnection()-0%100%
ListCheckpointsAsync()-100%100%
ListOwnershipAsync(...)-100%100%
ClaimOwnershipAsync(...)-100%100%
OnProcessingErrorAsync()-100%100%
OnInitializingPartitionAsync()-100%100%
OnPartitionProcessingStoppedAsync()-100%100%
CreateStorageManager(...)-100%100%
ProcessCheckpointStartingPositions()-100%87.5%
EnsureNotRunningAndInvoke(...)-92.31%75%
CreateOptions(...)-100%100%
.ctor(...)-100%100%
ReadLastEnqueuedEventProperties()-0%100%
.ctor(...)-100%100%
ReadLastEnqueuedEventProperties()-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\src\EventProcessorClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.ComponentModel;
 8using System.Diagnostics.CodeAnalysis;
 9using System.Globalization;
 10using System.Runtime.ExceptionServices;
 11using System.Threading;
 12using System.Threading.Tasks;
 13using Azure.Core;
 14using Azure.Messaging.EventHubs.Consumer;
 15using Azure.Messaging.EventHubs.Core;
 16using Azure.Messaging.EventHubs.Diagnostics;
 17using Azure.Messaging.EventHubs.Primitives;
 18using Azure.Messaging.EventHubs.Processor;
 19using Azure.Messaging.EventHubs.Processor.Diagnostics;
 20using Azure.Storage.Blobs;
 21
 22namespace Azure.Messaging.EventHubs
 23{
 24    /// <summary>
 25    ///   Allows for consuming and processing events across all partitions of a given Event Hub within the scope of a sp
 26    ///   consumer group.  The processor is capable of collaborating with other instances for the same Event Hub and con
 27    ///   group pairing to share work by using a common storage platform to communicate.  Fault tolerance is also built-
 28    ///   allowing the processor to be resilient in the face of errors.
 29    /// </summary>
 30    ///
 31    [SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal
 32    public class EventProcessorClient : EventProcessor<EventProcessorPartition>
 33    {
 34        /// <summary>The delegate to invoke when attempting to update a checkpoint using an empty event.</summary>
 435        private static readonly Func<CancellationToken, Task> EmptyEventUpdateCheckpoint = cancellationToken => throw ne
 36
 37        /// <summary>The set of default options for the processor.</summary>
 238        private static readonly EventProcessorClientOptions DefaultClientOptions = new EventProcessorClientOptions();
 39
 40        /// <summary>The default starting position for the processor.</summary>
 041        private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition;
 42
 43        /// <summary>The set of default starting positions for partitions being processed; these are collected at initia
 044        private readonly ConcurrentDictionary<string, EventPosition> PartitionStartingPositionDefaults = new ConcurrentD
 45
 46        /// <summary>The primitive for synchronizing access during start and set handler operations.</summary>
 047        private readonly SemaphoreSlim ProcessorStatusGuard = new SemaphoreSlim(1, 1);
 48
 49        /// <summary>The handler to be called just before event processing starts for a given partition.</summary>
 50        private Func<PartitionInitializingEventArgs, Task> _partitionInitializingAsync;
 51
 52        /// <summary>The handler to be called once event processing stops for a given partition.</summary>
 53        private Func<PartitionClosingEventArgs, Task> _partitionClosingAsync;
 54
 55        /// <summary>Responsible for processing events received from the Event Hubs service.</summary>
 56        private Func<ProcessEventArgs, Task> _processEventAsync;
 57
 58        /// <summary>Responsible for processing unhandled exceptions thrown while this processor is running.</summary>
 59        private Func<ProcessErrorEventArgs, Task> _processErrorAsync;
 60
 61        /// <summary>
 62        ///    Performs the tasks to initialize a partition, and its associated context, for event processing.
 63        ///
 64        ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r
 65        ///   a deadlock scenario, especially if using the synchronous form of the call.</para>
 66        /// </summary>
 67        ///
 68        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 69        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 70        [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines
 71        public event Func<PartitionInitializingEventArgs, Task> PartitionInitializingAsync
 72        {
 73            add
 74            {
 2075                Argument.AssertNotNull(value, nameof(PartitionInitializingAsync));
 76
 1877                if (_partitionInitializingAsync != default)
 78                {
 279                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 80                }
 81
 3082                EnsureNotRunningAndInvoke(() => _partitionInitializingAsync = value);
 1483            }
 84
 85            remove
 86            {
 687                Argument.AssertNotNull(value, nameof(PartitionInitializingAsync));
 88
 689                if (_partitionInitializingAsync != value)
 90                {
 491                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 92                }
 93
 494                EnsureNotRunningAndInvoke(() => _partitionInitializingAsync = default);
 295            }
 96        }
 97
 98        /// <summary>
 99        ///   Performs the tasks needed when processing for a partition is being stopped.  This commonly occurs when the
 100        ///   the current event processor instance is shutting down.
 101        ///
 102        ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r
 103        ///   a deadlock scenario, especially if using the synchronous form of the call.</para>
 104        /// </summary>
 105        ///
 106        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 107        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 108        [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines
 109        public event Func<PartitionClosingEventArgs, Task> PartitionClosingAsync
 110        {
 111            add
 112            {
 16113                Argument.AssertNotNull(value, nameof(PartitionClosingAsync));
 114
 14115                if (_partitionClosingAsync != default)
 116                {
 2117                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 118                }
 119
 22120                EnsureNotRunningAndInvoke(() => _partitionClosingAsync = value);
 10121            }
 122
 123            remove
 124            {
 6125                Argument.AssertNotNull(value, nameof(PartitionClosingAsync));
 126
 6127                if (_partitionClosingAsync != value)
 128                {
 4129                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 130                }
 131
 4132                EnsureNotRunningAndInvoke(() => _partitionClosingAsync = default);
 2133            }
 134        }
 135
 136        /// <summary>
 137        ///  Performs the tasks needed to process a batch of events for a given partition as they are read from the Even
 138        ///
 139        ///  <para>Should an exception occur within the code for this handler, the <see cref="EventProcessorClient" /> w
 140        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account, including the
 141        ///   as appropriate.</para>
 142        ///
 143        ///  <para>It is not recommended that the state of the processor be managed directly from within this handler; r
 144        ///   a deadlock scenario, especially if using the synchronous form of the call.</para>
 145        /// </summary>
 146        ///
 147        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 148        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 149        [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines
 150        public event Func<ProcessEventArgs, Task> ProcessEventAsync
 151        {
 152            add
 153            {
 52154                Argument.AssertNotNull(value, nameof(ProcessEventAsync));
 155
 50156                if (_processEventAsync != default)
 157                {
 2158                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 159                }
 160
 96161                EnsureNotRunningAndInvoke(() => _processEventAsync = value);
 48162            }
 163
 164            remove
 165            {
 10166                Argument.AssertNotNull(value, nameof(ProcessEventAsync));
 167
 10168                if (_processEventAsync != value)
 169                {
 4170                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 171                }
 172
 10173                EnsureNotRunningAndInvoke(() => _processEventAsync = default);
 4174            }
 175        }
 176
 177        /// <summary>
 178        ///   Performs the tasks needed when an unexpected exception occurs within the operation of the event processor 
 179        ///
 180        ///   <para>This error handler is invoked when there is an exception observed within the <see cref="EventProcess
 181        ///   code that has been implemented to process events or other event handlers and extension points that execute
 182        ///   make every effort to recover from exceptions and continue processing.  Should an exception that cannot be 
 183        ///   ownership of all partitions that it was processing so that work may be redistributed.</para>
 184        ///
 185        ///   <para>The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be a
 186        ///   exception was fatal or whether its state was corrupted, this method has responsibility for making the dete
 187        ///   should be terminated or restarted.  The method may do so by calling Stop on the processor instance and the
 188        ///
 189        ///   <para>It is recommended that, for production scenarios, the decision be made by considering observations m
 190        ///   when initializing processing for a partition, and the method invoked when processing for a partition is st
 191        ///   data from their monitoring platforms in this decision as well.</para>
 192        ///
 193        ///   <para>As with event processing, should an exception occur in the code for the error handler, the event pro
 194        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai
 195        /// </summary>
 196        ///
 197        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 198        [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s
 199        [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines
 200        public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync
 201        {
 202            add
 203            {
 44204                Argument.AssertNotNull(value, nameof(ProcessErrorAsync));
 205
 42206                if (_processErrorAsync != default)
 207                {
 2208                    throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
 209                }
 210
 80211                EnsureNotRunningAndInvoke(() => _processErrorAsync = value);
 40212            }
 213
 214            remove
 215            {
 10216                Argument.AssertNotNull(value, nameof(ProcessErrorAsync));
 217
 10218                if (_processErrorAsync != value)
 219                {
 4220                    throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
 221                }
 222
 10223                EnsureNotRunningAndInvoke(() => _processErrorAsync = default);
 4224            }
 225        }
 226
 227        /// <summary>
 228        ///   The fully qualified Event Hubs namespace that the processor is associated with.  This is likely
 229        ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 230        /// </summary>
 231        ///
 232        public new string FullyQualifiedNamespace
 233        {
 32234            get => base.FullyQualifiedNamespace;
 235        }
 236
 237        /// <summary>
 238        ///   The name of the Event Hub that the processor is connected to, specific to the
 239        ///   Event Hubs namespace that contains it.
 240        /// </summary>
 241        ///
 242        public new string EventHubName
 243        {
 118244            get => base.EventHubName;
 245        }
 246
 247        /// <summary>
 248        ///   The name of the consumer group this event processor is associated with.  Events will be
 249        ///   read only in the context of this group.
 250        /// </summary>
 251        ///
 252        public new string ConsumerGroup
 253        {
 118254            get => base.ConsumerGroup;
 255        }
 256
 257        /// <summary>
 258        ///   Indicates whether or not this event processor is currently running.
 259        /// </summary>
 260        ///
 261        public new bool IsRunning
 262        {
 304263            get => base.IsRunning;
 0264            protected set => base.IsRunning = value;
 265        }
 266
 267        /// <summary>
 268        ///   A unique name used to identify this event processor.
 269        /// </summary>
 270        ///
 86271        public new string Identifier => base.Identifier;
 272
 273        /// <summary>
 274        ///   The instance of <see cref="EventProcessorClientEventSource" /> which can be mocked for testing.
 275        /// </summary>
 276        ///
 0277        internal EventProcessorClientEventSource Logger { get; set; } = EventProcessorClientEventSource.Log;
 278
 279        /// <summary>
 280        ///   Responsible for creation of checkpoints and for ownership claim.
 281        /// </summary>
 282        ///
 56283        private StorageManager StorageManager { get; }
 284
 285        /// <summary>
 286        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 287        /// </summary>
 288        ///
 289        /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab
 290        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 291        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 292        ///
 293        /// <remarks>
 294        ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see
 295        ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi
 296        ///
 297        ///   <para>If the connection string is copied from the Event Hubs namespace, it will likely not contain the nam
 298        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 299        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 300        ///
 301        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 302        ///   Event Hub will result in a connection string that contains the name.</para>
 303        /// </remarks>
 304        ///
 305        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 306        ///
 307        public EventProcessorClient(BlobContainerClient checkpointStore,
 308                                    string consumerGroup,
 4309                                    string connectionString) : this(checkpointStore, consumerGroup, connectionString, nu
 310        {
 0311        }
 312
 313        /// <summary>
 314        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 315        /// </summary>
 316        ///
 317        /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab
 318        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 319        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 320        /// <param name="clientOptions">The set of options to use for this processor.</param>
 321        ///
 322        /// <remarks>
 323        ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see
 324        ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi
 325        ///
 326        ///   <para>If the connection string is copied from the Event Hubs namespace, it will likely not contain the nam
 327        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 328        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 329        ///
 330        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 331        ///   Event Hub will result in a connection string that contains the name.</para>
 332        /// </remarks>
 333        ///
 334        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 335        ///
 336        public EventProcessorClient(BlobContainerClient checkpointStore,
 337                                    string consumerGroup,
 338                                    string connectionString,
 10339                                    EventProcessorClientOptions clientOptions) : this(checkpointStore, consumerGroup, co
 340        {
 4341        }
 342
 343        /// <summary>
 344        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 345        /// </summary>
 346        ///
 347        /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab
 348        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 349        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 350        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 351        ///
 352        /// <remarks>
 353        ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see
 354        ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi
 355        ///
 356        ///   <para>If the connection string is copied from the Event Hub itself, it will contain the name of the desire
 357        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 358        ///   passed only once, either as part of the connection string or separately.</para>
 359        /// </remarks>
 360        ///
 361        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 362        ///
 363        public EventProcessorClient(BlobContainerClient checkpointStore,
 364                                    string consumerGroup,
 365                                    string connectionString,
 0366                                    string eventHubName) : this(checkpointStore, consumerGroup, connectionString, eventH
 367        {
 0368        }
 369
 370        /// <summary>
 371        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 372        /// </summary>
 373        ///
 374        /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab
 375        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 376        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 377        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 378        /// <param name="clientOptions">The set of options to use for this processor.</param>
 379        ///
 380        /// <remarks>
 381        ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see
 382        ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi
 383        ///
 384        ///   <para>If the connection string is copied from the Event Hub itself, it will contain the name of the desire
 385        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 386        ///   passed only once, either as part of the connection string or separately.</para>
 387        /// </remarks>
 388        ///
 389        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 390        ///
 391        public EventProcessorClient(BlobContainerClient checkpointStore,
 392                                    string consumerGroup,
 393                                    string connectionString,
 394                                    string eventHubName,
 22395                                    EventProcessorClientOptions clientOptions) : base((clientOptions ?? DefaultClientOpt
 396        {
 10397            Argument.AssertNotNull(checkpointStore, nameof(checkpointStore));
 8398            StorageManager = CreateStorageManager(checkpointStore);
 8399        }
 400
 401        /// <summary>
 402        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 403        /// </summary>
 404        ///
 405        /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab
 406        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 407        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 408        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 409        /// <param name="credential">The Azure identity credential to use for authorization.  Access controls may be spe
 410        /// <param name="clientOptions">The set of options to use for this processor.</param>
 411        ///
 412        /// <remarks>
 413        ///   The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see cref=
 414        ///   does not assume the ability to manage the storage account and is safe to run without permission to manage 
 415        /// </remarks>
 416        ///
 417        public EventProcessorClient(BlobContainerClient checkpointStore,
 418                                    string consumerGroup,
 419                                    string fullyQualifiedNamespace,
 420                                    string eventHubName,
 421                                    TokenCredential credential,
 22422                                    EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? Defaul
 423        {
 6424            Argument.AssertNotNull(checkpointStore, nameof(checkpointStore));
 4425            StorageManager = CreateStorageManager(checkpointStore);
 4426        }
 427
 428        /// <summary>
 429        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 430        /// </summary>
 431        ///
 432        /// <param name="storageManager">Responsible for creation of checkpoints and for ownership claim.</param>
 433        /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re
 434        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 435        /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param>
 436        /// <param name="cacheEventCount">The maximum number of events that will be read from the Event Hubs service and
 437        /// <param name="credential">An Azure identity credential to satisfy base class requirements; this credential ma
 438        /// <param name="clientOptions">The set of options to use for this processor.</param>
 439        ///
 440        /// <remarks>
 441        ///   This constructor is intended only to support functional testing and mocking; it should not be used for pro
 442        /// </remarks>
 443        ///
 444        internal EventProcessorClient(StorageManager storageManager,
 445                                      string consumerGroup,
 446                                      string fullyQualifiedNamespace,
 447                                      string eventHubName,
 448                                      int cacheEventCount,
 449                                      TokenCredential credential,
 88450                                      EventProcessorOptions clientOptions) : base(cacheEventCount, consumerGroup, fullyQ
 451        {
 88452            Argument.AssertNotNull(storageManager, nameof(storageManager));
 453
 88454            DefaultStartingPosition = (clientOptions?.DefaultStartingPosition ?? DefaultStartingPosition);
 88455            StorageManager = storageManager;
 88456        }
 457
 458        /// <summary>
 459        ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class.
 460        /// </summary>
 461        ///
 0462        protected EventProcessorClient() : base()
 463        {
 0464        }
 465
 466        /// <summary>
 467        ///   Signals the <see cref="EventProcessorClient" /> to begin processing events.  Should this method be called 
 468        ///   is running, no action is taken.
 469        /// </summary>
 470        ///
 471        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 472        ///
 473        public override async Task StartProcessingAsync(CancellationToken cancellationToken = default)
 474        {
 26475            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 24476            var releaseGuard = false;
 477
 478            try
 479            {
 24480                await ProcessorStatusGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
 24481                releaseGuard = true;
 482
 24483                if (_processEventAsync == null)
 484                {
 2485                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE
 486                }
 487
 22488                if (_processErrorAsync == null)
 489                {
 2490                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE
 491                }
 492
 20493                await base.StartProcessingAsync(cancellationToken).ConfigureAwait(false);
 20494            }
 0495            catch (OperationCanceledException)
 496            {
 0497                throw new TaskCanceledException();
 498            }
 499            finally
 500            {
 24501                if (releaseGuard)
 502                {
 24503                    ProcessorStatusGuard.Release();
 504                }
 505            }
 20506        }
 507
 508        /// <summary>
 509        ///   Signals the <see cref="EventProcessorClient" /> to begin processing events.  Should this method be called 
 510        ///   is running, no action is taken.
 511        /// </summary>
 512        ///
 513        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 514        ///
 515        public override void StartProcessing(CancellationToken cancellationToken = default)
 516        {
 16517            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 14518            var releaseGuard = false;
 519
 520            try
 521            {
 14522                ProcessorStatusGuard.Wait(cancellationToken);
 14523                releaseGuard = true;
 524
 14525                if (_processEventAsync == null)
 526                {
 2527                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE
 528                }
 529
 12530                if (_processErrorAsync == null)
 531                {
 2532                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE
 533                }
 534
 10535                base.StartProcessing(cancellationToken);
 10536            }
 0537            catch (OperationCanceledException)
 538            {
 0539                throw new TaskCanceledException();
 540            }
 541            finally
 542            {
 14543                if (releaseGuard)
 544                {
 14545                    ProcessorStatusGuard.Release();
 546                }
 14547            }
 10548        }
 549
 550        /// <summary>
 551        ///   Signals the <see cref="EventProcessorClient" /> to stop processing events.  Should this method be called w
 552        ///   is not running, no action is taken.
 553        /// </summary>
 554        ///
 555        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 556        ///
 26557        public override Task StopProcessingAsync(CancellationToken cancellationToken = default) => base.StopProcessingAs
 558
 559        /// <summary>
 560        ///   Signals the <see cref="EventProcessorClient" /> to stop processing events.  Should this method be called w
 561        ///   is not running, no action is taken.
 562        /// </summary>
 563        ///
 564        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 565        ///
 8566        public override void StopProcessing(CancellationToken cancellationToken = default) => base.StopProcessing(cancel
 567
 568        /// <summary>
 569        ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
 570        /// </summary>
 571        ///
 572        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
 573        ///
 574        /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>
 575        ///
 576        [EditorBrowsable(EditorBrowsableState.Never)]
 0577        public override bool Equals(object obj) => base.Equals(obj);
 578
 579        /// <summary>
 580        ///   Returns a hash code for this instance.
 581        /// </summary>
 582        ///
 583        /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha
 584        ///
 585        [EditorBrowsable(EditorBrowsableState.Never)]
 0586        public override int GetHashCode() => base.GetHashCode();
 587
 588        /// <summary>
 589        ///   Converts the instance to string representation.
 590        /// </summary>
 591        ///
 592        /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 593        ///
 594        [EditorBrowsable(EditorBrowsableState.Never)]
 0595        public override string ToString() => base.ToString();
 596
 597        /// <summary>
 598        ///   Updates the checkpoint using the given information for the associated partition and consumer group in the 
 599        /// </summary>
 600        ///
 601        /// <param name="eventData">The event containing the information to be stored in the checkpoint.</param>
 602        /// <param name="context">The context of the partition the checkpoint is associated with.</param>
 603        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 604        ///
 605        internal Task UpdateCheckpointAsync(EventData eventData,
 606                                            PartitionContext context,
 607                                            CancellationToken cancellationToken)
 608        {
 14609            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 610
 12611            Argument.AssertNotNull(eventData, nameof(eventData));
 12612            Argument.AssertInRange(eventData.Offset, long.MinValue + 1, long.MaxValue, nameof(eventData.Offset));
 12613            Argument.AssertInRange(eventData.SequenceNumber, long.MinValue + 1, long.MaxValue, nameof(eventData.Sequence
 12614            Argument.AssertNotNull(context, nameof(context));
 615
 12616            Logger.UpdateCheckpointStart(context.PartitionId, Identifier, EventHubName, ConsumerGroup);
 617
 12618            using var scope = EventDataInstrumentation.ScopeFactory.CreateScope(DiagnosticProperty.EventProcessorCheckpo
 12619            scope.Start();
 620
 621            try
 622            {
 623                // Parameter validation is done by Checkpoint constructor.
 624
 12625                var checkpoint = new EventProcessorCheckpoint
 12626                {
 12627                    FullyQualifiedNamespace = FullyQualifiedNamespace,
 12628                    EventHubName = EventHubName,
 12629                    ConsumerGroup = ConsumerGroup,
 12630                    PartitionId = context.PartitionId,
 12631                    StartingPosition = EventPosition.FromOffset(eventData.Offset)
 12632                };
 633
 12634                return StorageManager.UpdateCheckpointAsync(checkpoint, eventData, cancellationToken);
 635            }
 2636            catch (Exception ex)
 637            {
 638                // In case of failure, there is no need to call the error handler because the exception can
 639                // be thrown directly to the caller here.
 640
 2641                scope.Failed(ex);
 2642                Logger.UpdateCheckpointError(context.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
 643
 2644                throw;
 645            }
 646            finally
 647            {
 12648                Logger.UpdateCheckpointComplete(context.PartitionId, Identifier, EventHubName, ConsumerGroup);
 12649            }
 10650        }
 651
 652        /// <summary>
 653        ///   Creates an <see cref="EventHubConnection" /> to use for communicating with the Event Hubs service.
 654        /// </summary>
 655        ///
 656        /// <returns>The requested <see cref="EventHubConnection" />.</returns>
 657        ///
 0658        protected override EventHubConnection CreateConnection() => base.CreateConnection();
 659
 660        /// <summary>
 661        ///   Produces a list of the available checkpoints for the Event Hub and consumer group associated with the
 662        ///   event processor instance, so that processing for a given set of partitions can be properly initialized.
 663        /// </summary>
 664        ///
 665        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 666        ///
 667        /// <returns>The set of checkpoints for the processor to take into account when initializing partitions.</return
 668        ///
 669        /// <remarks>
 670        ///   Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStarti
 671        ///   be used to initialize the partition for processing.
 672        ///
 673        ///   In the event that a custom starting point is desired for a single partition, or each partition should star
 674        ///   it is recommended that this method express that intent by returning checkpoints for those partitions with 
 675        ///   starting location set.
 676        /// </remarks>
 677        ///
 678        protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken canc
 679        {
 8680            var checkpoints = await StorageManager.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerG
 681
 682            // If there was no initialization handler, no custom starting positions
 683            // could have been specified.  Return the checkpoints without further processing.
 684
 8685            if (_partitionInitializingAsync == null)
 686            {
 4687                return checkpoints;
 688            }
 689
 690            // Process the checkpoints to inject mock checkpoints for partitions that
 691            // specify a custom default and do not have an actual checkpoint.
 692
 4693            return ProcessCheckpointStartingPositions(checkpoints);
 8694        }
 695
 696        /// <summary>
 697        ///   Produces a list of the ownership assignments for partitions between each of the cooperating event processo
 698        ///   instances for a given Event Hub and consumer group pairing.  This method is used when load balancing to al
 699        ///   the processor to discover other active collaborators and to make decisions about how to best balance work
 700        ///   between them.
 701        /// </summary>
 702        ///
 703        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 704        ///
 705        /// <returns>The set of ownership records to take into account when making load balancing decisions.</returns>
 706        ///
 707        protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken canc
 2708            StorageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken);
 709
 710        /// <summary>
 711        ///   Attempts to claim ownership of the specified partitions for processing.  This method is used by
 712        ///   load balancing to allow event processor instances to distribute the responsibility for processing
 713        ///   partitions for a given Event Hub and consumer group pairing amongst the active event processors.
 714        /// </summary>
 715        ///
 716        /// <param name="desiredOwnership">The set of partition ownership desired by the event processor instance; this 
 717        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 718        ///
 719        /// <returns>The set of ownership records for the partitions that were successfully claimed; this is expected to
 720        ///
 721        protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProc
 722                                                                                                   CancellationToken can
 28723            StorageManager.ClaimOwnershipAsync(desiredOwnership, cancellationToken);
 724
 725        /// <summary>
 726        ///   Performs the tasks needed to process a batch of events for a given partition as they are read from the Eve
 727        /// </summary>
 728        ///
 729        /// <param name="events">The batch of events to be processed.</param>
 730        /// <param name="partition">The context of the partition from which the events were read.</param>
 731        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 732        ///
 733        /// <remarks>
 734        ///   <para>The number of events in the <paramref name="events"/> batch may vary.  The batch will contain a numb
 735        ///   requested when the processor was created, depending on the availability of events in the partition within 
 736        ///   interval.
 737        ///
 738        ///   If there are enough events available in the Event Hub partition to fill a batch of the requested size, the
 739        ///   immediately.  If there were not a sufficient number of events available in the partition to populate a ful
 740        ///   to reach the requested batch size until the <see cref="EventProcessorOptions.MaximumWaitTime"/> has elapse
 741        ///   available by the end of that period.
 742        ///
 743        ///   If a <see cref="EventProcessorOptions.MaximumWaitTime"/> was not requested, indicated by setting the optio
 744        ///   partition until a full batch of the requested size could be populated and will not dispatch any partial ba
 745        ///
 746        ///   <para>Should an exception occur within the code for this method, the event processor will allow it to bubb
 747        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai
 748        ///
 749        ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r
 750        ///   a deadlock scenario, especially if using the synchronous form of the call.</para>
 751        /// </remarks>
 752        ///
 753        protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData> events,
 754                                                                  EventProcessorPartition partition,
 755                                                                  CancellationToken cancellationToken)
 756        {
 757            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 758
 759            var context = default(PartitionContext);
 760            var eventArgs = default(ProcessEventArgs);
 761            var caughtExceptions = default(List<Exception>);
 762            var emptyBatch = true;
 763
 764            try
 765            {
 766                Logger.EventBatchProcessingStart(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
 767
 768                // Attempt to process each event in the batch, marking if the batch was non-empty.  Exceptions during
 769                // processing should be logged and cached, as the batch must be processed completely to avoid losing eve
 770
 771                foreach (var eventData in events)
 772                {
 773                    emptyBatch = false;
 774
 775                    try
 776                    {
 0777                        context ??= new ProcessorPartitionContext(partition.PartitionId, () => ReadLastEnqueuedEventProp
 38778                        eventArgs = new ProcessEventArgs(context, eventData, updateToken => UpdateCheckpointAsync(eventD
 779
 780                        await _processEventAsync(eventArgs).ConfigureAwait(false);
 781                    }
 782                    catch (Exception ex) when (!(ex is TaskCanceledException))
 783                    {
 784                        // This exception is not surfaced to the error handler or bubbled, as the entire batch must be
 785                        // processed or events will be lost.  Preserve the exceptions, should any occur.
 786
 787                        Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup,
 788
 789                        caughtExceptions ??= new List<Exception>();
 790                        caughtExceptions.Add(ex);
 791                    }
 792                }
 793
 794                // If the event batch was empty, then dispatch to the handler; the base class will ensure that empty bat
 795                // are requested and will not invoke this method should empties not be sent to the handler.
 796
 797                if (emptyBatch)
 798                {
 799                    eventArgs = new ProcessEventArgs(new EmptyPartitionContext(partition.PartitionId), null, EmptyEventU
 800                    await _processEventAsync(eventArgs).ConfigureAwait(false);
 801                }
 802            }
 803            catch (Exception ex) when (!(ex is TaskCanceledException))
 804            {
 805                // This exception was either not related to processing events or was the result of sending an empty batc
 806                // processed.  Since there would be no other caught exceptions, tread this like a single case.
 807
 808                Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Mess
 809                throw;
 810            }
 811            finally
 812            {
 813                Logger.EventBatchProcessingComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
 814            }
 815
 816            // Deal with any exceptions that occurred while processing the batch.  If more than one was
 817            // present, preserve them in an aggregate exception for transport.
 818
 819            if (caughtExceptions != null)
 820            {
 821                if (caughtExceptions.Count == 1)
 822                {
 823                    ExceptionDispatchInfo.Capture(caughtExceptions[0]).Throw();
 824                }
 825
 826                throw new AggregateException(Resources.AggregateEventProcessingExceptionMessage, caughtExceptions);
 827            }
 828        }
 829
 830        /// <summary>
 831        ///   Performs the tasks needed when an unexpected exception occurs within the operation of the
 832        ///   event processor infrastructure.
 833        /// </summary>
 834        ///
 835        /// <param name="exception">The exception that occurred during operation of the event processor.</param>
 836        /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</
 837        /// <param name="operationDescription">A short textual description of the operation during which the exception o
 838        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 839        ///
 840        /// <remarks>
 841        ///   This error handler is invoked when there is an exception observed within the event processor itself; it is
 842        ///   code that has been implemented to process events or other overrides and extension points that are not crit
 843        ///   The event processor will make every effort to recover from exceptions and continue processing.  Should an 
 844        ///   from be encountered, the processor will attempt to forfeit ownership of all partitions that it was process
 845        ///
 846        ///   The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be able to
 847        ///   exception was fatal or whether its state was corrupted, this method has responsibility for making the dete
 848        ///   should be terminated or restarted.  The method may do so by calling Stop on the processor instance and the
 849        ///
 850        ///   It is recommended that, for production scenarios, the decision be made by considering observations made by
 851        ///   when initializing processing for a partition, and the method invoked when processing for a partition is st
 852        ///   data from their monitoring platforms in this decision as well.
 853        ///
 854        ///   As with event processing, should an exception occur in the code for the error handler, the event processor
 855        ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai
 856        /// </remarks>
 857        ///
 858        protected override async Task OnProcessingErrorAsync(Exception exception,
 859                                                             EventProcessorPartition partition,
 860                                                             string operationDescription,
 861                                                             CancellationToken cancellationToken)
 862        {
 4863            var eventArgs = new ProcessErrorEventArgs(partition?.PartitionId, operationDescription, exception, cancellat
 4864            await _processErrorAsync(eventArgs).ConfigureAwait(false);
 4865        }
 866
 867        /// <summary>
 868        ///   Performs the tasks to initialize a partition, and its associated context, for event processing.
 869        /// </summary>
 870        ///
 871        /// <param name="partition">The context of the partition being initialized.  Only the well-known members of the 
 872        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 873        ///
 874        /// <remarks>
 875        ///   It is not recommended that the state of the processor be managed directly from within this method; request
 876        ///   a deadlock scenario, especially if using the synchronous form of the call.
 877        /// </remarks>
 878        ///
 879        protected override async Task OnInitializingPartitionAsync(EventProcessorPartition partition,
 880                                                                   CancellationToken cancellationToken)
 881        {
 882            // Handlers cannot be changed while the processor is running; it is safe to check and call
 883            // without capturing a local reference.
 884
 8885            if (_partitionInitializingAsync != null)
 886            {
 6887                var eventArgs = new PartitionInitializingEventArgs(partition.PartitionId, DefaultStartingPosition, cance
 6888                await _partitionInitializingAsync(eventArgs).ConfigureAwait(false);
 889
 6890                PartitionStartingPositionDefaults[partition.PartitionId] = eventArgs.DefaultStartingPosition;
 6891            }
 8892        }
 893
 894        /// <summary>
 895        ///   Performs the tasks needed when processing for a partition is being stopped.  This commonly occurs when the
 896        ///   is claimed by another event processor instance or when the current event processor instance is shutting do
 897        /// </summary>
 898        ///
 899        /// <param name="partition">The context of the partition for which processing is being stopped.</param>
 900        /// <param name="reason">The reason that processing is being stopped for the partition.</param>
 901        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 902        ///
 903        /// <remarks>
 904        ///   It is not recommended that the state of the processor be managed directly from within this method; request
 905        ///   a deadlock scenario, especially if using the synchronous form of the call.
 906        /// </remarks>
 907        ///
 908        protected override async Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition,
 909                                                                        ProcessingStoppedReason reason,
 910                                                                        CancellationToken cancellationToken)
 911        {
 912            // Handlers cannot be changed while the processor is running; it is safe to check and call
 913            // without capturing a local reference.
 914
 2915            if (_partitionClosingAsync != null)
 916            {
 2917                var eventArgs = new PartitionClosingEventArgs(partition.PartitionId, reason, cancellationToken);
 2918                await _partitionClosingAsync(eventArgs).ConfigureAwait(false);
 919            }
 920
 2921            PartitionStartingPositionDefaults.TryRemove(partition.PartitionId, out var _);
 2922        }
 923
 924        /// <summary>
 925        ///   Creates a <see cref="StorageManager" /> to use for interacting with durable storage.
 926        /// </summary>
 927        ///
 928        /// <param name="checkpointStore">The client responsible for interaction with durable storage, responsible for p
 929        ///
 930        /// <returns>A <see cref="StorageManager" /> with the requested configuration.</returns>
 931        ///
 12932        private StorageManager CreateStorageManager(BlobContainerClient checkpointStore) => new BlobsCheckpointStore(che
 933
 934        /// <summary>
 935        ///   Processes the starting positions for checkpoints, ensuring that any overrides set by the <see cref="Partit
 936        ///   handler are respected when no natural checkpoint exists for the partition.
 937        /// </summary>
 938        ///
 939        /// <param name="sourceCheckpoints">The checkpoint set to process.</param>
 940        ///
 941        /// <returns>An enumerable consisting of the <paramref name="sourceCheckpoints"/> and a set of artificial checkp
 942        ///
 943        private IEnumerable<EventProcessorCheckpoint> ProcessCheckpointStartingPositions(IEnumerable<EventProcessorCheck
 944        {
 4945            var knownCheckpoints = new HashSet<string>();
 946
 947            // Return the checkpoints and track which partitions they belong to.
 948
 28949            foreach (var checkpoint in sourceCheckpoints)
 950            {
 10951                knownCheckpoints.Add(checkpoint.PartitionId);
 10952                yield return checkpoint;
 953            }
 954
 955            // For any partitions with custom default starting point, emit an artificial
 956            // checkpoint if a natural checkpoint did not exist.
 957
 16958            foreach (var partition in PartitionStartingPositionDefaults.Keys)
 959            {
 4960                if (!knownCheckpoints.Contains(partition))
 961                {
 2962                    yield return new EventProcessorCheckpoint
 2963                    {
 2964                       FullyQualifiedNamespace = FullyQualifiedNamespace,
 2965                       EventHubName = EventHubName,
 2966                       ConsumerGroup = ConsumerGroup,
 2967                       PartitionId = partition,
 2968                       StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition pos
 2969                    };
 970                }
 971            }
 4972        }
 973
 974        /// <summary>
 975        ///   Invokes a specified action only if this <see cref="EventProcessorClient" /> instance is not running.
 976        /// </summary>
 977        ///
 978        /// <param name="action">The action to invoke.</param>
 979        ///
 980        /// <exception cref="InvalidOperationException">Occurs when this method is invoked while the event processor is 
 981        ///
 982        private void EnsureNotRunningAndInvoke(Action action)
 983        {
 132984            var releaseGuard = false;
 985
 132986            if (!IsRunning)
 987            {
 988                try
 989                {
 124990                    ProcessorStatusGuard.Wait();
 124991                    releaseGuard = true;
 992
 124993                    if (!IsRunning)
 994                    {
 124995                        action?.Invoke();
 124996                        return;
 997                    }
 0998                }
 999                finally
 1000                {
 1241001                    if (releaseGuard)
 1002                    {
 1241003                        ProcessorStatusGuard.Release();
 1004                    }
 1241005                }
 1006            }
 1007
 81008            throw new InvalidOperationException(Resources.RunningEventProcessorCannotPerformOperation);
 1241009        }
 1010
 1011        /// <summary>
 1012        ///   Creates the set of options to pass to the base <see cref="EventProcessorClient" />.
 1013        /// </summary>
 1014        ///
 1015        /// <param name="clientOptions">The set of client options for the <see cref="EventProcessorClient" /> instance.<
 1016        ///
 1017        /// <returns>The set of options to use for the base processor.</returns>
 1018        ///
 1019        private static EventProcessorOptions CreateOptions(EventProcessorClientOptions clientOptions)
 1020        {
 481021            clientOptions ??= DefaultClientOptions;
 1022
 481023            return new EventProcessorOptions
 481024            {
 481025                ConnectionOptions = clientOptions.ConnectionOptions.Clone(),
 481026                RetryOptions = clientOptions.RetryOptions.Clone(),
 481027                Identifier = clientOptions.Identifier,
 481028                MaximumWaitTime = clientOptions.MaximumWaitTime,
 481029                TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties,
 481030                LoadBalancingStrategy = clientOptions.LoadBalancingStrategy,
 481031                PrefetchCount = clientOptions.PrefetchCount
 481032            };
 1033        }
 1034
 1035        /// <summary>
 1036        ///   Represents a basic partition context for event processing within the processor client.
 1037        /// </summary>
 1038        ///
 1039        /// <seealso cref="Azure.Messaging.EventHubs.Consumer.PartitionContext" />
 1040        ///
 1041        private class ProcessorPartitionContext : PartitionContext
 1042        {
 1043            /// <summary>A function that can be used to read the last enqueued event properties for the partition.</summ
 1044            private Func<LastEnqueuedEventProperties> _readLastEnqueuedEventProperties;
 1045
 1046            /// <summary>
 1047            ///   Initializes a new instance of the <see cref="EmptyPartitionContext" /> class.
 1048            /// </summary>
 1049            ///
 1050            /// <param name="partitionId">The identifier of the partition that the context represents.</param>
 1051            /// <param name="readLastEnqueuedEventProperties">A function that can be used to read the last enqueued even
 1052            ///
 1053            public ProcessorPartitionContext(string partitionId,
 101054                                             Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties) : base(p
 1055            {
 101056                _readLastEnqueuedEventProperties = readLastEnqueuedEventProperties;
 101057            }
 1058
 1059            /// <summary>
 1060            ///   A set of information about the last enqueued event of a partition, not available for the
 1061            ///   empty context.
 1062            /// </summary>
 1063            ///
 1064            /// <returns>The set of properties for the last event that was enqueued to the partition.</returns>
 1065            ///
 01066            public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() => _readLastEnqueuedEventPrope
 1067        }
 1068
 1069        /// <summary>
 1070        ///   Represents a basic partition context for event processing when the
 1071        ///   full context was not available.
 1072        /// </summary>
 1073        ///
 1074        /// <seealso cref="Azure.Messaging.EventHubs.Consumer.PartitionContext" />
 1075        ///
 1076        private class EmptyPartitionContext : PartitionContext
 1077        {
 1078            /// <summary>
 1079            ///   Initializes a new instance of the <see cref="EmptyPartitionContext" /> class.
 1080            /// </summary>
 1081            ///
 1082            /// <param name="partitionId">The identifier of the partition that the context represents.</param>
 1083            ///
 21084            public EmptyPartitionContext(string partitionId) : base(partitionId)
 1085            {
 21086            }
 1087
 1088            /// <summary>
 1089            ///   A set of information about the last enqueued event of a partition, not available for the
 1090            ///   empty context.
 1091            /// </summary>
 1092            ///
 1093            /// <returns>The set of properties for the last event that was enqueued to the partition.</returns>
 1094            ///
 1095            /// <exception cref="InvalidOperationException">The method call is not available on the <see cref="EmptyPart
 1096            ///
 1097            public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() =>
 01098                throw new InvalidOperationException(Resources.CannotReadLastEnqueuedEventPropertiesWithoutEvent);
 1099        }
 1100    }
 1101}