|  |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|  |  | 2 |  | // Licensed under the MIT License. | 
|  |  | 3 |  |  | 
|  |  | 4 |  | using System; | 
|  |  | 5 |  | using System.Collections.Concurrent; | 
|  |  | 6 |  | using System.Collections.Generic; | 
|  |  | 7 |  | using System.ComponentModel; | 
|  |  | 8 |  | using System.Diagnostics.CodeAnalysis; | 
|  |  | 9 |  | using System.Globalization; | 
|  |  | 10 |  | using System.Runtime.ExceptionServices; | 
|  |  | 11 |  | using System.Threading; | 
|  |  | 12 |  | using System.Threading.Tasks; | 
|  |  | 13 |  | using Azure.Core; | 
|  |  | 14 |  | using Azure.Messaging.EventHubs.Consumer; | 
|  |  | 15 |  | using Azure.Messaging.EventHubs.Core; | 
|  |  | 16 |  | using Azure.Messaging.EventHubs.Diagnostics; | 
|  |  | 17 |  | using Azure.Messaging.EventHubs.Primitives; | 
|  |  | 18 |  | using Azure.Messaging.EventHubs.Processor; | 
|  |  | 19 |  | using Azure.Messaging.EventHubs.Processor.Diagnostics; | 
|  |  | 20 |  | using Azure.Storage.Blobs; | 
|  |  | 21 |  |  | 
|  |  | 22 |  | namespace Azure.Messaging.EventHubs | 
|  |  | 23 |  | { | 
|  |  | 24 |  |     /// <summary> | 
|  |  | 25 |  |     ///   Allows for consuming and processing events across all partitions of a given Event Hub within the scope of a sp | 
|  |  | 26 |  |     ///   consumer group.  The processor is capable of collaborating with other instances for the same Event Hub and con | 
|  |  | 27 |  |     ///   group pairing to share work by using a common storage platform to communicate.  Fault tolerance is also built- | 
|  |  | 28 |  |     ///   allowing the processor to be resilient in the face of errors. | 
|  |  | 29 |  |     /// </summary> | 
|  |  | 30 |  |     /// | 
|  |  | 31 |  |     [SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal | 
|  |  | 32 |  |     public class EventProcessorClient : EventProcessor<EventProcessorPartition> | 
|  |  | 33 |  |     { | 
|  |  | 34 |  |         /// <summary>The delegate to invoke when attempting to update a checkpoint using an empty event.</summary> | 
|  | 4 | 35 |  |         private static readonly Func<CancellationToken, Task> EmptyEventUpdateCheckpoint = cancellationToken => throw ne | 
|  |  | 36 |  |  | 
|  |  | 37 |  |         /// <summary>The set of default options for the processor.</summary> | 
|  | 2 | 38 |  |         private static readonly EventProcessorClientOptions DefaultClientOptions = new EventProcessorClientOptions(); | 
|  |  | 39 |  |  | 
|  |  | 40 |  |         /// <summary>The default starting position for the processor.</summary> | 
|  | 0 | 41 |  |         private readonly EventPosition DefaultStartingPosition = new EventProcessorOptions().DefaultStartingPosition; | 
|  |  | 42 |  |  | 
|  |  | 43 |  |         /// <summary>The set of default starting positions for partitions being processed; these are collected at initia | 
|  | 0 | 44 |  |         private readonly ConcurrentDictionary<string, EventPosition> PartitionStartingPositionDefaults = new ConcurrentD | 
|  |  | 45 |  |  | 
|  |  | 46 |  |         /// <summary>The primitive for synchronizing access during start and set handler operations.</summary> | 
|  | 0 | 47 |  |         private readonly SemaphoreSlim ProcessorStatusGuard = new SemaphoreSlim(1, 1); | 
|  |  | 48 |  |  | 
|  |  | 49 |  |         /// <summary>The handler to be called just before event processing starts for a given partition.</summary> | 
|  |  | 50 |  |         private Func<PartitionInitializingEventArgs, Task> _partitionInitializingAsync; | 
|  |  | 51 |  |  | 
|  |  | 52 |  |         /// <summary>The handler to be called once event processing stops for a given partition.</summary> | 
|  |  | 53 |  |         private Func<PartitionClosingEventArgs, Task> _partitionClosingAsync; | 
|  |  | 54 |  |  | 
|  |  | 55 |  |         /// <summary>Responsible for processing events received from the Event Hubs service.</summary> | 
|  |  | 56 |  |         private Func<ProcessEventArgs, Task> _processEventAsync; | 
|  |  | 57 |  |  | 
|  |  | 58 |  |         /// <summary>Responsible for processing unhandled exceptions thrown while this processor is running.</summary> | 
|  |  | 59 |  |         private Func<ProcessErrorEventArgs, Task> _processErrorAsync; | 
|  |  | 60 |  |  | 
|  |  | 61 |  |         /// <summary> | 
|  |  | 62 |  |         ///    Performs the tasks to initialize a partition, and its associated context, for event processing. | 
|  |  | 63 |  |         /// | 
|  |  | 64 |  |         ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r | 
|  |  | 65 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call.</para> | 
|  |  | 66 |  |         /// </summary> | 
|  |  | 67 |  |         /// | 
|  |  | 68 |  |         [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju | 
|  |  | 69 |  |         [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s | 
|  |  | 70 |  |         [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines | 
public event Func<PartitionInitializingEventArgs, Task> PartitionInitializingAsync
{
    add
    {
        Argument.AssertNotNull(value, nameof(PartitionInitializingAsync));

        // A single handler is supported; registering while one is already set is rejected.
        var handlerAlreadyAssigned = (_partitionInitializingAsync != null);

        if (handlerAlreadyAssigned)
        {
            throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
        }

        // NOTE(review): the helper is not visible here; by its name it presumably refuses
        // the assignment while the processor is running - confirm against its definition.
        EnsureNotRunningAndInvoke(() => _partitionInitializingAsync = value);
    }

    remove
    {
        Argument.AssertNotNull(value, nameof(PartitionInitializingAsync));

        // Only the handler that is currently registered may be removed.
        var isRegisteredHandler = (_partitionInitializingAsync == value);

        if (!isRegisteredHandler)
        {
            throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
        }

        EnsureNotRunningAndInvoke(() => _partitionInitializingAsync = null);
    }
}
|  |  | 97 |  |  | 
|  |  | 98 |  |         /// <summary> | 
|  |  | 99 |  |         ///   Performs the tasks needed when processing for a partition is being stopped.  This commonly occurs when the | 
|  |  | 100 |  |         ///   the current event processor instance is shutting down. | 
|  |  | 101 |  |         /// | 
|  |  | 102 |  |         ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r | 
|  |  | 103 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call.</para> | 
|  |  | 104 |  |         /// </summary> | 
|  |  | 105 |  |         /// | 
|  |  | 106 |  |         [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju | 
|  |  | 107 |  |         [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s | 
|  |  | 108 |  |         [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines | 
public event Func<PartitionClosingEventArgs, Task> PartitionClosingAsync
{
    add
    {
        Argument.AssertNotNull(value, nameof(PartitionClosingAsync));

        // A single handler is supported; registering while one is already set is rejected.
        var handlerAlreadyAssigned = (_partitionClosingAsync != null);

        if (handlerAlreadyAssigned)
        {
            throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
        }

        // NOTE(review): the helper is not visible here; by its name it presumably refuses
        // the assignment while the processor is running - confirm against its definition.
        EnsureNotRunningAndInvoke(() => _partitionClosingAsync = value);
    }

    remove
    {
        Argument.AssertNotNull(value, nameof(PartitionClosingAsync));

        // Only the handler that is currently registered may be removed.
        var isRegisteredHandler = (_partitionClosingAsync == value);

        if (!isRegisteredHandler)
        {
            throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
        }

        EnsureNotRunningAndInvoke(() => _partitionClosingAsync = null);
    }
}
|  |  | 135 |  |  | 
|  |  | 136 |  |         /// <summary> | 
|  |  | 137 |  |         ///  Performs the tasks needed to process a batch of events for a given partition as they are read from the Even | 
|  |  | 138 |  |         /// | 
|  |  | 139 |  |         ///  <para>Should an exception occur within the code for this handler, the <see cref="EventProcessorClient" /> w | 
|  |  | 140 |  |         ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account, including the | 
|  |  | 141 |  |         ///   as appropriate.</para> | 
|  |  | 142 |  |         /// | 
|  |  | 143 |  |         ///  <para>It is not recommended that the state of the processor be managed directly from within this handler; r | 
|  |  | 144 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call.</para> | 
|  |  | 145 |  |         /// </summary> | 
|  |  | 146 |  |         /// | 
|  |  | 147 |  |         [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju | 
|  |  | 148 |  |         [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s | 
|  |  | 149 |  |         [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines | 
public event Func<ProcessEventArgs, Task> ProcessEventAsync
{
    add
    {
        Argument.AssertNotNull(value, nameof(ProcessEventAsync));

        // A single handler is supported; registering while one is already set is rejected.
        var handlerAlreadyAssigned = (_processEventAsync != null);

        if (handlerAlreadyAssigned)
        {
            throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
        }

        // NOTE(review): the helper is not visible here; by its name it presumably refuses
        // the assignment while the processor is running - confirm against its definition.
        EnsureNotRunningAndInvoke(() => _processEventAsync = value);
    }

    remove
    {
        Argument.AssertNotNull(value, nameof(ProcessEventAsync));

        // Only the handler that is currently registered may be removed.
        var isRegisteredHandler = (_processEventAsync == value);

        if (!isRegisteredHandler)
        {
            throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
        }

        EnsureNotRunningAndInvoke(() => _processEventAsync = null);
    }
}
|  |  | 176 |  |  | 
|  |  | 177 |  |         /// <summary> | 
|  |  | 178 |  |         ///   Performs the tasks needed when an unexpected exception occurs within the operation of the event processor  | 
|  |  | 179 |  |         /// | 
|  |  | 180 |  |         ///   <para>This error handler is invoked when there is an exception observed within the <see cref="EventProcess | 
|  |  | 181 |  |         ///   code that has been implemented to process events or other event handlers and extension points that execute | 
|  |  | 182 |  |         ///   make every effort to recover from exceptions and continue processing.  Should an exception that cannot be  | 
|  |  | 183 |  |         ///   ownership of all partitions that it was processing so that work may be redistributed.</para> | 
|  |  | 184 |  |         /// | 
|  |  | 185 |  |         ///   <para>The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be a | 
|  |  | 186 |  |         ///   exception was fatal or whether its state was corrupted, this method has responsibility for making the dete | 
|  |  | 187 |  |         ///   should be terminated or restarted.  The method may do so by calling Stop on the processor instance and the | 
|  |  | 188 |  |         /// | 
|  |  | 189 |  |         ///   <para>It is recommended that, for production scenarios, the decision be made by considering observations m | 
|  |  | 190 |  |         ///   when initializing processing for a partition, and the method invoked when processing for a partition is st | 
|  |  | 191 |  |         ///   data from their monitoring platforms in this decision as well.</para> | 
|  |  | 192 |  |         /// | 
|  |  | 193 |  |         ///   <para>As with event processing, should an exception occur in the code for the error handler, the event pro | 
|  |  | 194 |  |         ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai | 
|  |  | 195 |  |         /// </summary> | 
|  |  | 196 |  |         /// | 
|  |  | 197 |  |         [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju | 
|  |  | 198 |  |         [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s | 
|  |  | 199 |  |         [SuppressMessage("Design", "CA1065:Do not raise exceptions in unexpected locations", Justification = "Guidelines | 
public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync
{
    add
    {
        Argument.AssertNotNull(value, nameof(ProcessErrorAsync));

        // A single handler is supported; registering while one is already set is rejected.
        var handlerAlreadyAssigned = (_processErrorAsync != null);

        if (handlerAlreadyAssigned)
        {
            throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
        }

        // NOTE(review): the helper is not visible here; by its name it presumably refuses
        // the assignment while the processor is running - confirm against its definition.
        EnsureNotRunningAndInvoke(() => _processErrorAsync = value);
    }

    remove
    {
        Argument.AssertNotNull(value, nameof(ProcessErrorAsync));

        // Only the handler that is currently registered may be removed.
        var isRegisteredHandler = (_processErrorAsync == value);

        if (!isRegisteredHandler)
        {
            throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
        }

        EnsureNotRunningAndInvoke(() => _processErrorAsync = null);
    }
}
|  |  | 226 |  |  | 
|  |  | 227 |  |         /// <summary> | 
|  |  | 228 |  |         ///   The fully qualified Event Hubs namespace that the processor is associated with.  This is likely | 
|  |  | 229 |  |         ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>. | 
|  |  | 230 |  |         /// </summary> | 
|  |  | 231 |  |         /// | 
// Read-only; hides the inherited member and forwards directly to the base implementation.
public new string FullyQualifiedNamespace => base.FullyQualifiedNamespace;
|  |  | 236 |  |  | 
|  |  | 237 |  |         /// <summary> | 
|  |  | 238 |  |         ///   The name of the Event Hub that the processor is connected to, specific to the | 
|  |  | 239 |  |         ///   Event Hubs namespace that contains it. | 
|  |  | 240 |  |         /// </summary> | 
|  |  | 241 |  |         /// | 
// Read-only; hides the inherited member and forwards directly to the base implementation.
public new string EventHubName => base.EventHubName;
|  |  | 246 |  |  | 
|  |  | 247 |  |         /// <summary> | 
|  |  | 248 |  |         ///   The name of the consumer group this event processor is associated with.  Events will be | 
|  |  | 249 |  |         ///   read only in the context of this group. | 
|  |  | 250 |  |         /// </summary> | 
|  |  | 251 |  |         /// | 
// Read-only; hides the inherited member and forwards directly to the base implementation.
public new string ConsumerGroup => base.ConsumerGroup;
|  |  | 256 |  |  | 
|  |  | 257 |  |         /// <summary> | 
|  |  | 258 |  |         ///   Indicates whether or not this event processor is currently running. | 
|  |  | 259 |  |         /// </summary> | 
|  |  | 260 |  |         /// | 
public new bool IsRunning
{
    // Reads the running state tracked by the base processor.
    get
    {
        return base.IsRunning;
    }

    // Writable only by this type and its derived types; the value is stored on the base processor.
    protected set
    {
        base.IsRunning = value;
    }
}
|  |  | 266 |  |  | 
|  |  | 267 |  |         /// <summary> | 
|  |  | 268 |  |         ///   A unique name used to identify this event processor. | 
|  |  | 269 |  |         /// </summary> | 
|  |  | 270 |  |         /// | 
public new string Identifier
{
    // Read-only; forwards directly to the base implementation.
    get { return base.Identifier; }
}
|  |  | 272 |  |  | 
|  |  | 273 |  |         /// <summary> | 
|  |  | 274 |  |         ///   The instance of <see cref="EventProcessorClientEventSource" /> which can be mocked for testing. | 
|  |  | 275 |  |         /// </summary> | 
|  |  | 276 |  |         /// | 
|  | 0 | 277 |  |         internal EventProcessorClientEventSource Logger { get; set; } = EventProcessorClientEventSource.Log; | 
|  |  | 278 |  |  | 
|  |  | 279 |  |         /// <summary> | 
|  |  | 280 |  |         ///   Responsible for creation of checkpoints and for ownership claim. | 
|  |  | 281 |  |         /// </summary> | 
|  |  | 282 |  |         /// | 
|  | 56 | 283 |  |         private StorageManager StorageManager { get; } | 
|  |  | 284 |  |  | 
|  |  | 285 |  |         /// <summary> | 
|  |  | 286 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 287 |  |         /// </summary> | 
|  |  | 288 |  |         /// | 
|  |  | 289 |  |         /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab | 
|  |  | 290 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 291 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 292 |  |         /// | 
|  |  | 293 |  |         /// <remarks> | 
|  |  | 294 |  |         ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see | 
|  |  | 295 |  |         ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi | 
|  |  | 296 |  |         /// | 
|  |  | 297 |  |         ///   <para>If the connection string is copied from the Event Hubs namespace, it will likely not contain the nam | 
|  |  | 298 |  |         ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]] | 
|  |  | 299 |  |         ///   connection string.  For example, ";EntityPath=telemetry-hub". | 
|  |  | 300 |  |         /// | 
|  |  | 301 |  |         ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s | 
|  |  | 302 |  |         ///   Event Hub will result in a connection string that contains the name.</para> | 
|  |  | 303 |  |         /// </remarks> | 
|  |  | 304 |  |         /// | 
|  |  | 305 |  |         /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> | 
|  |  | 306 |  |         /// | 
|  |  | 307 |  |         public EventProcessorClient(BlobContainerClient checkpointStore, | 
|  |  | 308 |  |                                     string consumerGroup, | 
|  | 4 | 309 |  |                                     string connectionString) : this(checkpointStore, consumerGroup, connectionString, nu | 
|  |  | 310 |  |         { | 
|  | 0 | 311 |  |         } | 
|  |  | 312 |  |  | 
|  |  | 313 |  |         /// <summary> | 
|  |  | 314 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 315 |  |         /// </summary> | 
|  |  | 316 |  |         /// | 
|  |  | 317 |  |         /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab | 
|  |  | 318 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 319 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 320 |  |         /// <param name="clientOptions">The set of options to use for this processor.</param> | 
|  |  | 321 |  |         /// | 
|  |  | 322 |  |         /// <remarks> | 
|  |  | 323 |  |         ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see | 
|  |  | 324 |  |         ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi | 
|  |  | 325 |  |         /// | 
|  |  | 326 |  |         ///   <para>If the connection string is copied from the Event Hubs namespace, it will likely not contain the nam | 
|  |  | 327 |  |         ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]] | 
|  |  | 328 |  |         ///   connection string.  For example, ";EntityPath=telemetry-hub". | 
|  |  | 329 |  |         /// | 
|  |  | 330 |  |         ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s | 
|  |  | 331 |  |         ///   Event Hub will result in a connection string that contains the name.</para> | 
|  |  | 332 |  |         /// </remarks> | 
|  |  | 333 |  |         /// | 
|  |  | 334 |  |         /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> | 
|  |  | 335 |  |         /// | 
|  |  | 336 |  |         public EventProcessorClient(BlobContainerClient checkpointStore, | 
|  |  | 337 |  |                                     string consumerGroup, | 
|  |  | 338 |  |                                     string connectionString, | 
|  | 10 | 339 |  |                                     EventProcessorClientOptions clientOptions) : this(checkpointStore, consumerGroup, co | 
|  |  | 340 |  |         { | 
|  | 4 | 341 |  |         } | 
|  |  | 342 |  |  | 
|  |  | 343 |  |         /// <summary> | 
|  |  | 344 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 345 |  |         /// </summary> | 
|  |  | 346 |  |         /// | 
|  |  | 347 |  |         /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab | 
|  |  | 348 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 349 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 350 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> | 
|  |  | 351 |  |         /// | 
|  |  | 352 |  |         /// <remarks> | 
|  |  | 353 |  |         ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see | 
|  |  | 354 |  |         ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi | 
|  |  | 355 |  |         /// | 
|  |  | 356 |  |         ///   <para>If the connection string is copied from the Event Hub itself, it will contain the name of the desire | 
|  |  | 357 |  |         ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub  | 
|  |  | 358 |  |         ///   passed only once, either as part of the connection string or separately.</para> | 
|  |  | 359 |  |         /// </remarks> | 
|  |  | 360 |  |         /// | 
|  |  | 361 |  |         /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> | 
|  |  | 362 |  |         /// | 
|  |  | 363 |  |         public EventProcessorClient(BlobContainerClient checkpointStore, | 
|  |  | 364 |  |                                     string consumerGroup, | 
|  |  | 365 |  |                                     string connectionString, | 
|  | 0 | 366 |  |                                     string eventHubName) : this(checkpointStore, consumerGroup, connectionString, eventH | 
|  |  | 367 |  |         { | 
|  | 0 | 368 |  |         } | 
|  |  | 369 |  |  | 
|  |  | 370 |  |         /// <summary> | 
|  |  | 371 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 372 |  |         /// </summary> | 
|  |  | 373 |  |         /// | 
|  |  | 374 |  |         /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab | 
|  |  | 375 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 376 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 377 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> | 
|  |  | 378 |  |         /// <param name="clientOptions">The set of options to use for this processor.</param> | 
|  |  | 379 |  |         /// | 
|  |  | 380 |  |         /// <remarks> | 
|  |  | 381 |  |         ///   <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see | 
|  |  | 382 |  |         ///   does not assume the ability to manage the storage account and is safe to run with only read/write permissi | 
|  |  | 383 |  |         /// | 
|  |  | 384 |  |         ///   <para>If the connection string is copied from the Event Hub itself, it will contain the name of the desire | 
|  |  | 385 |  |         ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub  | 
|  |  | 386 |  |         ///   passed only once, either as part of the connection string or separately.</para> | 
|  |  | 387 |  |         /// </remarks> | 
|  |  | 388 |  |         /// | 
|  |  | 389 |  |         /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> | 
|  |  | 390 |  |         /// | 
|  |  | 391 |  |         public EventProcessorClient(BlobContainerClient checkpointStore, | 
|  |  | 392 |  |                                     string consumerGroup, | 
|  |  | 393 |  |                                     string connectionString, | 
|  |  | 394 |  |                                     string eventHubName, | 
|  | 22 | 395 |  |                                     EventProcessorClientOptions clientOptions) : base((clientOptions ?? DefaultClientOpt | 
|  |  | 396 |  |         { | 
|  | 10 | 397 |  |             Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); | 
|  | 8 | 398 |  |             StorageManager = CreateStorageManager(checkpointStore); | 
|  | 8 | 399 |  |         } | 
|  |  | 400 |  |  | 
|  |  | 401 |  |         /// <summary> | 
|  |  | 402 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 403 |  |         /// </summary> | 
|  |  | 404 |  |         /// | 
|  |  | 405 |  |         /// <param name="checkpointStore">The client responsible for persisting checkpoints and processor state to durab | 
|  |  | 406 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 407 |  |         /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel | 
|  |  | 408 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> | 
|  |  | 409 |  |         /// <param name="credential">The Azure identity credential to use for authorization.  Access controls may be spe | 
|  |  | 410 |  |         /// <param name="clientOptions">The set of options to use for this processor.</param> | 
|  |  | 411 |  |         /// | 
|  |  | 412 |  |         /// <remarks> | 
|  |  | 413 |  |         ///   The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see cref= | 
|  |  | 414 |  |         ///   does not assume the ability to manage the storage account and is safe to run without permission to manage  | 
|  |  | 415 |  |         /// </remarks> | 
|  |  | 416 |  |         /// | 
|  |  | 417 |  |         public EventProcessorClient(BlobContainerClient checkpointStore, | 
|  |  | 418 |  |                                     string consumerGroup, | 
|  |  | 419 |  |                                     string fullyQualifiedNamespace, | 
|  |  | 420 |  |                                     string eventHubName, | 
|  |  | 421 |  |                                     TokenCredential credential, | 
|  | 22 | 422 |  |                                     EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? Defaul | 
|  |  | 423 |  |         { | 
|  | 6 | 424 |  |             Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); | 
|  | 4 | 425 |  |             StorageManager = CreateStorageManager(checkpointStore); | 
|  | 4 | 426 |  |         } | 
|  |  | 427 |  |  | 
|  |  | 428 |  |         /// <summary> | 
|  |  | 429 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 430 |  |         /// </summary> | 
|  |  | 431 |  |         /// | 
|  |  | 432 |  |         /// <param name="storageManager">Responsible for creation of checkpoints and for ownership claim.</param> | 
|  |  | 433 |  |         /// <param name="consumerGroup">The name of the consumer group this processor is associated with.  Events are re | 
|  |  | 434 |  |         /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel | 
|  |  | 435 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> | 
|  |  | 436 |  |         /// <param name="cacheEventCount">The maximum number of events that will be read from the Event Hubs service and | 
|  |  | 437 |  |         /// <param name="credential">An Azure identity credential to satisfy base class requirements; this credential ma | 
|  |  | 438 |  |         /// <param name="clientOptions">The set of options to use for this processor.</param> | 
|  |  | 439 |  |         /// | 
|  |  | 440 |  |         /// <remarks> | 
|  |  | 441 |  |         ///   This constructor is intended only to support functional testing and mocking; it should not be used for pro | 
|  |  | 442 |  |         /// </remarks> | 
|  |  | 443 |  |         /// | 
|  |  | 444 |  |         internal EventProcessorClient(StorageManager storageManager, | 
|  |  | 445 |  |                                       string consumerGroup, | 
|  |  | 446 |  |                                       string fullyQualifiedNamespace, | 
|  |  | 447 |  |                                       string eventHubName, | 
|  |  | 448 |  |                                       int cacheEventCount, | 
|  |  | 449 |  |                                       TokenCredential credential, | 
|  | 88 | 450 |  |                                       EventProcessorOptions clientOptions) : base(cacheEventCount, consumerGroup, fullyQ | 
|  |  | 451 |  |         { | 
|  | 88 | 452 |  |             Argument.AssertNotNull(storageManager, nameof(storageManager)); /* Unlike clientOptions, the storage manager is mandatory. */ | 
|  |  | 453 |  |  | 
|  | 88 | 454 |  |             DefaultStartingPosition = (clientOptions?.DefaultStartingPosition ?? DefaultStartingPosition); /* clientOptions may be null; in that case the current DefaultStartingPosition is kept. */ | 
|  | 88 | 455 |  |             StorageManager = storageManager; /* The supplied manager is used as-is; no storage manager is created by this constructor. */ | 
|  | 88 | 456 |  |         } | 
|  |  | 457 |  |  | 
|  |  | 458 |  |         /// <summary> | 
|  |  | 459 |  |         ///   Initializes a new instance of the <see cref="EventProcessorClient"/> class. | 
|  |  | 460 |  |         /// </summary> | 
|  |  | 461 |  |         /// | 
|  | 0 | 462 |  |         protected EventProcessorClient() : base() | 
|  |  | 463 |  |         { /* Intentionally empty: only the parameterless base constructor runs, and StorageManager is not assigned here. */ | 
|  | 0 | 464 |  |         } | 
|  |  | 465 |  |  | 
|  |  | 466 |  |         /// <summary> | 
|  |  | 467 |  |         ///   Signals the <see cref="EventProcessorClient" /> to begin processing events.  Should this method be called  | 
|  |  | 468 |  |         ///   is running, no action is taken. | 
|  |  | 469 |  |         /// </summary> | 
|  |  | 470 |  |         /// | 
|  |  | 471 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 472 |  |         /// | 
|  |  | 473 |  |         public override async Task StartProcessingAsync(CancellationToken cancellationToken = default) | 
|  |  | 474 |  |         { | 
|  | 26 | 475 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); /* Checked before the guard is requested, so an already-cancelled token never waits on it. */ | 
|  | 24 | 476 |  |             var releaseGuard = false; /* Becomes true only after the guard is acquired; the finally block releases the guard only in that case. */ | 
|  |  | 477 |  |  | 
|  |  | 478 |  |             try | 
|  |  | 479 |  |             { | 
|  | 24 | 480 |  |                 await ProcessorStatusGuard.WaitAsync(cancellationToken).ConfigureAwait(false); | 
|  | 24 | 481 |  |                 releaseGuard = true; | 
|  |  | 482 |  |  | 
|  | 24 | 483 |  |                 if (_processEventAsync == null) /* Starting without an event handler is rejected with InvalidOperationException. */ | 
|  |  | 484 |  |                 { | 
|  | 2 | 485 |  |                     throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE | 
|  |  | 486 |  |                 } | 
|  |  | 487 |  |  | 
|  | 22 | 488 |  |                 if (_processErrorAsync == null) /* Starting without an error handler is rejected the same way. */ | 
|  |  | 489 |  |                 { | 
|  | 2 | 490 |  |                     throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE | 
|  |  | 491 |  |                 } | 
|  |  | 492 |  |  | 
|  | 20 | 493 |  |                 await base.StartProcessingAsync(cancellationToken).ConfigureAwait(false); /* The base class performs the actual start; this override adds only the handler checks and the guard. */ | 
|  | 20 | 494 |  |             } | 
|  | 0 | 495 |  |             catch (OperationCanceledException) /* Any cancellation raised inside the try block is replaced by a new TaskCanceledException; the original exception is not attached. */ | 
|  |  | 496 |  |             { | 
|  | 0 | 497 |  |                 throw new TaskCanceledException(); | 
|  |  | 498 |  |             } | 
|  |  | 499 |  |             finally | 
|  |  | 500 |  |             { | 
|  | 24 | 501 |  |                 if (releaseGuard) | 
|  |  | 502 |  |                 { | 
|  | 24 | 503 |  |                     ProcessorStatusGuard.Release(); | 
|  |  | 504 |  |                 } | 
|  |  | 505 |  |             } | 
|  | 20 | 506 |  |         } | 
|  |  | 507 |  |  | 
|  |  | 508 |  |         /// <summary> | 
|  |  | 509 |  |         ///   Signals the <see cref="EventProcessorClient" /> to begin processing events.  Should this method be called  | 
|  |  | 510 |  |         ///   is running, no action is taken. | 
|  |  | 511 |  |         /// </summary> | 
|  |  | 512 |  |         /// | 
|  |  | 513 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 514 |  |         /// | 
|  |  | 515 |  |         public override void StartProcessing(CancellationToken cancellationToken = default) | 
|  |  | 516 |  |         { | 
|  | 16 | 517 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); /* Checked before the guard is requested, so an already-cancelled token never blocks on it. */ | 
|  | 14 | 518 |  |             var releaseGuard = false; /* Becomes true only after the guard is acquired; the finally block releases the guard only in that case. */ | 
|  |  | 519 |  |  | 
|  |  | 520 |  |             try | 
|  |  | 521 |  |             { | 
|  | 14 | 522 |  |                 ProcessorStatusGuard.Wait(cancellationToken); /* Blocking counterpart of the WaitAsync call used by StartProcessingAsync. */ | 
|  | 14 | 523 |  |                 releaseGuard = true; | 
|  |  | 524 |  |  | 
|  | 14 | 525 |  |                 if (_processEventAsync == null) /* Starting without an event handler is rejected with InvalidOperationException. */ | 
|  |  | 526 |  |                 { | 
|  | 2 | 527 |  |                     throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE | 
|  |  | 528 |  |                 } | 
|  |  | 529 |  |  | 
|  | 12 | 530 |  |                 if (_processErrorAsync == null) /* Starting without an error handler is rejected the same way. */ | 
|  |  | 531 |  |                 { | 
|  | 2 | 532 |  |                     throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartE | 
|  |  | 533 |  |                 } | 
|  |  | 534 |  |  | 
|  | 10 | 535 |  |                 base.StartProcessing(cancellationToken); /* The base class performs the actual start; this override adds only the handler checks and the guard. */ | 
|  | 10 | 536 |  |             } | 
|  | 0 | 537 |  |             catch (OperationCanceledException) /* Any cancellation raised inside the try block is replaced by a new TaskCanceledException; the original exception is not attached. */ | 
|  |  | 538 |  |             { | 
|  | 0 | 539 |  |                 throw new TaskCanceledException(); | 
|  |  | 540 |  |             } | 
|  |  | 541 |  |             finally | 
|  |  | 542 |  |             { | 
|  | 14 | 543 |  |                 if (releaseGuard) | 
|  |  | 544 |  |                 { | 
|  | 14 | 545 |  |                     ProcessorStatusGuard.Release(); | 
|  |  | 546 |  |                 } | 
|  | 14 | 547 |  |             } | 
|  | 10 | 548 |  |         } | 
|  |  | 549 |  |  | 
|  |  | 550 |  |         /// <summary> | 
|  |  | 551 |  |         ///   Signals the <see cref="EventProcessorClient" /> to stop processing events.  Should this method be called w | 
|  |  | 552 |  |         ///   is not running, no action is taken.  This override is a single expression-bodied call into the base class. | 
|  |  | 553 |  |         /// </summary> | 
|  |  | 554 |  |         /// | 
|  |  | 555 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 556 |  |         /// | 
|  | 26 | 557 |  |         public override Task StopProcessingAsync(CancellationToken cancellationToken = default) => base.StopProcessingAs | 
|  |  | 558 |  |  | 
|  |  | 559 |  |         /// <summary> | 
|  |  | 560 |  |         ///   Signals the <see cref="EventProcessorClient" /> to stop processing events.  Should this method be called w | 
|  |  | 561 |  |         ///   is not running, no action is taken.  This override is a single expression-bodied call into the base class. | 
|  |  | 562 |  |         /// </summary> | 
|  |  | 563 |  |         /// | 
|  |  | 564 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 565 |  |         /// | 
|  | 8 | 566 |  |         public override void StopProcessing(CancellationToken cancellationToken = default) => base.StopProcessing(cancel | 
|  |  | 567 |  |  | 
|  |  | 568 |  |         /// <summary> | 
|  |  | 569 |  |         ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance. | 
|  |  | 570 |  |         /// </summary> | 
|  |  | 571 |  |         /// | 
|  |  | 572 |  |         /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param> | 
|  |  | 573 |  |         /// | 
|  |  | 574 |  |         /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c> | 
|  |  | 575 |  |         /// | 
|  |  | 576 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 0 | 577 |  |         public override bool Equals(object obj) => base.Equals(obj); /* No custom equality; defers to the base implementation.  NOTE(review): presumably overridden only so the attribute can be applied - confirm. */ | 
|  |  | 578 |  |  | 
|  |  | 579 |  |         /// <summary> | 
|  |  | 580 |  |         ///   Returns a hash code for this instance. | 
|  |  | 581 |  |         /// </summary> | 
|  |  | 582 |  |         /// | 
|  |  | 583 |  |         /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha | 
|  |  | 584 |  |         /// | 
|  |  | 585 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 0 | 586 |  |         public override int GetHashCode() => base.GetHashCode(); /* No custom hashing; defers to the base implementation, matching the Equals override. */ | 
|  |  | 587 |  |  | 
|  |  | 588 |  |         /// <summary> | 
|  |  | 589 |  |         ///   Converts the instance to its string representation. | 
|  |  | 590 |  |         /// </summary> | 
|  |  | 591 |  |         /// | 
|  |  | 592 |  |         /// <returns>A <see cref="System.String" /> that represents this instance.</returns> | 
|  |  | 593 |  |         /// | 
|  |  | 594 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 0 | 595 |  |         public override string ToString() => base.ToString(); /* No custom formatting; defers to the base implementation. */ | 
|  |  | 596 |  |  | 
|  |  | 597 |  |         /// <summary> | 
|  |  | 598 |  |         ///   Updates the checkpoint using the given information for the associated partition and consumer group in the  | 
|  |  | 599 |  |         /// </summary> | 
|  |  | 600 |  |         /// | 
|  |  | 601 |  |         /// <param name="eventData">The event containing the information to be stored in the checkpoint.</param> | 
|  |  | 602 |  |         /// <param name="context">The context of the partition the checkpoint is associated with.</param> | 
|  |  | 603 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 604 |  |         /// | 
|  |  | 605 |  |         internal Task UpdateCheckpointAsync(EventData eventData, | 
|  |  | 606 |  |                                             PartitionContext context, | 
|  |  | 607 |  |                                             CancellationToken cancellationToken) | 
|  |  | 608 |  |         { | 
|  | 14 | 609 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); /* This method is not async, so this and the argument checks below throw synchronously to the caller. */ | 
|  |  | 610 |  |  | 
|  | 12 | 611 |  |             Argument.AssertNotNull(eventData, nameof(eventData)); | 
|  | 12 | 612 |  |             Argument.AssertInRange(eventData.Offset, long.MinValue + 1, long.MaxValue, nameof(eventData.Offset)); /* long.MinValue itself is outside the accepted range.  NOTE(review): presumably a sentinel for "not populated" - confirm. */ | 
|  | 12 | 613 |  |             Argument.AssertInRange(eventData.SequenceNumber, long.MinValue + 1, long.MaxValue, nameof(eventData.Sequence | 
|  | 12 | 614 |  |             Argument.AssertNotNull(context, nameof(context)); | 
|  |  | 615 |  |  | 
|  | 12 | 616 |  |             Logger.UpdateCheckpointStart(context.PartitionId, Identifier, EventHubName, ConsumerGroup); | 
|  |  | 617 |  |  | 
|  | 12 | 618 |  |             using var scope = EventDataInstrumentation.ScopeFactory.CreateScope(DiagnosticProperty.EventProcessorCheckpo | 
|  | 12 | 619 |  |             scope.Start(); | 
|  |  | 620 |  |  | 
|  |  | 621 |  |             try | 
|  |  | 622 |  |             { | 
|  |  | 623 |  |                 // Parameter validation is done by Checkpoint constructor.  NOTE(review): only an object initializer is visible below and the arguments are asserted above - confirm this comment is still accurate. | 
|  |  | 624 |  |  | 
|  | 12 | 625 |  |                 var checkpoint = new EventProcessorCheckpoint | 
|  | 12 | 626 |  |                 { | 
|  | 12 | 627 |  |                     FullyQualifiedNamespace = FullyQualifiedNamespace, | 
|  | 12 | 628 |  |                     EventHubName = EventHubName, | 
|  | 12 | 629 |  |                     ConsumerGroup = ConsumerGroup, | 
|  | 12 | 630 |  |                     PartitionId = context.PartitionId, | 
|  | 12 | 631 |  |                     StartingPosition = EventPosition.FromOffset(eventData.Offset) | 
|  | 12 | 632 |  |                 }; | 
|  |  | 633 |  |  | 
|  | 12 | 634 |  |                 return StorageManager.UpdateCheckpointAsync(checkpoint, eventData, cancellationToken); /* NOTE(review): the task is returned without being awaited, so the catch/finally below and the disposal of scope only cover what happens before this call returns, not the task's completion - confirm intended. */ | 
|  |  | 635 |  |             } | 
|  | 2 | 636 |  |             catch (Exception ex) | 
|  |  | 637 |  |             { | 
|  |  | 638 |  |                 // In case of failure, there is no need to call the error handler because the exception can | 
|  |  | 639 |  |                 // be thrown directly to the caller here. | 
|  |  | 640 |  |  | 
|  | 2 | 641 |  |                 scope.Failed(ex); | 
|  | 2 | 642 |  |                 Logger.UpdateCheckpointError(context.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message); | 
|  |  | 643 |  |  | 
|  | 2 | 644 |  |                 throw; | 
|  |  | 645 |  |             } | 
|  |  | 646 |  |             finally | 
|  |  | 647 |  |             { | 
|  | 12 | 648 |  |                 Logger.UpdateCheckpointComplete(context.PartitionId, Identifier, EventHubName, ConsumerGroup); /* Logged on both the success and the failure path. */ | 
|  | 12 | 649 |  |             } | 
|  | 10 | 650 |  |         } | 
|  |  | 651 |  |  | 
|  |  | 652 |  |         /// <summary> | 
|  |  | 653 |  |         ///   Creates an <see cref="EventHubConnection" /> to use for communicating with the Event Hubs service. | 
|  |  | 654 |  |         /// </summary> | 
|  |  | 655 |  |         /// | 
|  |  | 656 |  |         /// <returns>The requested <see cref="EventHubConnection" />.</returns> | 
|  |  | 657 |  |         /// | 
|  | 0 | 658 |  |         protected override EventHubConnection CreateConnection() => base.CreateConnection(); /* Pass-through; the connection is whatever the base class creates. */ | 
|  |  | 659 |  |  | 
|  |  | 660 |  |         /// <summary> | 
|  |  | 661 |  |         ///   Produces a list of the available checkpoints for the Event Hub and consumer group associated with the | 
|  |  | 662 |  |         ///   event processor instance, so that processing for a given set of partitions can be properly initialized. | 
|  |  | 663 |  |         /// </summary> | 
|  |  | 664 |  |         /// | 
|  |  | 665 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 666 |  |         /// | 
|  |  | 667 |  |         /// <returns>The set of checkpoints for the processor to take into account when initializing partitions.</return | 
|  |  | 668 |  |         /// | 
|  |  | 669 |  |         /// <remarks> | 
|  |  | 670 |  |         ///   Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStarti | 
|  |  | 671 |  |         ///   be used to initialize the partition for processing. | 
|  |  | 672 |  |         /// | 
|  |  | 673 |  |         ///   In the event that a custom starting point is desired for a single partition, or each partition should star | 
|  |  | 674 |  |         ///   it is recommended that this method express that intent by returning checkpoints for those partitions with  | 
|  |  | 675 |  |         ///   starting location set. | 
|  |  | 676 |  |         /// </remarks> | 
|  |  | 677 |  |         /// | 
|  |  | 678 |  |         protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken canc | 
|  |  | 679 |  |         { | 
|  | 8 | 680 |  |             var checkpoints = await StorageManager.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerG | 
|  |  | 681 |  |  | 
|  |  | 682 |  |             // If there was no initialization handler, no custom starting positions | 
|  |  | 683 |  |             // could have been specified.  Return the checkpoints without further processing. | 
|  |  | 684 |  |  | 
|  | 8 | 685 |  |             if (_partitionInitializingAsync == null) | 
|  |  | 686 |  |             { | 
|  | 4 | 687 |  |                 return checkpoints; /* The stored checkpoints are returned unchanged. */ | 
|  |  | 688 |  |             } | 
|  |  | 689 |  |  | 
|  |  | 690 |  |             // Process the checkpoints to inject mock checkpoints for partitions that | 
|  |  | 691 |  |             // specify a custom default and do not have an actual checkpoint. | 
|  |  | 692 |  |  | 
|  | 4 | 693 |  |             return ProcessCheckpointStartingPositions(checkpoints); /* This helper is defined outside this method; its behavior is taken from the comment above, not from code visible here. */ | 
|  | 8 | 694 |  |         } | 
|  |  | 695 |  |  | 
|  |  | 696 |  |         /// <summary> | 
|  |  | 697 |  |         ///   Produces a list of the ownership assignments for partitions between each of the cooperating event processo | 
|  |  | 698 |  |         ///   instances for a given Event Hub and consumer group pairing.  This method is used when load balancing to al | 
|  |  | 699 |  |         ///   the processor to discover other active collaborators and to make decisions about how to best balance work | 
|  |  | 700 |  |         ///   between them. | 
|  |  | 701 |  |         /// </summary> | 
|  |  | 702 |  |         /// | 
|  |  | 703 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 704 |  |         /// | 
|  |  | 705 |  |         /// <returns>The set of ownership records to take into account when making load balancing decisions.</returns> | 
|  |  | 706 |  |         /// | 
|  |  | 707 |  |         protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken canc | 
|  | 2 | 708 |  |             StorageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken); /* Straight delegation to the storage manager, scoped by namespace, Event Hub, and consumer group; the task is returned without being awaited. */ | 
|  |  | 709 |  |  | 
|  |  | 710 |  |         /// <summary> | 
|  |  | 711 |  |         ///   Attempts to claim ownership of the specified partitions for processing.  This method is used by | 
|  |  | 712 |  |         ///   load balancing to allow event processor instances to distribute the responsibility for processing | 
|  |  | 713 |  |         ///   partitions for a given Event Hub and consumer group pairing amongst the active event processors. | 
|  |  | 714 |  |         /// </summary> | 
|  |  | 715 |  |         /// | 
|  |  | 716 |  |         /// <param name="desiredOwnership">The set of partition ownership desired by the event processor instance; this  | 
|  |  | 717 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 718 |  |         /// | 
|  |  | 719 |  |         /// <returns>The set of ownership records for the partitions that were successfully claimed; this is expected to | 
|  |  | 720 |  |         /// | 
|  |  | 721 |  |         protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProc | 
|  |  | 722 |  |                                                                                                    CancellationToken can | 
|  | 28 | 723 |  |             StorageManager.ClaimOwnershipAsync(desiredOwnership, cancellationToken); /* Straight delegation to the storage manager; desiredOwnership is passed through without being filtered or altered here. */ | 
|  |  | 724 |  |  | 
|  |  | 725 |  |         /// <summary> | 
|  |  | 726 |  |         ///   Performs the tasks needed to process a batch of events for a given partition as they are read from the Eve | 
|  |  | 727 |  |         /// </summary> | 
|  |  | 728 |  |         /// | 
|  |  | 729 |  |         /// <param name="events">The batch of events to be processed.</param> | 
|  |  | 730 |  |         /// <param name="partition">The context of the partition from which the events were read.</param> | 
|  |  | 731 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 732 |  |         /// | 
|  |  | 733 |  |         /// <remarks> | 
|  |  | 734 |  |         ///   <para>The number of events in the <paramref name="events"/> batch may vary.  The batch will contain a numb | 
|  |  | 735 |  |         ///   requested when the processor was created, depending on the availability of events in the partition within  | 
|  |  | 736 |  |         ///   interval. | 
|  |  | 737 |  |         /// | 
|  |  | 738 |  |         ///   If there are enough events available in the Event Hub partition to fill a batch of the requested size, the | 
|  |  | 739 |  |         ///   immediately.  If there were not a sufficient number of events available in the partition to populate a ful | 
|  |  | 740 |  |         ///   to reach the requested batch size until the <see cref="EventProcessorOptions.MaximumWaitTime"/> has elapse | 
|  |  | 741 |  |         ///   available by the end of that period. | 
|  |  | 742 |  |         /// | 
|  |  | 743 |  |         ///   If a <see cref="EventProcessorOptions.MaximumWaitTime"/> was not requested, indicated by setting the optio | 
|  |  | 744 |  |         ///   partition until a full batch of the requested size could be populated and will not dispatch any partial ba | 
|  |  | 745 |  |         /// | 
|  |  | 746 |  |         ///   <para>Should an exception occur within the code for this method, the event processor will allow it to bubb | 
|  |  | 747 |  |         ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai | 
|  |  | 748 |  |         /// | 
|  |  | 749 |  |         ///   <para>It is not recommended that the state of the processor be managed directly from within this method; r | 
|  |  | 750 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call.</para> | 
|  |  | 751 |  |         /// </remarks> | 
|  |  | 752 |  |         /// | 
|  |  | 753 |  |         protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, | 
|  |  | 754 |  |                                                                   EventProcessorPartition partition, | 
|  |  | 755 |  |                                                                   CancellationToken cancellationToken) | 
|  |  | 756 |  |         { | 
|  |  | 757 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); | 
|  |  | 758 |  |  | 
|  |  | 759 |  |             var context = default(PartitionContext); /* Created on the first event of the batch (see the ??= below) and reused for the remaining events. */ | 
|  |  | 760 |  |             var eventArgs = default(ProcessEventArgs); | 
|  |  | 761 |  |             var caughtExceptions = default(List<Exception>); /* Stays null unless a handler call fails, so no list is allocated when every event succeeds. */ | 
|  |  | 762 |  |             var emptyBatch = true; | 
|  |  | 763 |  |  | 
|  |  | 764 |  |             try | 
|  |  | 765 |  |             { | 
|  |  | 766 |  |                 Logger.EventBatchProcessingStart(partition.PartitionId, Identifier, EventHubName, ConsumerGroup); | 
|  |  | 767 |  |  | 
|  |  | 768 |  |                 // Attempt to process each event in the batch, marking if the batch was non-empty.  Exceptions during | 
|  |  | 769 |  |                 // processing should be logged and cached, as the batch must be processed completely to avoid losing eve | 
|  |  | 770 |  |  | 
|  |  | 771 |  |                 foreach (var eventData in events) | 
|  |  | 772 |  |                 { | 
|  |  | 773 |  |                     emptyBatch = false; | 
|  |  | 774 |  |  | 
|  |  | 775 |  |                     try | 
|  |  | 776 |  |                     { | 
|  | 0 | 777 |  |                         context ??= new ProcessorPartitionContext(partition.PartitionId, () => ReadLastEnqueuedEventProp | 
|  | 38 | 778 |  |                         eventArgs = new ProcessEventArgs(context, eventData, updateToken => UpdateCheckpointAsync(eventD | 
|  |  | 779 |  |  | 
|  |  | 780 |  |                         await _processEventAsync(eventArgs).ConfigureAwait(false); | 
|  |  | 781 |  |                     } | 
|  |  | 782 |  |                     catch (Exception ex) when (!(ex is TaskCanceledException)) /* TaskCanceledException is excluded by the filter here and on the outer catch, so it leaves the method immediately instead of being collected. */ | 
|  |  | 783 |  |                     { | 
|  |  | 784 |  |                         // This exception is not surfaced to the error handler or bubbled, as the entire batch must be | 
|  |  | 785 |  |                         // processed or events will be lost.  Preserve the exceptions, should any occur. | 
|  |  | 786 |  |  | 
|  |  | 787 |  |                         Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, | 
|  |  | 788 |  |  | 
|  |  | 789 |  |                         caughtExceptions ??= new List<Exception>(); | 
|  |  | 790 |  |                         caughtExceptions.Add(ex); | 
|  |  | 791 |  |                     } | 
|  |  | 792 |  |                 } | 
|  |  | 793 |  |  | 
|  |  | 794 |  |                 // If the event batch was empty, then dispatch to the handler; the base class will ensure that empty bat | 
|  |  | 795 |  |                 // are requested and will not invoke this method should empties not be sent to the handler. | 
|  |  | 796 |  |  | 
|  |  | 797 |  |                 if (emptyBatch) /* Still true only when the foreach above never executed its body. */ | 
|  |  | 798 |  |                 { | 
|  |  | 799 |  |                     eventArgs = new ProcessEventArgs(new EmptyPartitionContext(partition.PartitionId), null, EmptyEventU | 
|  |  | 800 |  |                     await _processEventAsync(eventArgs).ConfigureAwait(false); | 
|  |  | 801 |  |                 } | 
|  |  | 802 |  |             } | 
|  |  | 803 |  |             catch (Exception ex) when (!(ex is TaskCanceledException)) | 
|  |  | 804 |  |             { | 
|  |  | 805 |  |                 // This exception was either not related to processing events or was the result of sending an empty batc | 
|  |  | 806 |  |                 // processed.  Since there would be no other caught exceptions, treat this like a single case. | 
|  |  | 807 |  |  | 
|  |  | 808 |  |                 Logger.EventBatchProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Mess | 
|  |  | 809 |  |                 throw; | 
|  |  | 810 |  |             } | 
|  |  | 811 |  |             finally | 
|  |  | 812 |  |             { | 
|  |  | 813 |  |                 Logger.EventBatchProcessingComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup); /* Logged on every path, and before any collected handler exceptions are rethrown below. */ | 
|  |  | 814 |  |             } | 
|  |  | 815 |  |  | 
|  |  | 816 |  |             // Deal with any exceptions that occurred while processing the batch.  If more than one was | 
|  |  | 817 |  |             // present, preserve them in an aggregate exception for transport. | 
|  |  | 818 |  |  | 
|  |  | 819 |  |             if (caughtExceptions != null) | 
|  |  | 820 |  |             { | 
|  |  | 821 |  |                 if (caughtExceptions.Count == 1) | 
|  |  | 822 |  |                 { | 
|  |  | 823 |  |                     ExceptionDispatchInfo.Capture(caughtExceptions[0]).Throw(); /* Rethrows the single exception itself, keeping its original stack trace, rather than wrapping it. */ | 
|  |  | 824 |  |                 } | 
|  |  | 825 |  |  | 
|  |  | 826 |  |                 throw new AggregateException(Resources.AggregateEventProcessingExceptionMessage, caughtExceptions); /* Reached only when more than one exception was collected, because the branch above always throws. */ | 
|  |  | 827 |  |             } | 
|  |  | 828 |  |         } | 
|  |  | 829 |  |  | 
|  |  | 830 |  |         /// <summary> | 
|  |  | 831 |  |         ///   Performs the tasks needed when an unexpected exception occurs within the operation of the | 
|  |  | 832 |  |         ///   event processor infrastructure. | 
|  |  | 833 |  |         /// </summary> | 
|  |  | 834 |  |         /// | 
|  |  | 835 |  |         /// <param name="exception">The exception that occurred during operation of the event processor.</param> | 
|  |  | 836 |  |         /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</ | 
|  |  | 837 |  |         /// <param name="operationDescription">A short textual description of the operation during which the exception o | 
|  |  | 838 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 839 |  |         /// | 
|  |  | 840 |  |         /// <remarks> | 
|  |  | 841 |  |         ///   This error handler is invoked when there is an exception observed within the event processor itself; it is | 
|  |  | 842 |  |         ///   code that has been implemented to process events or other overrides and extension points that are not crit | 
|  |  | 843 |  |         ///   The event processor will make every effort to recover from exceptions and continue processing.  Should an  | 
|  |  | 844 |  |         ///   from be encountered, the processor will attempt to forfeit ownership of all partitions that it was process | 
|  |  | 845 |  |         /// | 
|  |  | 846 |  |         ///   The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be able to | 
|  |  | 847 |  |         ///   exception was fatal or whether its state was corrupted, this method has responsibility for making the dete | 
|  |  | 848 |  |         ///   should be terminated or restarted.  The method may do so by calling Stop on the processor instance and the | 
|  |  | 849 |  |         /// | 
|  |  | 850 |  |         ///   It is recommended that, for production scenarios, the decision be made by considering observations made by | 
|  |  | 851 |  |         ///   when initializing processing for a partition, and the method invoked when processing for a partition is st | 
|  |  | 852 |  |         ///   data from their monitoring platforms in this decision as well. | 
|  |  | 853 |  |         /// | 
|  |  | 854 |  |         ///   As with event processing, should an exception occur in the code for the error handler, the event processor | 
|  |  | 855 |  |         ///   it in any way.  Developers are strongly encouraged to take exception scenarios into account and guard agai | 
|  |  | 856 |  |         /// </remarks> | 
|  |  | 857 |  |         /// | 
|  |  | 858 |  |         protected override async Task OnProcessingErrorAsync(Exception exception, | 
|  |  | 859 |  |                                                              EventProcessorPartition partition, | 
|  |  | 860 |  |                                                              string operationDescription, | 
|  |  | 861 |  |                                                              CancellationToken cancellationToken) | 
|  |  | 862 |  |         { | 
|  | 4 | 863 |  |             var eventArgs = new ProcessErrorEventArgs(partition?.PartitionId, operationDescription, exception, cancellat | 
|  | 4 | 864 |  |             await _processErrorAsync(eventArgs).ConfigureAwait(false); /* partition may be null (note the ?. above).  Exceptions thrown by the registered handler are not caught in this method. */ | 
|  | 4 | 865 |  |         } | 
|  |  | 866 |  |  | 
|  |  | 867 |  |         /// <summary> | 
|  |  | 868 |  |         ///   Performs the tasks to initialize a partition, and its associated context, for event processing. | 
|  |  | 869 |  |         /// </summary> | 
|  |  | 870 |  |         /// | 
|  |  | 871 |  |         /// <param name="partition">The context of the partition being initialized.  Only the well-known members of the  | 
|  |  | 872 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t | 
|  |  | 873 |  |         /// | 
|  |  | 874 |  |         /// <remarks> | 
|  |  | 875 |  |         ///   It is not recommended that the state of the processor be managed directly from within this method; request | 
|  |  | 876 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call. | 
|  |  | 877 |  |         /// </remarks> | 
|  |  | 878 |  |         /// | 
|  |  | 879 |  |         protected override async Task OnInitializingPartitionAsync(EventProcessorPartition partition, | 
|  |  | 880 |  |                                                                    CancellationToken cancellationToken) | 
|  |  | 881 |  |         { | 
|  |  | 882 |  |             // Handlers cannot be changed while the processor is running; it is safe to check and call | 
|  |  | 883 |  |             // without capturing a local reference. | 
|  |  | 884 |  |  | 
|  | 8 | 885 |  |             if (_partitionInitializingAsync != null) /* Without a handler, nothing is recorded in PartitionStartingPositionDefaults for this partition. */ | 
|  |  | 886 |  |             { | 
|  | 6 | 887 |  |                 var eventArgs = new PartitionInitializingEventArgs(partition.PartitionId, DefaultStartingPosition, cance | 
|  | 6 | 888 |  |                 await _partitionInitializingAsync(eventArgs).ConfigureAwait(false); | 
|  |  | 889 |  |  | 
|  | 6 | 890 |  |                 PartitionStartingPositionDefaults[partition.PartitionId] = eventArgs.DefaultStartingPosition; /* Read back after the handler has run, so a position the handler changed is the one that gets recorded. */ | 
|  | 6 | 891 |  |             } | 
|  | 8 | 892 |  |         } | 
|  |  | 893 |  |  | 
|  |  | 894 |  |         /// <summary> | 
|  |  | 895 |  |         ///   Performs the tasks needed when processing for a partition is being stopped.  This commonly occurs when the partition | 
|  |  | 896 |  |         ///   is claimed by another event processor instance or when the current event processor instance is shutting down.  The closing handler, when one is registered, is invoked and any default starting position recorded for the partition is removed. | 
|  |  | 897 |  |         /// </summary> | 
|  |  | 898 |  |         /// | 
|  |  | 899 |  |         /// <param name="partition">The context of the partition for which processing is being stopped.</param> | 
|  |  | 900 |  |         /// <param name="reason">The reason that processing is being stopped for the partition.</param> | 
|  |  | 901 |  |         /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation; it is passed along to the handler in its event arguments.</param> | 
|  |  | 902 |  |         /// | 
|  |  | 903 |  |         /// <remarks> | 
|  |  | 904 |  |         ///   It is not recommended that the state of the processor be managed directly from within this method; requesting to start or stop the processor may result in | 
|  |  | 905 |  |         ///   a deadlock scenario, especially if using the synchronous form of the call. | 
|  |  | 906 |  |         /// </remarks> | 
|  |  | 907 |  |         /// | 
|  |  | 908 |  |         protected override async Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition, | 
|  |  | 909 |  |                                                                         ProcessingStoppedReason reason, | 
|  |  | 910 |  |                                                                         CancellationToken cancellationToken) | 
|  |  | 911 |  |         { | 
|  |  | 912 |  |             // Handlers cannot be changed while the processor is running; it is safe to check and call | 
|  |  | 913 |  |             // without capturing a local reference. | 
|  |  | 914 |  |  | 
|  | 2 | 915 |  |             if (_partitionClosingAsync != null) | 
|  |  | 916 |  |             { | 
|  | 2 | 917 |  |                 var eventArgs = new PartitionClosingEventArgs(partition.PartitionId, reason, cancellationToken); | 
|  | 2 | 918 |  |                 await _partitionClosingAsync(eventArgs).ConfigureAwait(false); | 
|  |  | 919 |  |             } | 
|  |  | 920 |  |  | 
|  | 2 | 921 |  |             PartitionStartingPositionDefaults.TryRemove(partition.PartitionId, out var _); | 
|  | 2 | 922 |  |         } | 
|  |  | 923 |  |  | 
|  |  | 924 |  |         /// <summary> | 
|  |  | 925 |  |         ///   Creates a <see cref="StorageManager" /> to use for interacting with durable storage; the instance created is a <see cref="BlobsCheckpointStore" />. | 
|  |  | 926 |  |         /// </summary> | 
|  |  | 927 |  |         /// | 
|  |  | 928 |  |         /// <param name="checkpointStore">The client responsible for interaction with durable storage, responsible for persisting checkpoints.</param> | 
|  |  | 929 |  |         /// | 
|  |  | 930 |  |         /// <returns>A <see cref="StorageManager" /> with the requested configuration.</returns> | 
|  |  | 931 |  |         /// | 
|  | 12 | 932 |  |         private StorageManager CreateStorageManager(BlobContainerClient checkpointStore) => new BlobsCheckpointStore(che | 
|  |  | 933 |  |  | 
|  |  | 934 |  |         /// <summary> | 
|  |  | 935 |  |         ///   Processes the starting positions for checkpoints, ensuring that any overrides set by the <see cref="PartitionInitializingAsync" /> | 
|  |  | 936 |  |         ///   handler are respected when no natural checkpoint exists for the partition. | 
|  |  | 937 |  |         /// </summary> | 
|  |  | 938 |  |         /// | 
|  |  | 939 |  |         /// <param name="sourceCheckpoints">The checkpoint set to process.</param> | 
|  |  | 940 |  |         /// | 
|  |  | 941 |  |         /// <returns>An enumerable consisting of the <paramref name="sourceCheckpoints"/> and a set of artificial checkpoints, one for each partition that has a recorded default starting position but no checkpoint in the source set.  The sequence is produced lazily as it is enumerated.</returns> | 
|  |  | 942 |  |         /// | 
|  |  | 943 |  |         private IEnumerable<EventProcessorCheckpoint> ProcessCheckpointStartingPositions(IEnumerable<EventProcessorCheck | 
|  |  | 944 |  |         { | 
|  | 4 | 945 |  |             var knownCheckpoints = new HashSet<string>(); | 
|  |  | 946 |  |  | 
|  |  | 947 |  |             // Return the source checkpoints unchanged, tracking which partitions they belong to. | 
|  |  | 948 |  |  | 
|  | 28 | 949 |  |             foreach (var checkpoint in sourceCheckpoints) | 
|  |  | 950 |  |             { | 
|  | 10 | 951 |  |                 knownCheckpoints.Add(checkpoint.PartitionId); | 
|  | 10 | 952 |  |                 yield return checkpoint; | 
|  |  | 953 |  |             } | 
|  |  | 954 |  |  | 
|  |  | 955 |  |             // For any partition with a custom default starting position, emit an artificial | 
|  |  | 956 |  |             // checkpoint if a natural checkpoint did not exist. | 
|  |  | 957 |  |  | 
|  | 16 | 958 |  |             foreach (var partition in PartitionStartingPositionDefaults.Keys) | 
|  |  | 959 |  |             { | 
|  | 4 | 960 |  |                 if (!knownCheckpoints.Contains(partition)) | 
|  |  | 961 |  |                 { | 
|  | 2 | 962 |  |                     yield return new EventProcessorCheckpoint | 
|  | 2 | 963 |  |                     { | 
|  | 2 | 964 |  |                        FullyQualifiedNamespace = FullyQualifiedNamespace, | 
|  | 2 | 965 |  |                        EventHubName = EventHubName, | 
|  | 2 | 966 |  |                        ConsumerGroup = ConsumerGroup, | 
|  | 2 | 967 |  |                        PartitionId = partition, | 
|  | 2 | 968 |  |                        StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition pos | 
|  | 2 | 969 |  |                     }; | 
|  |  | 970 |  |                 } | 
|  |  | 971 |  |             } | 
|  | 4 | 972 |  |         } | 
|  |  | 973 |  |  | 
|  |  | 974 |  |         /// <summary> | 
|  |  | 975 |  |         ///   Invokes a specified action only if this <see cref="EventProcessorClient" /> instance is not running.  The running state is checked before waiting on the processor status guard and again once the guard is held; the guard is released after the action has been attempted. | 
|  |  | 976 |  |         /// </summary> | 
|  |  | 977 |  |         /// | 
|  |  | 978 |  |         /// <param name="action">The action to invoke; when <c>null</c>, nothing is invoked.</param> | 
|  |  | 979 |  |         /// | 
|  |  | 980 |  |         /// <exception cref="InvalidOperationException">Occurs when this method is invoked while the event processor is running.</exception> | 
|  |  | 981 |  |         /// | 
|  |  | 982 |  |         private void EnsureNotRunningAndInvoke(Action action) | 
|  |  | 983 |  |         { | 
|  | 132 | 984 |  |             var releaseGuard = false; | 
|  |  | 985 |  |  | 
|  | 132 | 986 |  |             if (!IsRunning) | 
|  |  | 987 |  |             { | 
|  |  | 988 |  |                 try | 
|  |  | 989 |  |                 { | 
|  | 124 | 990 |  |                     ProcessorStatusGuard.Wait(); | 
|  | 124 | 991 |  |                     releaseGuard = true; | 
|  |  | 992 |  |  | 
|  | 124 | 993 |  |                     if (!IsRunning) | 
|  |  | 994 |  |                     { | 
|  | 124 | 995 |  |                         action?.Invoke(); | 
|  | 124 | 996 |  |                         return; | 
|  |  | 997 |  |                     } | 
|  | 0 | 998 |  |                 } | 
|  |  | 999 |  |                 finally | 
|  |  | 1000 |  |                 { | 
|  | 124 | 1001 |  |                     if (releaseGuard) | 
|  |  | 1002 |  |                     { | 
|  | 124 | 1003 |  |                         ProcessorStatusGuard.Release(); | 
|  |  | 1004 |  |                     } | 
|  | 124 | 1005 |  |                 } | 
|  |  | 1006 |  |             } | 
|  |  | 1007 |  |  | 
|  | 8 | 1008 |  |             throw new InvalidOperationException(Resources.RunningEventProcessorCannotPerformOperation); | 
|  | 124 | 1009 |  |         } | 
|  |  | 1010 |  |  | 
|  |  | 1011 |  |         /// <summary> | 
|  |  | 1012 |  |         ///   Creates the set of options to pass to the base <see cref="EventProcessor{TPartition}" />.  The connection and retry options are cloned; the remaining values are copied from the client options. | 
|  |  | 1013 |  |         /// </summary> | 
|  |  | 1014 |  |         /// | 
|  |  | 1015 |  |         /// <param name="clientOptions">The set of client options for the <see cref="EventProcessorClient" /> instance; when <c>null</c>, the default client options are used.</param> | 
|  |  | 1016 |  |         /// | 
|  |  | 1017 |  |         /// <returns>The set of options to use for the base processor.</returns> | 
|  |  | 1018 |  |         /// | 
|  |  | 1019 |  |         private static EventProcessorOptions CreateOptions(EventProcessorClientOptions clientOptions) | 
|  |  | 1020 |  |         { | 
|  | 48 | 1021 |  |             clientOptions ??= DefaultClientOptions; | 
|  |  | 1022 |  |  | 
|  | 48 | 1023 |  |             return new EventProcessorOptions | 
|  | 48 | 1024 |  |             { | 
|  | 48 | 1025 |  |                 ConnectionOptions = clientOptions.ConnectionOptions.Clone(), | 
|  | 48 | 1026 |  |                 RetryOptions = clientOptions.RetryOptions.Clone(), | 
|  | 48 | 1027 |  |                 Identifier = clientOptions.Identifier, | 
|  | 48 | 1028 |  |                 MaximumWaitTime = clientOptions.MaximumWaitTime, | 
|  | 48 | 1029 |  |                 TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties, | 
|  | 48 | 1030 |  |                 LoadBalancingStrategy = clientOptions.LoadBalancingStrategy, | 
|  | 48 | 1031 |  |                 PrefetchCount = clientOptions.PrefetchCount | 
|  | 48 | 1032 |  |             }; | 
|  |  | 1033 |  |         } | 
|  |  | 1034 |  |  | 
|  |  | 1035 |  |         /// <summary> | 
|  |  | 1036 |  |         ///   Represents a basic partition context for event processing within the processor client. | 
|  |  | 1037 |  |         /// </summary> | 
|  |  | 1038 |  |         /// | 
|  |  | 1039 |  |         /// <seealso cref="Azure.Messaging.EventHubs.Consumer.PartitionContext" /> | 
|  |  | 1040 |  |         /// | 
|  |  | 1041 |  |         private class ProcessorPartitionContext : PartitionContext | 
|  |  | 1042 |  |         { | 
|  |  | 1043 |  |             /// <summary>A function that can be used to read the last enqueued event properties for the partition.</summary> | 
|  |  | 1044 |  |             private Func<LastEnqueuedEventProperties> _readLastEnqueuedEventProperties; | 
|  |  | 1045 |  |  | 
|  |  | 1046 |  |             /// <summary> | 
|  |  | 1047 |  |             ///   Initializes a new instance of the <see cref="ProcessorPartitionContext" /> class. | 
|  |  | 1048 |  |             /// </summary> | 
|  |  | 1049 |  |             /// | 
|  |  | 1050 |  |             /// <param name="partitionId">The identifier of the partition that the context represents.</param> | 
|  |  | 1051 |  |             /// <param name="readLastEnqueuedEventProperties">A function that can be used to read the last enqueued event properties for the partition.</param> | 
|  |  | 1052 |  |             /// | 
|  |  | 1053 |  |             public ProcessorPartitionContext(string partitionId, | 
|  | 10 | 1054 |  |                                              Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties) : base(p | 
|  |  | 1055 |  |             { | 
|  | 10 | 1056 |  |                 _readLastEnqueuedEventProperties = readLastEnqueuedEventProperties; | 
|  | 10 | 1057 |  |             } | 
|  |  | 1058 |  |  | 
|  |  | 1059 |  |             /// <summary> | 
|  |  | 1060 |  |             ///   A set of information about the last enqueued event of a partition, read by way of the | 
|  |  | 1061 |  |             ///   function that was provided when the context was constructed. | 
|  |  | 1062 |  |             /// </summary> | 
|  |  | 1063 |  |             /// | 
|  |  | 1064 |  |             /// <returns>The set of properties for the last event that was enqueued to the partition.</returns> | 
|  |  | 1065 |  |             /// | 
|  | 0 | 1066 |  |             public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() => _readLastEnqueuedEventPrope | 
|  |  | 1067 |  |         } | 
|  |  | 1068 |  |  | 
|  |  | 1069 |  |         /// <summary> | 
|  |  | 1070 |  |         ///   Represents a basic partition context for event processing when the | 
|  |  | 1071 |  |         ///   full context was not available. | 
|  |  | 1072 |  |         /// </summary> | 
|  |  | 1073 |  |         /// | 
|  |  | 1074 |  |         /// <seealso cref="Azure.Messaging.EventHubs.Consumer.PartitionContext" /> | 
|  |  | 1075 |  |         /// | 
|  |  | 1076 |  |         private class EmptyPartitionContext : PartitionContext | 
|  |  | 1077 |  |         { | 
|  |  | 1078 |  |             /// <summary> | 
|  |  | 1079 |  |             ///   Initializes a new instance of the <see cref="EmptyPartitionContext" /> class. | 
|  |  | 1080 |  |             /// </summary> | 
|  |  | 1081 |  |             /// | 
|  |  | 1082 |  |             /// <param name="partitionId">The identifier of the partition that the context represents.</param> | 
|  |  | 1083 |  |             /// | 
|  | 2 | 1084 |  |             public EmptyPartitionContext(string partitionId) : base(partitionId) | 
|  |  | 1085 |  |             { | 
|  | 2 | 1086 |  |             } | 
|  |  | 1087 |  |  | 
|  |  | 1088 |  |             /// <summary> | 
|  |  | 1089 |  |             ///   A set of information about the last enqueued event of a partition, not available for the | 
|  |  | 1090 |  |             ///   empty context. | 
|  |  | 1091 |  |             /// </summary> | 
|  |  | 1092 |  |             /// | 
|  |  | 1093 |  |             /// <returns>This implementation does not return a value; it always throws.</returns> | 
|  |  | 1094 |  |             /// | 
|  |  | 1095 |  |             /// <exception cref="InvalidOperationException">The method call is not available on the <see cref="EmptyPartitionContext" />; the exception is thrown on every call.</exception> | 
|  |  | 1096 |  |             /// | 
|  |  | 1097 |  |             public override LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() => | 
|  | 0 | 1098 |  |                 throw new InvalidOperationException(Resources.CannotReadLastEnqueuedEventPropertiesWithoutEvent); | 
|  |  | 1099 |  |         } | 
|  |  | 1100 |  |     } | 
|  |  | 1101 |  | } |