| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Concurrent; |
| | 6 | | using System.Collections.Generic; |
| | 7 | | using System.ComponentModel; |
| | 8 | | using System.Diagnostics.CodeAnalysis; |
| | 9 | | using System.Globalization; |
| | 10 | | using System.Linq; |
| | 11 | | using System.Runtime.ExceptionServices; |
| | 12 | | using System.Threading; |
| | 13 | | using System.Threading.Tasks; |
| | 14 | | using Azure.Core; |
| | 15 | | using Azure.Core.Diagnostics; |
| | 16 | | using Azure.Core.Pipeline; |
| | 17 | | using Azure.Messaging.EventHubs.Consumer; |
| | 18 | | using Azure.Messaging.EventHubs.Core; |
| | 19 | | using Azure.Messaging.EventHubs.Diagnostics; |
| | 20 | | using Azure.Messaging.EventHubs.Processor; |
| | 21 | |
|
| | 22 | | namespace Azure.Messaging.EventHubs.Primitives |
| | 23 | | { |
| | 24 | | /// <summary> |
| | 25 | | /// Provides a base for creating a custom processor for consuming events across all partitions of a given Event Hu |
| | 26 | | /// within the scope of a specific consumer group. The processor is capable of collaborating with other instances |
| | 27 | | /// the same Event Hub and consumer group pairing to share work by using a common storage platform to communicate. |
| | 28 | | /// tolerance is also built-in, allowing the processor to be resilient in the face of errors. |
| | 29 | | /// </summary> |
| | 30 | | /// |
| | 31 | | /// <typeparam name="TPartition">The context of the partition for which an operation is being performed.</typeparam> |
| | 32 | | /// |
| | 33 | | [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] |
| | 34 | | public abstract class EventProcessor<TPartition> where TPartition : EventProcessorPartition, new() |
| | 35 | | { |
| | 36 | | /// <summary>The maximum number of failed consumers tolerated while processing a partition; the partition processing loop continues only while the count of failed consumers does not exceed this value.</summary> |
| | 37 | | private const int MaximumFailedConsumerCount = 1; |
| | 38 | |
|
| | 39 | | /// <summary>The primitive for synchronizing access when starting and stopping the processor; the Status property also probes it with a short timed wait when no processor task exists.</summary> |
| 0 | 40 | | private readonly SemaphoreSlim ProcessorRunningGuard = new SemaphoreSlim(1, 1); |
| | 41 | |
|
| | 42 | | /// <summary>When set, forces the value reported by the IsRunning property instead of deriving it from the processor status; intended for mocking purposes.</summary> |
| | 43 | | private bool? _isRunningOverride; |
| | 44 | |
|
| | 45 | | /// <summary>When set, forces the value reported by the Status property; used for mocking and manual status updates.</summary> |
| | 46 | | private EventProcessorStatus? _statusOverride; |
| | 47 | |
|
| | 48 | | /// <summary>The task responsible for managing the operations of the processor when it is running; a <c>null</c> value is treated by the Status property as "no active processor task".</summary> |
| | 49 | | private Task _runningProcessorTask; |
| | 50 | |
|
| | 51 | | /// <summary>A <see cref="CancellationTokenSource"/> instance used to signal the request to cancel the current run of the processor.</summary> |
| | 52 | | private CancellationTokenSource _runningProcessorCancellationSource; |
| | 53 | |
|
| | 54 | | /// <summary> |
| | 55 | | /// The fully qualified Event Hubs namespace that the processor is associated with. This is likely |
| | 56 | | /// to be similar to <c>{yournamespace}.servicebus.windows.net</c>. When the processor is created from a connection string, this is the host of the endpoint parsed from that connection string. |
| | 57 | | /// </summary> |
| | 58 | | /// |
| 3030122 | 59 | | public string FullyQualifiedNamespace { get; } |
| | 60 | |
|
| | 61 | | /// <summary> |
| | 62 | | /// The name of the Event Hub that the processor is connected to, specific to the |
| | 63 | | /// Event Hubs namespace that contains it. When created from a connection string, an explicitly supplied name is used; otherwise the name embedded in the connection string is used. |
| | 64 | | /// </summary> |
| | 65 | | /// |
| 3031038 | 66 | | public string EventHubName { get; } |
| | 67 | |
|
| | 68 | | /// <summary> |
| | 69 | | /// The name of the consumer group this event processor is associated with. Events will be |
| | 70 | | /// read only in the context of this group. The parameterized constructors reject a null or empty value. |
| | 71 | | /// </summary> |
| | 72 | | /// |
| 1180 | 73 | | public string ConsumerGroup { get; } |
| | 74 | |
|
| | 75 | | /// <summary> |
| | 76 | | /// A unique name used to identify this event processor; taken from the processor options when provided there, otherwise a newly generated GUID string. |
| | 77 | | /// </summary> |
| | 78 | | /// |
| 1112 | 79 | | public string Identifier { get; } |
| | 80 | |
|
/// <summary>
///   Indicates whether or not this event processor is currently running.
/// </summary>
///
/// <remarks>
///   An explicitly assigned value always wins.  When no value has been assigned, the
///   processor is reported as running while its status is either
///   <see cref="EventProcessorStatus.Running" /> or <see cref="EventProcessorStatus.Stopping" />.
/// </remarks>
///
public bool IsRunning
{
    get
    {
        // An explicit override short-circuits any inspection of the processor status.

        if (_isRunningOverride is bool forcedValue)
        {
            return forcedValue;
        }

        // The status is read exactly once; a processor that is stopping still counts as running.

        switch (Status)
        {
            case EventProcessorStatus.Running:
            case EventProcessorStatus.Stopping:
                return true;

            default:
                return false;
        }
    }

    protected set
    {
        _isRunningOverride = value;
    }
}
| | 102 | |
|
/// <summary>
///   Indicates the current state of the processor.
/// </summary>
///
/// <value>
///   The status override when one has been set; otherwise <see cref="EventProcessorStatus.Faulted" /> when
///   the processor task has faulted, <see cref="EventProcessorStatus.Running" /> when the processor task
///   exists and has not completed, and <see cref="EventProcessorStatus.NotRunning" /> in every other case.
/// </value>
///
/// <remarks>
///   When there is no processor task, the getter waits up to 100 milliseconds for the start/stop guard.
///   Failing to acquire it means that a start or stop operation is in flight, in which case the override
///   or <see cref="EventProcessorStatus.NotRunning" /> is reported without inspecting any task.
/// </remarks>
///
internal EventProcessorStatus Status
{
    get
    {
        EventProcessorStatus? statusOverride;

        // If there is no active processor task, ensure that it is not
        // in the process of starting by attempting to acquire the semaphore.
        //
        // If the semaphore could not be acquired, then there is an active start/stop
        // operation in progress indicating that the processor is not yet running or
        // will not be running.

        if (_runningProcessorTask == null)
        {
            var guardAcquired = false;

            try
            {
                guardAcquired = ProcessorRunningGuard.Wait(100);

                if (!guardAcquired)
                {
                    return (_statusOverride ?? EventProcessorStatus.NotRunning);
                }

                statusOverride = _statusOverride;
            }
            finally
            {
                // Only release a permit that this call actually holds.  Releasing after a timed-out
                // wait would hand back the permit owned by the in-flight start/stop operation, breaking
                // mutual exclusion and causing the owner's own release to exceed the semaphore maximum.

                if (guardAcquired)
                {
                    ProcessorRunningGuard.Release();
                }
            }
        }
        else
        {
            statusOverride = _statusOverride;
        }

        if (statusOverride.HasValue)
        {
            return statusOverride.Value;
        }

        // Inspect a single snapshot of the task so that the faulted and completed checks
        // cannot observe two different task instances.

        var runningTask = _runningProcessorTask;

        if (runningTask?.IsFaulted == true)
        {
            return EventProcessorStatus.Faulted;
        }

        if (runningTask?.IsCompleted == false)
        {
            return EventProcessorStatus.Running;
        }

        return EventProcessorStatus.NotRunning;
    }
}
| | 159 | |
|
| | 160 | | /// <summary> |
| | 161 | | /// The instance of <see cref="EventHubsEventSource" /> used for logging, which can be replaced with a mock for testing; defaults to <see cref="EventHubsEventSource.Log" />. |
| | 162 | | /// </summary> |
| | 163 | | /// |
| 0 | 164 | | internal EventHubsEventSource Logger { get; set; } = EventHubsEventSource.Log; |
| | 165 | |
|
| | 166 | | /// <summary> |
| | 167 | | /// The active policy which governs retry attempts for the processor; the parameterized constructors create it from the retry options of the processor options. |
| | 168 | | /// </summary> |
| | 169 | | /// |
| 144 | 170 | | protected EventHubsRetryPolicy RetryPolicy { get; } |
| | 171 | |
|
| | 172 | | /// <summary> |
| | 173 | | /// The set of currently active partition processing tasks issued by this event processor and their associated |
| | 174 | | /// token sources that can be used to cancel the operation. Partition identifiers are used as keys. |
| | 175 | | /// </summary> |
| | 176 | | /// |
| 0 | 177 | | private ConcurrentDictionary<string, PartitionProcessor> ActivePartitionProcessors { get; } = new ConcurrentDict |
| | 178 | |
|
| | 179 | | /// <summary> |
| | 180 | | /// A factory used to create new <see cref="EventHubConnection" /> instances; the parameterized constructors assign a delegate that captures the connection details they were given. |
| | 181 | | /// </summary> |
| | 182 | | /// |
| 10 | 183 | | private Func<EventHubConnection> ConnectionFactory { get; } |
| | 184 | |
|
| | 185 | | /// <summary> |
| | 186 | | /// The load balancer responsible for claiming partition ownership; either supplied to the internal constructor or created through CreatePartitionLoadBalancer. |
| | 187 | | /// </summary> |
| | 188 | | /// |
| 490 | 189 | | private PartitionLoadBalancer LoadBalancer { get; } |
| | 190 | |
|
| | 191 | | /// <summary> |
| | 192 | | /// The set of options to use with the <see cref="EventProcessor{TPartition}" /> instance; a clone of the options passed to the constructor, or a default set when none were passed. |
| | 193 | | /// </summary> |
| | 194 | | /// |
| 6088524 | 195 | | private EventProcessorOptions Options { get; } |
| | 196 | |
|
| | 197 | | /// <summary> |
| | 198 | | /// The desired number of events to include in a batch to be processed; this is the maximum count requested when receiving a batch, and the parameterized constructors require it to be at least 1. |
| | 199 | | /// </summary> |
| | 200 | | /// |
| 3044156 | 201 | | private int EventBatchMaximumCount { get; } |
| | 202 | |
|
| | 203 | | /// <summary> |
| | 204 | | /// Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class. |
| | 205 | | /// </summary> |
| | 206 | | /// |
| | 207 | | /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed. Th |
| | 208 | | /// <param name="consumerGroup">The name of the consumer group the processor is associated with. Events are rea |
| | 209 | | /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to. This is likel |
| | 210 | | /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> |
| | 211 | | /// <param name="credential">The Azure managed identity credential to use for authorization. Access controls ma |
| | 212 | | /// <param name="options">The set of options to use for the processor.</param> |
| | 213 | | /// <param name="loadBalancer">The load balancer to use for coordinating processing with other event processor i |
| | 214 | | /// |
| 222 | 215 | | internal EventProcessor(int eventBatchMaximumCount, |
| 222 | 216 | | string consumerGroup, |
| 222 | 217 | | string fullyQualifiedNamespace, |
| 222 | 218 | | string eventHubName, |
| 222 | 219 | | TokenCredential credential, |
| 222 | 220 | | EventProcessorOptions options = default, |
| 222 | 221 | | PartitionLoadBalancer loadBalancer = default) |
| | 222 | | { |
| 222 | 223 | | Argument.AssertInRange(eventBatchMaximumCount, 1, int.MaxValue, nameof(eventBatchMaximumCount)); |
| 214 | 224 | | Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup)); |
| 210 | 225 | | Argument.AssertWellFormedEventHubsNamespace(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace)); |
| 204 | 226 | | Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); |
| 200 | 227 | | Argument.AssertNotNull(credential, nameof(credential)); |
| | 228 | |
|
| 198 | 229 | | options = options?.Clone() ?? new EventProcessorOptions(); |
| | 230 | |
|
| 202 | 231 | | ConnectionFactory = () => new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, options. |
| 198 | 232 | | FullyQualifiedNamespace = fullyQualifiedNamespace; |
| 198 | 233 | | EventHubName = eventHubName; |
| 198 | 234 | | ConsumerGroup = consumerGroup; |
| 198 | 235 | | Identifier = string.IsNullOrEmpty(options.Identifier) ? Guid.NewGuid().ToString() : options.Identifier; |
| 198 | 236 | | RetryPolicy = options.RetryOptions.ToRetryPolicy(); |
| 198 | 237 | | Options = options; |
| 198 | 238 | | EventBatchMaximumCount = eventBatchMaximumCount; |
| | 239 | |
|
| | 240 | | #pragma warning disable CA2214 // Do not call overridable methods in constructors. The virtual methods are internal and |
| 198 | 241 | | LoadBalancer = loadBalancer ?? CreatePartitionLoadBalancer(CreateStorageManager(this), Identifier, ConsumerG |
| | 242 | | #pragma warning restore CA2214 // Do not call overridable methods in constructors. |
| 198 | 243 | | } |
| | 244 | |
|
| | 245 | | /// <summary> |
| | 246 | | /// Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class. |
| | 247 | | /// </summary> |
| | 248 | | /// |
| | 249 | | /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed. Th |
| | 250 | | /// <param name="consumerGroup">The name of the consumer group the processor is associated with. Events are rea |
| | 251 | | /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i |
| | 252 | | /// <param name="options">The set of options to use for the processor.</param> |
| | 253 | | /// |
| | 254 | | /// <remarks> |
| | 255 | | /// If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t |
| | 256 | | /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]] |
| | 257 | | /// connection string. For example, ";EntityPath=telemetry-hub". |
| | 258 | | /// |
| | 259 | | /// If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s |
| | 260 | | /// Event Hub will result in a connection string that contains the name. |
| | 261 | | /// </remarks> |
| | 262 | | /// |
| | 263 | | /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> |
| | 264 | | /// |
| | 265 | | protected EventProcessor(int eventBatchMaximumCount, |
| | 266 | | string consumerGroup, |
| | 267 | | string connectionString, |
| 38 | 268 | | EventProcessorOptions options = default) : this(eventBatchMaximumCount, consumerGroup, |
| | 269 | | { |
| 10 | 270 | | } |
| | 271 | |
|
| | 272 | | /// <summary> |
| | 273 | | /// Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class. |
| | 274 | | /// </summary> |
| | 275 | | /// |
| | 276 | | /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed. Th |
| | 277 | | /// <param name="consumerGroup">The name of the consumer group the processor is associated with. Events are rea |
| | 278 | | /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i |
| | 279 | | /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> |
| | 280 | | /// <param name="options">The set of options to use for the processor.</param> |
| | 281 | | /// |
| | 282 | | /// <remarks> |
| | 283 | | /// If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even |
| | 284 | | /// and can be used directly without passing the <paramref name="eventHubName" />. The name of the Event Hub |
| | 285 | | /// passed only once, either as part of the connection string or separately. |
| | 286 | | /// </remarks> |
| | 287 | | /// |
| | 288 | | /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/> |
| | 289 | | /// |
| 48 | 290 | | protected EventProcessor(int eventBatchMaximumCount, |
| 48 | 291 | | string consumerGroup, |
| 48 | 292 | | string connectionString, |
| 48 | 293 | | string eventHubName, |
| 48 | 294 | | EventProcessorOptions options = default) |
| | 295 | | { |
| 48 | 296 | | Argument.AssertInRange(eventBatchMaximumCount, 1, int.MaxValue, nameof(eventBatchMaximumCount)); |
| 40 | 297 | | Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup)); |
| 36 | 298 | | Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString)); |
| | 299 | |
|
| 28 | 300 | | options = options?.Clone() ?? new EventProcessorOptions(); |
| | 301 | |
|
| 28 | 302 | | var connectionStringProperties = ConnectionStringParser.Parse(connectionString); |
| 28 | 303 | | connectionStringProperties.Validate(eventHubName, nameof(connectionString)); |
| | 304 | |
|
| 20 | 305 | | ConnectionFactory = () => new EventHubConnection(connectionString, eventHubName, options.ConnectionOptions); |
| 14 | 306 | | FullyQualifiedNamespace = connectionStringProperties.Endpoint.Host; |
| 14 | 307 | | EventHubName = string.IsNullOrEmpty(eventHubName) ? connectionStringProperties.EventHubName : eventHubName; |
| 14 | 308 | | ConsumerGroup = consumerGroup; |
| 14 | 309 | | Identifier = string.IsNullOrEmpty(options.Identifier) ? Guid.NewGuid().ToString() : options.Identifier; |
| 14 | 310 | | RetryPolicy = options.RetryOptions.ToRetryPolicy(); |
| 14 | 311 | | Options = options; |
| 14 | 312 | | EventBatchMaximumCount = eventBatchMaximumCount; |
| | 313 | |
|
| | 314 | | #pragma warning disable CA2214 // Do not call overridable methods in constructors. The virtual methods are internal and |
| 14 | 315 | | LoadBalancer = CreatePartitionLoadBalancer(CreateStorageManager(this), Identifier, ConsumerGroup, FullyQuali |
| | 316 | | #pragma warning restore CA2214 // Do not call overridable methods in constructors |
| 14 | 317 | | } |
| | 318 | |
|
| | 319 | | /// <summary> |
| | 320 | | /// Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class. |
| | 321 | | /// </summary> |
| | 322 | | /// |
| | 323 | | /// <param name="eventBatchMaximumCount">The desired number of events to include in a batch to be processed. Th |
| | 324 | | /// <param name="consumerGroup">The name of the consumer group the processor is associated with. Events are rea |
| | 325 | | /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to. This is likel |
| | 326 | | /// <param name="eventHubName">The name of the specific Event Hub to associate the processor with.</param> |
| | 327 | | /// <param name="credential">The Azure managed identity credential to use for authorization. Access controls ma |
| | 328 | | /// <param name="options">The set of options to use for the processor.</param> |
| | 329 | | /// |
| | 330 | | protected EventProcessor(int eventBatchMaximumCount, |
| | 331 | | string consumerGroup, |
| | 332 | | string fullyQualifiedNamespace, |
| | 333 | | string eventHubName, |
| | 334 | | TokenCredential credential, |
| 172 | 335 | | EventProcessorOptions options = default) : this(eventBatchMaximumCount, consumerGroup, |
| | 336 | | { |
| 148 | 337 | | } |
| | 338 | |
|
| | 339 | | /// <summary> |
| | 340 | | /// Initializes a new instance of the <see cref="EventProcessor{TPartition}"/> class without assigning any connection, options, retry policy, or load balancer state. |
| | 341 | | /// </summary> |
| | 342 | | /// <remarks>NOTE(review): presumably intended for mocking or derived test types - confirm with the owners.</remarks> |
| 0 | 343 | | protected EventProcessor() |
| | 344 | | { |
| 0 | 345 | | } |
| | 346 | |
|
/// <summary>
///   Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events.  Calling this
///   while the processor is already running is expected to result in no action; that behavior lives
///   in the internal start routine, which is not part of this member.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance forwarded to the internal start routine to signal a request to cancel.</param>
///
/// <remarks>
///   Delegates to <c>StartProcessingInternalAsync</c>, passing <c>true</c> as the first argument.
///   NOTE(review): that flag presumably selects the asynchronous code path - confirm.
/// </remarks>
///
public virtual async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
    await StartProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);
}
| | 356 | |
|
/// <summary>
///   Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events.  Calling this
///   while the processor is already running is expected to result in no action; that behavior lives
///   in the internal start routine, which is not part of this member.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance forwarded to the internal start routine to signal a request to cancel.</param>
///
/// <remarks>
///   Synchronous counterpart of <see cref="StartProcessingAsync" />; delegates to
///   <c>StartProcessingInternalAsync</c> with <c>false</c> as the first argument and then calls
///   <c>EnsureCompleted</c> on the returned task.
///   NOTE(review): the flag presumably selects the synchronous code path - confirm.
/// </remarks>
///
public virtual void StartProcessing(CancellationToken cancellationToken = default)
{
    var startOperation = StartProcessingInternalAsync(false, cancellationToken);
    startOperation.EnsureCompleted();
}
| | 366 | |
|
/// <summary>
///   Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events.  Calling this
///   while the processor is not running is expected to result in no action; that behavior lives
///   in the internal stop routine, which is not part of this member.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance forwarded to the internal stop routine to signal a request to cancel.</param>
///
/// <remarks>
///   Delegates to <c>StopProcessingInternalAsync</c>, passing <c>true</c> as the first argument.
///   NOTE(review): that flag presumably selects the asynchronous code path - confirm.
/// </remarks>
///
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default)
{
    await StopProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);
}
| | 376 | |
|
/// <summary>
///   Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events.  Calling this
///   while the processor is not running is expected to result in no action; that behavior lives
///   in the internal stop routine, which is not part of this member.
/// </summary>
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance forwarded to the internal stop routine to signal a request to cancel.</param>
///
/// <remarks>
///   Synchronous counterpart of <see cref="StopProcessingAsync" />; delegates to
///   <c>StopProcessingInternalAsync</c> with <c>false</c> as the first argument and then calls
///   <c>EnsureCompleted</c> on the returned task.
///   NOTE(review): the flag presumably selects the synchronous code path - confirm.
/// </remarks>
///
public virtual void StopProcessing(CancellationToken cancellationToken = default)
{
    var stopOperation = StopProcessingInternalAsync(false, cancellationToken);
    stopOperation.EnsureCompleted();
}
| | 386 | |
|
/// <summary>
///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
///
/// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
///
/// <returns>The result of the base class equality check for <paramref name="obj" />.</returns>
///
/// <remarks>
///   Overridden only to forward to the base implementation while hiding the member from editor
///   suggestions.
/// </remarks>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override bool Equals(object obj)
{
    return base.Equals(obj);
}
| | 397 | |
|
/// <summary>
///   Returns a hash code for this instance.
/// </summary>
///
/// <returns>The hash code produced by the base class implementation.</returns>
///
/// <remarks>
///   Overridden only to forward to the base implementation while hiding the member from editor
///   suggestions.
/// </remarks>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override int GetHashCode()
{
    return base.GetHashCode();
}
| | 406 | |
|
/// <summary>
///   Converts the instance to string representation.
/// </summary>
///
/// <returns>
///   A <see cref="System.String" /> composed of the literal text "Event Processor", the simple name of
///   <typeparamref name="TPartition" /> in angle brackets, and the <see cref="Identifier" /> of this instance.
/// </returns>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString()
{
    return $"Event Processor<{ typeof(TPartition).Name }>: { Identifier }";
}
| | 415 | |
|
| | 416 | | /// <summary> |
| | 417 | | /// Creates a <see cref="TransportConsumer" /> to use for processing. |
| | 418 | | /// </summary> |
| | 419 | | /// |
| | 420 | | /// <param name="consumerGroup">The consumer group to associate with the consumer.</param> |
| | 421 | | /// <param name="partitionId">The partition to associate with the consumer.</param> |
| | 422 | | /// <param name="eventPosition">The position in the event stream where the consumer should begin reading.</param |
| | 423 | | /// <param name="connection">The connection to use for the consumer.</param> |
| | 424 | | /// <param name="options">The options to use for configuring the consumer.</param> |
| | 425 | | /// |
| | 426 | | /// <returns>A <see cref="TransportConsumer" /> with the requested configuration.</returns> |
| | 427 | | /// |
| | 428 | | internal virtual TransportConsumer CreateConsumer(string consumerGroup, |
| | 429 | | string partitionId, |
| | 430 | | EventPosition eventPosition, |
| | 431 | | EventHubConnection connection, |
| | 432 | | EventProcessorOptions options) => |
| 2 | 433 | | connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, options.RetryOptions.ToRetryPo |
| | 434 | |
|
| | 435 | | /// <summary> |
| | 436 | | /// Creates a <see cref="StorageManager" /> to use for interacting with durable storage. |
| | 437 | | /// </summary> |
| | 438 | | /// |
| | 439 | | /// <param name="instance">The <see cref="EventProcessor{TPartition}" /> instance to associate with the storage |
| | 440 | | /// |
| | 441 | | /// <returns>A <see cref="StorageManager" /> with the requested configuration.</returns> |
| | 442 | | /// |
| 166 | 443 | | internal virtual StorageManager CreateStorageManager(EventProcessor<TPartition> instance) => new DelegatingStora |
| | 444 | |
|
| | 445 | | /// <summary> |
| | 446 | | /// Creates a <see cref="PartitionLoadBalancer"/> for managing partition ownership for the event processor. |
| | 447 | | /// </summary> |
| | 448 | | /// |
| | 449 | | /// <param name="storageManager">Responsible for managing persistence of the partition ownership data.</param> |
| | 450 | | /// <param name="identifier">The identifier of the event processor associated with the load balancer.</param> |
| | 451 | | /// <param name="consumerGroup">The name of the consumer group this load balancer is associated with.</param> |
| | 452 | | /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace that the processor is associa |
| | 453 | | /// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param> |
| | 454 | | /// <param name="ownershipExpiration">The minimum amount of time for an ownership to be considered expired witho |
| | 455 | | /// |
| | 456 | | internal virtual PartitionLoadBalancer CreatePartitionLoadBalancer(StorageManager storageManager, |
| | 457 | | string identifier, |
| | 458 | | string consumerGroup, |
| | 459 | | string fullyQualifiedNamespace, |
| | 460 | | string eventHubName, |
| | 461 | | TimeSpan ownershipExpiration) => |
| 162 | 462 | | new PartitionLoadBalancer(storageManager, identifier, consumerGroup, fullyQualifiedNamespace, eventHubName, |
| | 463 | |
|
| | 464 | | /// <summary> |
| | 465 | | /// Performs the tasks needed to process a batch of events. |
| | 466 | | /// </summary> |
| | 467 | | /// |
| | 468 | | /// <param name="partition">The Event Hub partition whose processing should be started.</param> |
| | 469 | | /// <param name="eventBatch">The batch of events to process.</param> |
| | 470 | | /// <param name="dispatchEmptyBatches"><c>true</c> if empty batches should be dispatched to the handler; otherwi |
| | 471 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 472 | | /// |
| | 473 | | internal virtual async Task ProcessEventBatchAsync(TPartition partition, |
| | 474 | | IReadOnlyList<EventData> eventBatch, |
| | 475 | | bool dispatchEmptyBatches, |
| | 476 | | CancellationToken cancellationToken) |
| | 477 | | { |
| 3029992 | 478 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 479 | |
|
| | 480 | | // If there were no events in the batch and empty batches should not be emitted, |
| | 481 | | // take no further action. |
| | 482 | |
|
| 3029962 | 483 | | if (((eventBatch == null) || (eventBatch.Count <= 0)) && (!dispatchEmptyBatches)) |
| | 484 | | { |
| 2 | 485 | | return; |
| | 486 | | } |
| | 487 | |
|
| | 488 | | // Create the diagnostics scope used for distributed tracing and instrument the events in the batch. |
| | 489 | |
|
| 3029960 | 490 | | using var diagnosticScope = EventDataInstrumentation.ScopeFactory.CreateScope(DiagnosticProperty.EventProces |
| 3029960 | 491 | | diagnosticScope.AddAttribute(DiagnosticProperty.KindAttribute, DiagnosticProperty.ConsumerKind); |
| 3029960 | 492 | | diagnosticScope.AddAttribute(DiagnosticProperty.EventHubAttribute, EventHubName); |
| 3029960 | 493 | | diagnosticScope.AddAttribute(DiagnosticProperty.EndpointAttribute, FullyQualifiedNamespace); |
| | 494 | |
|
| 3029960 | 495 | | if ((diagnosticScope.IsEnabled) && (eventBatch.Any())) |
| | 496 | | { |
| 20 | 497 | | foreach (var eventData in eventBatch) |
| | 498 | | { |
| 6 | 499 | | if (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out string diagnosticId)) |
| | 500 | | { |
| 6 | 501 | | var attributes = new Dictionary<string, string>(1) |
| 6 | 502 | | { |
| 6 | 503 | | { DiagnosticProperty.EnqueuedTimeAttribute, eventData.EnqueuedTime.ToUnixTimeMilliseconds(). |
| 6 | 504 | | }; |
| | 505 | |
|
| 6 | 506 | | diagnosticScope.AddLink(diagnosticId, attributes); |
| | 507 | | } |
| | 508 | | } |
| | 509 | | } |
| | 510 | |
|
| 3029960 | 511 | | diagnosticScope.Start(); |
| | 512 | |
|
| | 513 | | // Dispatch the batch to the handler for processing. Exceptions in the handler code are intended to be |
| | 514 | | // unhandled by the processor; explicitly signal that the exception was observed in developer-provided |
| | 515 | | // code. |
| | 516 | |
|
| | 517 | | try |
| | 518 | | { |
| 3029960 | 519 | | await OnProcessingEventBatchAsync(eventBatch, partition, cancellationToken).ConfigureAwait(false); |
| 3029958 | 520 | | } |
| 2 | 521 | | catch (Exception ex) |
| | 522 | | { |
| 2 | 523 | | diagnosticScope.Failed(ex); |
| 2 | 524 | | throw new DeveloperCodeException(ex); |
| | 525 | | } |
| 3029960 | 526 | | } |
| | 527 | |
|
| | 528 | | /// <summary> |
| | 529 | | /// Creates the infrastructure for tracking the processing of a partition and begins processing the |
| | 530 | | /// partition in the background until cancellation is requested. |
| | 531 | | /// </summary> |
| | 532 | | /// |
| | 533 | | /// <param name="partition">The Event Hub partition whose processing should be started.</param> |
| | 534 | | /// <param name="startingPosition">The position within the event stream that processing should begin.</param> |
| | 535 | | /// <param name="cancellationSource">A <see cref="CancellationTokenSource"/> instance to signal the request to c |
| | 536 | | /// |
| | 537 | | /// <returns>The <see cref="PartitionProcessor" /> encapsulating the processing task, its cancellation token, an |
| | 538 | | /// |
| | 539 | | /// <remarks> |
| | 540 | | /// This method makes liberal use of class methods and state in addition to the received parameters. |
| | 541 | | /// </remarks> |
| | 542 | | /// |
| | 543 | | internal virtual PartitionProcessor CreatePartitionProcessor(TPartition partition, |
| | 544 | | EventPosition startingPosition, |
| | 545 | | CancellationTokenSource cancellationSource) |
| | 546 | | { |
| 80 | 547 | | cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 78 | 548 | | var consumer = default(TransportConsumer); |
| | 549 | |
|
| | 550 | | // If the tracking of the last enqueued event properties was requested, then read the |
| | 551 | | // properties from the active consumer, which can change during processing in the event of |
| | 552 | | // error scenarios. |
| | 553 | |
|
| | 554 | | LastEnqueuedEventProperties readLastEnquedEventInformation() |
| | 555 | | { |
| | 556 | | // This is not an expected scenario; the guard exists to prevent a race condition that is |
| | 557 | | // unlikely, but possible, when partition processing is being stopped or consumer creation |
| | 558 | | // outright failed. |
| | 559 | |
|
| 8 | 560 | | if ((consumer == null) || (consumer.IsClosed)) |
| | 561 | | { |
| 2 | 562 | | Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable); |
| | 563 | | } |
| | 564 | |
|
| 6 | 565 | | return new LastEnqueuedEventProperties(consumer.LastReceivedEvent); |
| | 566 | | } |
| | 567 | |
|
| | 568 | | // Define the routine to handle processing for the partition. |
| | 569 | |
|
| | 570 | | async Task performProcessing() |
| | 571 | | { |
| 78 | 572 | | cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 573 | |
|
| 72 | 574 | | var connection = default(EventHubConnection); |
| 72 | 575 | | var retryDelay = default(TimeSpan?); |
| 72 | 576 | | var capturedException = default(Exception); |
| 72 | 577 | | var eventBatch = default(IReadOnlyList<EventData>); |
| 72 | 578 | | var lastEvent = default(EventData); |
| 72 | 579 | | var failedAttemptCount = 0; |
| 72 | 580 | | var failedConsumerCount = 0; |
| | 581 | |
|
| | 582 | | // Create the connection to be used for spawning consumers; if the creation |
| | 583 | | // fails, then consider the processing task to be failed. The main processing |
| | 584 | | // loop will take responsibility for attempting to restart or relinquishing ownership. |
| | 585 | |
|
| | 586 | | try |
| | 587 | | { |
| 72 | 588 | | connection = CreateConnection(); |
| 70 | 589 | | } |
| 2 | 590 | | catch (Exception ex) |
| | 591 | | { |
| | 592 | | // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibil |
| | 593 | | // for observing or surfacing exceptions that may occur in the handler. |
| | 594 | |
|
| 2 | 595 | | _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.Non |
| 2 | 596 | | Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, Consu |
| | 597 | |
|
| 2 | 598 | | throw; |
| | 599 | | } |
| | 600 | |
|
| 70 | 601 | | await using var connectionAwaiter = connection.ConfigureAwait(false); |
| | 602 | |
|
| | 603 | | // Continue processing the partition until cancellation is signaled or until the count of failed consume |
| | 604 | | // Consumers which have been consistently unable to receive and process events will be considered invalid and |
| | 605 | |
|
| 122 | 606 | | while ((!cancellationSource.IsCancellationRequested) && (failedConsumerCount <= MaximumFailedConsumerCou |
| | 607 | | { |
| | 608 | | try |
| | 609 | | { |
| 90 | 610 | | consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, startingPosition, connection, Op |
| | 611 | |
|
| | 612 | | // Allow the core dispatching loop to apply an additional set of retries over any provided by th |
| | 613 | | // itself, as a processor should be as resilient as possible and retain partition ownership if p |
| | 614 | | // able to make forward progress. If the retries are exhausted or a non-retriable exception occ |
| | 615 | | // consumer will be considered invalid and potentially refreshed. |
| | 616 | |
|
| 3044180 | 617 | | while (!cancellationSource.IsCancellationRequested) |
| | 618 | | { |
| | 619 | | try |
| | 620 | | { |
| 3044156 | 621 | | eventBatch = await consumer.ReceiveAsync(EventBatchMaximumCount, Options.MaximumWaitTime |
| 3044126 | 622 | | await ProcessEventBatchAsync(partition, eventBatch, Options.MaximumWaitTime.HasValue, ca |
| | 623 | |
|
| | 624 | | // If the batch was successfully processed, capture the last event as the current starti |
| | 625 | | // event that the consumer becomes invalid and needs to be replaced. |
| | 626 | |
|
| 3044086 | 627 | | lastEvent = (eventBatch != null && eventBatch.Count > 0) ? eventBatch[eventBatch.Count - |
| | 628 | |
|
| 3044086 | 629 | | if ((lastEvent != null) && (lastEvent.Offset != long.MinValue)) |
| | 630 | | { |
| 2 | 631 | | startingPosition = EventPosition.FromOffset(lastEvent.Offset, false); |
| | 632 | | } |
| | 633 | |
|
| | 634 | | // If event batches are successfully processed, then the need for forward progress is |
| | 635 | | // satisfied, and the failure counts should reset. |
| | 636 | |
|
| 3044086 | 637 | | failedAttemptCount = 0; |
| 3044086 | 638 | | failedConsumerCount = 0; |
| 3044086 | 639 | | } |
| 30 | 640 | | catch (TaskCanceledException) when (cancellationSource.IsCancellationRequested) |
| | 641 | | { |
| | 642 | | // Do not log; this is an expected scenario when partition processing is asked to stop. |
| | 643 | |
|
| 28 | 644 | | throw; |
| | 645 | | } |
| 42 | 646 | | catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>()) |
| | 647 | | { |
| | 648 | | // The error handler is invoked as a fire-and-forget task; the processor does not assume |
| | 649 | | // for observing or surfacing exceptions that may occur in the handler. |
| | 650 | |
|
| 36 | 651 | | _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, Cancellat |
| | 652 | |
|
| 36 | 653 | | Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHu |
| 36 | 654 | | retryDelay = RetryPolicy.CalculateRetryDelay(ex, ++failedAttemptCount); |
| | 655 | |
|
| 36 | 656 | | if (!retryDelay.HasValue) |
| | 657 | | { |
| | 658 | | // If the exception should not be retried, then allow it to pass to the outer loop; |
| | 659 | | // to prevent being stuck in a corrupt state where the consumer is unable to read ev |
| | 660 | |
|
| 32 | 661 | | throw; |
| | 662 | | } |
| | 663 | |
|
| 4 | 664 | | await Task.Delay(retryDelay.Value, cancellationSource.Token).ConfigureAwait(false); |
| | 665 | | } |
| | 666 | | } |
| 24 | 667 | | } |
| 32 | 668 | | catch (OperationCanceledException ex) |
| | 669 | | { |
| 32 | 670 | | throw new TaskCanceledException(ex.Message, ex); |
| | 671 | | } |
| 6 | 672 | | catch (DeveloperCodeException ex) |
| | 673 | | { |
| | 674 | | // Record that an exception was observed in developer-provided code, but consider it fatal and t |
| | 675 | | // steps to handle or dispatch it. |
| | 676 | |
|
| 6 | 677 | | var message = string.Format(CultureInfo.InvariantCulture, Resources.DeveloperCodeExceptionMessag |
| 6 | 678 | | Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, C |
| | 679 | |
|
| 6 | 680 | | ExceptionDispatchInfo.Capture(ex.InnerException).Throw(); |
| 0 | 681 | | } |
| 28 | 682 | | catch (Exception ex) when (ex.IsFatalException()) |
| | 683 | | { |
| 0 | 684 | | throw; |
| | 685 | | } |
| 28 | 686 | | catch (Exception ex) |
| | 687 | | { |
| 28 | 688 | | ++failedConsumerCount; |
| 28 | 689 | | capturedException = ex; |
| 28 | 690 | | } |
| | 691 | | finally |
| | 692 | | { |
| | 693 | | try |
| | 694 | | { |
| 90 | 695 | | if (consumer != null) |
| | 696 | | { |
| 86 | 697 | | await consumer.CloseAsync(CancellationToken.None).ConfigureAwait(false); |
| | 698 | | } |
| 88 | 699 | | } |
| 2 | 700 | | catch (Exception ex) |
| | 701 | | { |
| 2 | 702 | | Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubNam |
| | 703 | |
|
| | 704 | | // Do not bubble the exception, as the consumer is being refreshed; failure to close this co |
| 2 | 705 | | } |
| | 706 | | } |
| | 707 | | } |
| | 708 | |
|
| | 709 | | // If there was an exception captured, then surface it. Otherwise signal that cancellation took place. |
| | 710 | |
|
| 32 | 711 | | if (capturedException != null) |
| | 712 | | { |
| 18 | 713 | | ExceptionDispatchInfo.Capture(capturedException).Throw(); |
| | 714 | | } |
| | 715 | |
|
| 14 | 716 | | throw new TaskCanceledException(); |
| 0 | 717 | | } |
| | 718 | |
|
| | 719 | | // Start processing in the background and return the processor |
| | 720 | | // metadata. |
| | 721 | |
|
| 78 | 722 | | return new PartitionProcessor |
| 78 | 723 | | ( |
| 78 | 724 | | Task.Run(performProcessing), |
| 78 | 725 | | partition, |
| 78 | 726 | | readLastEnquedEventInformation, |
| 78 | 727 | | cancellationSource |
| 78 | 728 | | ); |
| | 729 | | } |
| | 730 | |
|
| | 731 | | /// <summary> |
| | 732 | | /// Creates an <see cref="EventHubConnection" /> to use for communicating with the Event Hubs service. |
| | 733 | | /// </summary> |
| | 734 | | /// |
| | 735 | | /// <returns>The requested <see cref="EventHubConnection" />.</returns> |
| | 736 | | /// <remarks>The default implementation returns the result of invoking <c>ConnectionFactory</c>; because the member is virtual, derived types may supply the connection differently.</remarks> |
| 10 | 737 | | protected internal virtual EventHubConnection CreateConnection() => ConnectionFactory(); |
| | 738 | |
|
| | 739 | | /// <summary> |
| | 740 | | /// Produces a list of the available checkpoints for the Event Hub and consumer group associated with the |
| | 741 | | /// event processor instance, so that processing for a given set of partitions can be properly initialized. |
| | 742 | | /// </summary> |
| | 743 | | /// |
| | 744 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 745 | | /// |
| | 746 | | /// <returns>The set of checkpoints for the processor to take into account when initializing partitions.</return |
| | 747 | | /// |
| | 748 | | /// <remarks> |
| | 749 | | /// Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStarti |
| | 750 | | /// be used to initialize the partition for processing. |
| | 751 | | /// |
| | 752 | | /// In the event that a custom starting point is desired for a single partition, or each partition should star |
| | 753 | | /// it is recommended that this method express that intent by returning checkpoints for those partitions with |
| | 754 | | /// starting location set. |
| | 755 | | /// </remarks> |
| | 756 | | /// |
| | 757 | | protected abstract Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellati |
| | 758 | |
|
| | 759 | | /// <summary> |
| | 760 | | /// Produces a list of the ownership assignments for partitions between each of the cooperating event processo |
| | 761 | | /// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to al |
| | 762 | | /// the processor to discover other active collaborators and to make decisions about how to best balance work |
| | 763 | | /// between them. |
| | 764 | | /// </summary> |
| | 765 | | /// |
| | 766 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 767 | | /// |
| | 768 | | /// <returns>The set of ownership records to take into account when making load balancing decisions.</returns> |
| | 769 | | /// |
| | 770 | | protected abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken canc |
| | 771 | |
|
| | 772 | | /// <summary> |
| | 773 | | /// Attempts to claim ownership of the specified partitions for processing. This method is used by |
| | 774 | | /// load balancing to allow event processor instances to distribute the responsibility for processing |
| | 775 | | /// partitions for a given Event Hub and consumer group pairing amongst the active event processors. |
| | 776 | | /// </summary> |
| | 777 | | /// |
| | 778 | | /// <param name="desiredOwnership">The set of partition ownership desired by the event processor instance; this |
| | 779 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 780 | | /// |
| | 781 | | /// <returns>The set of ownership records for the partitions that were successfully claimed; this is expected to |
| | 782 | | /// |
| | 783 | | protected abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProc |
| | 784 | | CancellationToken can |
| | 785 | |
|
| | 786 | | /// <summary> |
| | 787 | | /// Performs the tasks needed to process a batch of events for a given partition as they are read from the Eve |
| | 788 | | /// </summary> |
| | 789 | | /// |
| | 790 | | /// <param name="events">The batch of events to be processed.</param> |
| | 791 | | /// <param name="partition">The context of the partition from which the events were read.</param> |
| | 792 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 793 | | /// |
| | 794 | | /// <remarks> |
| | 795 | | /// <para>The number of events in the <paramref name="events"/> batch may vary. The batch will contain a numb |
| | 796 | | /// requested when the processor was created, depending on the availability of events in the partition within |
| | 797 | | /// interval. |
| | 798 | | /// |
| | 799 | | /// If there are enough events available in the Event Hub partition to fill a batch of the requested size, the |
| | 800 | | /// immediately. If there were not a sufficient number of events available in the partition to populate a ful |
| | 801 | | /// to reach the requested batch size until the <see cref="EventProcessorOptions.MaximumWaitTime"/> has elapse |
| | 802 | | /// available by the end of that period. |
| | 803 | | /// |
| | 804 | | /// If a <see cref="EventProcessorOptions.MaximumWaitTime"/> was not requested, indicated by setting the optio |
| | 805 | | /// partition until a full batch of the requested size could be populated and will not dispatch any partial ba |
| | 806 | | /// |
| | 807 | | /// <para>Should an exception occur within the code for this method, the event processor will allow it to bubb |
| | 808 | | /// it in any way. Developers are strongly encouraged to take exception scenarios into account and guard agai |
| | 809 | | /// |
| | 810 | | /// <para>It is not recommended that the state of the processor be managed directly from within this method; r |
| | 811 | | /// a deadlock scenario, especially if using the synchronous form of the call.</para> |
| | 812 | | /// </remarks> |
| | 813 | | /// |
| | 814 | | protected abstract Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, |
| | 815 | | TPartition partition, |
| | 816 | | CancellationToken cancellationToken); |
| | 817 | |
|
| | 818 | | /// <summary> |
| | 819 | | /// Performs the tasks needed when an unexpected exception occurs within the operation of the |
| | 820 | | /// event processor infrastructure. |
| | 821 | | /// </summary> |
| | 822 | | /// |
| | 823 | | /// <param name="exception">The exception that occurred during operation of the event processor.</param> |
| | 824 | | /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</ |
| | 825 | | /// <param name="operationDescription">A short textual description of the operation during which the exception o |
| | 826 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 827 | | /// |
| | 828 | | /// <remarks> |
| | 829 | | /// This error handler is invoked when there is an exception observed within the event processor itself; it is |
| | 830 | | /// code that has been implemented to process events or other overrides and extension points that are not crit |
| | 831 | | /// The event processor will make every effort to recover from exceptions and continue processing. Should an |
| | 832 | | /// from be encountered, the processor will attempt to forfeit ownership of all partitions that it was process |
| | 833 | | /// |
| | 834 | | /// The exceptions surfaced to this method may be fatal or non-fatal; because the processor may not be able to |
| | 835 | | /// exception was fatal or whether its state was corrupted, this method has responsibility for making the dete |
| | 836 | | /// should be terminated or restarted. The method may do so by calling Stop on the processor instance and the |
| | 837 | | /// |
| | 838 | | /// It is recommended that, for production scenarios, the decision be made by considering observations made by |
| | 839 | | /// when initializing processing for a partition, and the method invoked when processing for a partition is st |
| | 840 | | /// data from their monitoring platforms in this decision as well. |
| | 841 | | /// |
| | 842 | | /// As with event processing, should an exception occur in the code for the error handler, the event processor |
| | 843 | | /// it in any way. Developers are strongly encouraged to take exception scenarios into account and guard agai |
| | 844 | | /// </remarks> |
| | 845 | | /// |
| | 846 | | protected abstract Task OnProcessingErrorAsync(Exception exception, |
| | 847 | | TPartition partition, |
| | 848 | | string operationDescription, |
| | 849 | | CancellationToken cancellationToken); |
| | 850 | |
|
| | 851 | | /// <summary> |
| | 852 | | /// Performs the tasks to initialize a partition, and its associated context, for event processing. |
| | 853 | | /// </summary> |
| | 854 | | /// |
| | 855 | | /// <param name="partition">The context of the partition being initialized. Only the well-known members of the |
| | 856 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 857 | | /// |
| | 858 | | /// <remarks> |
| | 859 | | /// It is not recommended that the state of the processor be managed directly from within this method; request |
| | 860 | | /// a deadlock scenario, especially if using the synchronous form of the call.  The default implementation performs no work and returns an already-completed task. |
| | 861 | | /// </remarks> |
| | 862 | | /// |
| | 863 | | protected virtual Task OnInitializingPartitionAsync(TPartition partition, |
| 42 | 864 | | CancellationToken cancellationToken) => Task.CompletedTask; |
| | 865 | |
|
| | 866 | | /// <summary> |
| | 867 | | /// Performs the tasks needed when processing for a partition is being stopped. This commonly occurs when the |
| | 868 | | /// is claimed by another event processor instance or when the current event processor instance is shutting do |
| | 869 | | /// </summary> |
| | 870 | | /// |
| | 871 | | /// <param name="partition">The context of the partition for which processing is being stopped.</param> |
| | 872 | | /// <param name="reason">The reason that processing is being stopped for the partition.</param> |
| | 873 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 874 | | /// |
| | 875 | | /// <remarks> |
| | 876 | | /// It is not recommended that the state of the processor be managed directly from within this method; request |
| | 877 | | /// a deadlock scenario, especially if using the synchronous form of the call. |
| | 878 | | /// </remarks> |
| | 879 | | /// |
| | 880 | | protected virtual Task OnPartitionProcessingStoppedAsync(TPartition partition, |
| | 881 | | ProcessingStoppedReason reason, |
| 42 | 882 | | CancellationToken cancellationToken) => Task.CompletedT |
| | 883 | |
|
| | 884 | | /// <summary> |
| | 885 | | /// A set of information about the last enqueued event of a partition, as observed by the associated EventHubs |
| | 886 | | /// associated with this context as events are received from the Event Hubs service. This is only available i |
| | 887 | | /// created with <see cref="ReadEventOptions.TrackLastEnqueuedEventProperties" /> set. |
| | 888 | | /// </summary> |
| | 889 | | /// |
| | 890 | | /// <param name="partitionId">The identifier of the Event Hub partition to read the properties from.</param> |
| | 891 | | /// |
| | 892 | | /// <returns>The set of properties for the last event that was enqueued to the partition.</returns> |
| | 893 | | /// |
| | 894 | | /// <remarks> |
| | 895 | | /// When information about the partition's last enqueued event is being tracked, each event received from the |
| | 896 | | /// service will carry metadata about the partition that it otherwise would not. This results in a small amoun |
| | 897 | | /// additional network bandwidth consumption that is generally a favorable trade-off when considered |
| | 898 | | /// against periodically making requests for partition properties using an Event Hub client.  When no active processor is tracked for <paramref name="partitionId" />, <c>Argument.AssertNotClosed</c> is invoked with its first argument fixed to <c>true</c>; presumably that always throws, since execution would otherwise continue with a null processor (assumption; confirm against the guard's implementation). |
| | 899 | | /// </remarks> |
| | 900 | | /// |
| | 901 | | /// <exception cref="InvalidOperationException">Occurs when this method is invoked without <see cref="EventProce |
| | 902 | | /// |
| | 903 | | protected virtual LastEnqueuedEventProperties ReadLastEnqueuedEventProperties(string partitionId) |
| | 904 | | { |
| 4 | 905 | | if (!ActivePartitionProcessors.TryGetValue(partitionId, out var processor)) |
| | 906 | | { |
| 2 | 907 | | Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable); |
| | 908 | | } |
| | 909 | |
|
| 2 | 910 | | return processor.ReadLastEnqueuedEventProperties(); |
| | 911 | | } |
| | 912 | |
|
| | 913 | | /// <summary> |
| | 914 | | /// Signals the <see cref="EventProcessor{TPartition}" /> to begin processing events. Should this method be ca |
| | 915 | | /// </summary> |
| | 916 | | /// |
| | 917 | | /// <param name="async">When <c>true</c>, the method will be executed asynchronously; otherwise, it will execute |
| | 918 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 919 | | /// <remarks>While the guard is held the status override reports <c>Starting</c>; it is cleared again in the <c>finally</c> block whether or not processing was started.  An <see cref="OperationCanceledException" /> raised inside the guarded section is logged and replaced with a new <see cref="TaskCanceledException" />.</remarks> |
| | 920 | | private async Task StartProcessingInternalAsync(bool async, |
| | 921 | | CancellationToken cancellationToken) |
| | 922 | | { |
| 120 | 923 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 116 | 924 | | Logger.EventProcessorStart(Identifier, EventHubName, ConsumerGroup); |
| | 925 | |
|
| 116 | 926 | | var releaseGuard = false; |
| | 927 | |
|
| | 928 | | try |
| | 929 | | { |
| | 930 | | // Acquire the semaphore used to synchronize processor starts and stops, respecting |
| | 931 | | // the async flag. When this is held, the state of the processor is stable. |
| | 932 | |
|
| 116 | 933 | | if (async) |
| | 934 | | { |
| 104 | 935 | | await ProcessorRunningGuard.WaitAsync(cancellationToken).ConfigureAwait(false); |
| | 936 | | } |
| | 937 | | else |
| | 938 | | { |
| 12 | 939 | | ProcessorRunningGuard.Wait(cancellationToken); |
| | 940 | | } |
| | 941 | |
|
| 112 | 942 | | releaseGuard = true; |
| 112 | 943 | | _statusOverride = EventProcessorStatus.Starting; |
| | 944 | |
|
| | 945 | | // If the processor is already running, then it was started before the |
| | 946 | | // semaphore was acquired; there is no work to be done.  NOTE(review): only the presence of a tracked task is checked, not whether it is still executing; in the code visible here the field is reset only by the stop path - confirm that a faulted task is expected to block a restart until stop is called. |
| | 947 | |
|
| 112 | 948 | | if (_runningProcessorTask != null) |
| | 949 | | { |
| 8 | 950 | | return; |
| | 951 | | } |
| | 952 | |
|
| | 953 | | // There should be no cancellation source, but guard against leaking resources in the |
| | 954 | | // event of a processing crash or other exception. |
| | 955 | |
|
| 104 | 956 | | _runningProcessorCancellationSource?.Cancel(); |
| 104 | 957 | | _runningProcessorCancellationSource?.Dispose(); |
| 104 | 958 | | _runningProcessorCancellationSource = new CancellationTokenSource(); |
| | 959 | |
|
| | 960 | | // Start processing events.  Previously tracked partition processors are discarded first, and the processing task is stored rather than awaited so that the stop path can observe it later. |
| | 961 | |
|
| 104 | 962 | | ActivePartitionProcessors.Clear(); |
| 104 | 963 | | _runningProcessorTask = RunProcessingAsync(_runningProcessorCancellationSource.Token); |
| 104 | 964 | | } |
| 4 | 965 | | catch (OperationCanceledException ex) |
| | 966 | | { |
| 4 | 967 | | Logger.EventProcessorStartError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| 4 | 968 | | throw new TaskCanceledException(); |
| | 969 | | } |
| 0 | 970 | | catch (Exception ex) |
| | 971 | | { |
| 0 | 972 | | Logger.EventProcessorStartError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| 0 | 973 | | throw; |
| | 974 | | } |
| | 975 | | finally |
| | 976 | | { |
| 116 | 977 | | _statusOverride = null; |
| 116 | 978 | | Logger.EventProcessorStartComplete(Identifier, EventHubName, ConsumerGroup); |
| | 979 | |
|
| | 980 | | // Only release the semaphore if this call actually acquired it.  If cancellation was signaled |
| | 981 | | // while waiting for the semaphore, it is not held here and must not be released. |
| | 982 | |
|
| 116 | 983 | | if (releaseGuard) |
| | 984 | | { |
| 112 | 985 | | ProcessorRunningGuard.Release(); |
| | 986 | | } |
| | 987 | | } |
| 112 | 988 | | } |
| | 989 | |
|
| | 990 | | /// <summary> |
| | 991 | | /// Signals the <see cref="EventProcessor{TPartition}" /> to stop processing events. Should this method be cal |
| | 992 | | /// </summary> |
| | 993 | | /// |
| | 994 | | /// <param name="async">When <c>true</c>, the method will be executed asynchronously; otherwise, it will execute |
| | 995 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 996 | | /// <remarks>Any exception other than <see cref="TaskCanceledException" /> observed when awaiting the processing task is held until partition processing has been stopped, ownership relinquished, and the guard released; it is then rethrown with its original stack trace.</remarks> |
| | 997 | | private async Task StopProcessingInternalAsync(bool async, |
| | 998 | | CancellationToken cancellationToken) |
| | 999 | | { |
| 128 | 1000 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 120 | 1001 | | Logger.EventProcessorStop(Identifier, EventHubName, ConsumerGroup); |
| | 1002 | |
|
| 120 | 1003 | | var processingException = default(Exception); |
| 120 | 1004 | | var releaseGuard = false; |
| | 1005 | |
|
| | 1006 | | try |
| | 1007 | | { |
| | 1008 | | // Acquire the semaphore used to synchronize processor starts and stops, respecting |
| | 1009 | | // the async flag. When this is held, the state of the processor is stable. |
| | 1010 | |
|
| 120 | 1011 | | if (async) |
| | 1012 | | { |
| 92 | 1013 | | await ProcessorRunningGuard.WaitAsync(cancellationToken).ConfigureAwait(false); |
| | 1014 | | } |
| | 1015 | | else |
| | 1016 | | { |
| 28 | 1017 | | ProcessorRunningGuard.Wait(cancellationToken); |
| | 1018 | | } |
| | 1019 | |
|
| 116 | 1020 | | releaseGuard = true; |
| 116 | 1021 | | _statusOverride = EventProcessorStatus.Stopping; |
| | 1022 | |
|
| | 1023 | | // If the processor is not running, then it was never started or has been stopped |
| | 1024 | | // before the semaphore was acquired; there is no work to be done. |
| | 1025 | |
|
| 116 | 1026 | | if (_runningProcessorTask == null) |
| | 1027 | | { |
| 16 | 1028 | | return; |
| | 1029 | | } |
| | 1030 | |
|
| | 1031 | | // Request cancellation of the running processor task, then dispose of and clear the source so that it is not reused. |
| | 1032 | |
|
| 100 | 1033 | | _runningProcessorCancellationSource?.Cancel(); |
| 100 | 1034 | | _runningProcessorCancellationSource?.Dispose(); |
| 100 | 1035 | | _runningProcessorCancellationSource = null; |
| | 1036 | |
|
| | 1037 | | // Allow processing to complete. If there was a processing or load balancing error, |
| | 1038 | | // awaiting the task is where it will be surfaced. Be sure to preserve it so |
| | 1039 | | // that it can be surfaced. |
| | 1040 | |
|
| | 1041 | | try |
| | 1042 | | { |
| 100 | 1043 | | if (async) |
| | 1044 | | { |
| 82 | 1045 | | await _runningProcessorTask.ConfigureAwait(false); |
| | 1046 | | } |
| | 1047 | | else |
| | 1048 | | { |
| | 1049 | | #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| 18 | 1050 | | _runningProcessorTask.GetAwaiter().GetResult(); |
| | 1051 | | #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| | 1052 | | } |
| 0 | 1053 | | } |
| 84 | 1054 | | catch (TaskCanceledException) |
| | 1055 | | { |
| | 1056 | | // This is expected, as cancellation was requested just above; no action is needed. |
| 84 | 1057 | | } |
| 16 | 1058 | | catch (Exception ex) |
| | 1059 | | { |
| | 1060 | | // Preserve the exception to surface once the tasks needed to fully stop are complete; |
| | 1061 | | // logging and invoking of the error handler will have already taken place. |
| | 1062 | |
|
| 16 | 1063 | | processingException = ex; |
| 16 | 1064 | | } |
| | 1065 | |
|
| | 1066 | | // With the processing task having completed, perform the necessary cleanup of partition processing task |
| | 1067 | | // and surrender ownership. |
| | 1068 | |
|
| 100 | 1069 | | var stopPartitionProcessingTasks = ActivePartitionProcessors.Keys |
| 138 | 1070 | | .Select(partitionId => TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.Shutdown |
| 100 | 1071 | | .ToArray(); |
| | 1072 | |
|
| 100 | 1073 | | if (async) |
| | 1074 | | { |
| 82 | 1075 | | await Task.WhenAll(stopPartitionProcessingTasks).ConfigureAwait(false); |
| 82 | 1076 | | await LoadBalancer.RelinquishOwnershipAsync(CancellationToken.None).ConfigureAwait(false); |
| | 1077 | | } |
| | 1078 | | else |
| | 1079 | | { |
| 18 | 1080 | | Task.WaitAll(stopPartitionProcessingTasks); |
| | 1081 | |
|
| | 1082 | | #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| 18 | 1083 | | LoadBalancer.RelinquishOwnershipAsync(CancellationToken.None).GetAwaiter().GetResult(); |
| | 1084 | | #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| | 1085 | | } |
| | 1086 | |
|
| 100 | 1087 | | ActivePartitionProcessors.Clear(); |
| | 1088 | |
|
| | 1089 | | // Dispose of the processing task and reset processing state to |
| | 1090 | | // allow the processor to be restarted when this method completes. |
| | 1091 | |
|
| 100 | 1092 | | _runningProcessorTask.Dispose(); |
| 100 | 1093 | | _runningProcessorTask = null; |
| 100 | 1094 | | } |
| 4 | 1095 | | catch (OperationCanceledException ex) |
| | 1096 | | { |
| 4 | 1097 | | Logger.EventProcessorStopError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| 4 | 1098 | | throw new TaskCanceledException(); |
| | 1099 | | } |
| 0 | 1100 | | catch (Exception ex) |
| | 1101 | | { |
| 0 | 1102 | | Logger.EventProcessorStopError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| 0 | 1103 | | throw; |
| | 1104 | | } |
| | 1105 | | finally |
| | 1106 | | { |
| 120 | 1107 | | _statusOverride = null; |
| 120 | 1108 | | Logger.EventProcessorStopComplete(Identifier, EventHubName, ConsumerGroup); |
| | 1109 | |
|
| | 1110 | | // Only release the semaphore if this call actually acquired it.  If cancellation was signaled |
| | 1111 | | // while waiting for the semaphore, it is not held here and must not be released. |
| | 1112 | |
|
| 120 | 1113 | | if (releaseGuard) |
| | 1114 | | { |
| 116 | 1115 | | ProcessorRunningGuard.Release(); |
| | 1116 | | } |
| | 1117 | | } |
| | 1118 | |
|
| | 1119 | | // Surface any exception that was captured when the processing task was |
| | 1120 | | // initially awaited; ExceptionDispatchInfo is used so that the original stack trace is preserved. |
| | 1121 | |
|
| 100 | 1122 | | if (processingException != default) |
| | 1123 | | { |
| 16 | 1124 | | ExceptionDispatchInfo.Capture(processingException).Throw(); |
| | 1125 | | } |
| 100 | 1126 | | } |
| | 1127 | |
|
| | 1128 | | /// <summary> |
| | 1129 | | /// Performs the tasks needed to execute processing for this <see cref="EventProcessor{TPartition}" /> instanc |
| | 1130 | | /// load balancing between associated processors. |
| | 1131 | | /// </summary> |
| | 1132 | | /// |
| | 1133 | | /// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal the request to cancel |
| | 1134 | | /// <remarks>Each iteration queries the partition identifiers, performs one load balancing cycle, and then delays for the interval returned by that cycle when it is non-zero.  The connection created here is disposed asynchronously when the method exits.  Every exit path of this method throws; cancellation is reported as a <see cref="TaskCanceledException" />.</remarks> |
| | 1135 | | private async Task RunProcessingAsync(CancellationToken cancellationToken) |
| | 1136 | | { |
| 104 | 1137 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1138 | |
|
| | 1139 | | try |
| | 1140 | | { |
| 104 | 1141 | | var connection = CreateConnection(); |
| 88 | 1142 | | await using var connectionAwaiter = connection.ConfigureAwait(false); |
| | 1143 | |
|
| | 1144 | | ValueStopwatch cycleDuration; |
| 88 | 1145 | | var partitionIds = default(string[]); |
| | 1146 | |
|
| 108 | 1147 | | while (!cancellationToken.IsCancellationRequested) |
| | 1148 | | { |
| 108 | 1149 | | cycleDuration = ValueStopwatch.StartNew(); |
| | 1150 | |
|
| | 1151 | | try |
| | 1152 | | { |
| 108 | 1153 | | partitionIds = await connection.GetPartitionIdsAsync(RetryPolicy, cancellationToken).ConfigureAw |
| 94 | 1154 | | } |
| 14 | 1155 | | catch (Exception ex) when (ex.IsNotType<TaskCanceledException>()) |
| | 1156 | | { |
| | 1157 | | // Logging of exceptions for the service operation is the responsibility of the connection; here the error handler is notified and the cycle continues without partition identifiers. |
| | 1158 | |
|
| 14 | 1159 | | _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationGetPartitionIds, CancellationToken |
| 14 | 1160 | | partitionIds = default; |
| 14 | 1161 | | } |
| | 1162 | |
|
| 108 | 1163 | | var remainingTimeUntilNextCycle = await PerformLoadBalancingAsync(cycleDuration, partitionIds, cance |
| | 1164 | |
|
| 108 | 1165 | | if (remainingTimeUntilNextCycle != TimeSpan.Zero) |
| | 1166 | | { |
| 106 | 1167 | | await Task.Delay(remainingTimeUntilNextCycle, cancellationToken).ConfigureAwait(false); |
| | 1168 | | } |
| | 1169 | | } |
| | 1170 | |
|
| | 1171 | | // Cancellation has been requested; throw the corresponding exception to maintain consistent behavior.  Because TaskCanceledException derives from OperationCanceledException, the handler below re-wraps it. |
| | 1172 | |
|
| 0 | 1173 | | throw new TaskCanceledException(); |
| 0 | 1174 | | } |
| 84 | 1175 | | catch (OperationCanceledException ex) |
| | 1176 | | { |
| 84 | 1177 | | throw new TaskCanceledException(ex.Message, ex); |
| | 1178 | | } |
| 16 | 1179 | | catch (Exception ex) when (ex.IsFatalException()) |
| | 1180 | | { |
| 0 | 1181 | | throw; |
| | 1182 | | } |
| 16 | 1183 | | catch (Exception ex) |
| | 1184 | | { |
| | 1185 | | // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility |
| | 1186 | | // for observing or surfacing exceptions that may occur in the handler. |
| | 1187 | |
|
| 16 | 1188 | | _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationEventProcessingLoop, CancellationToken.Non |
| 16 | 1189 | | Logger.EventProcessorTaskError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| | 1190 | |
|
| 16 | 1191 | | throw; |
| | 1192 | | } |
| 0 | 1193 | | } |
| | 1194 | |
|
| | 1195 | | /// <summary> |
| | 1196 | | /// Performs the tasks needed for a single cycle of load balancing: running the load balancer to claim ownership, stopping processing for partitions that are no longer owned, and (re)starting processing for owned partitions that need it. |
| | 1197 | | /// </summary> |
| | 1198 | | /// |
| | 1199 | | /// <param name="cycleDuration">Responsible for tracking the duration of this cycle. It is expected that caller |
| | 1200 | | /// <param name="partitionIds">The valid set of partition identifiers for the Event Hub to which the processor i |
| | 1201 | | /// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal the request to cancel |
| | 1202 | | /// |
| | 1203 | | /// <returns>The interval that callers should delay before invoking the next load balancing cycle; <see cref="Ti |
| | 1204 | | /// |
| | 1205 | | private async Task<TimeSpan> PerformLoadBalancingAsync(ValueStopwatch cycleDuration, |
| | 1206 | | string[] partitionIds, |
| | 1207 | | CancellationToken cancellationToken) |
| | 1208 | | { |
| 108 | 1209 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1210 | |
|
| | 1211 | | // Perform the tasks needed for the load balancing cycle, including managing partition ownership in response |
| | 1212 | | // lost, or faulted partition processors. |
| | 1213 | | // |
| | 1214 | | // Ensure that any errors observed, whether fatal or non-fatal, are surfaced to the exception handler in |
| | 1215 | | // a fire-and-forget manner. The processor does not assume responsibility for observing or surfacing except |
| | 1216 | | // that may occur in the handler, so the associated task is intentionally unobserved. |
| | 1217 | |
|
| 108 | 1218 | | var claimedOwnership = default(EventProcessorPartitionOwnership); |
| | 1219 | |
|
| 108 | 1220 | | if ((partitionIds != default) && (partitionIds.Length > 0)) |
| | 1221 | | { |
| | 1222 | | try |
| | 1223 | | { |
| 58 | 1224 | | claimedOwnership = await LoadBalancer.RunLoadBalancingAsync(partitionIds, cancellationToken).Configu |
| 50 | 1225 | | } |
| | 1226 | | catch (EventHubsException ex) |
| 6 | 1227 | | when (Resources.OperationClaimOwnership.Equals(ex.GetFailureOperation(), StringComparison.InvariantC |
| | 1228 | | { |
| | 1229 | | // If a specific partition was associated with the failure, it is carried by a well-known data membe |
| | 1230 | |
|
| 6 | 1231 | | var partitionId = ex.GetFailureData<string>(); |
| | 1232 | |
|
| 6 | 1233 | | var partition = (partitionId ?? string.Empty) switch |
| 6 | 1234 | | { |
| 14 | 1235 | | string id when (id.Length == 0) => null, |
| 12 | 1236 | | string _ when (ActivePartitionProcessors.TryGetValue(partitionId, out var partitionProcessor)) = |
| 8 | 1237 | | _ => new TPartition { PartitionId = partitionId } |
| 6 | 1238 | | }; |
| | 1239 | |
|
| 6 | 1240 | | _ = InvokeOnProcessingErrorAsync(ex.InnerException ?? ex, partition, Resources.OperationClaimOwnersh |
| 6 | 1241 | | Logger.EventProcessorClaimOwnershipError(Identifier, EventHubName, ConsumerGroup, partitionId, ((ex. |
| 6 | 1242 | | } |
| 2 | 1243 | | catch (Exception ex) when (ex.IsNotType<TaskCanceledException>()) |
| | 1244 | | { |
| 2 | 1245 | | _ = InvokeOnProcessingErrorAsync(ex, null, Resources.OperationLoadBalancing, CancellationToken.None) |
| 2 | 1246 | | Logger.EventProcessorLoadBalancingError(Identifier, EventHubName, ConsumerGroup, ex.Message); |
| 2 | 1247 | | } |
| | 1248 | |
|
| | 1249 | | // If a partition was claimed, begin processing it if not already being processed. |
| | 1250 | |
|
| 58 | 1251 | | if ((claimedOwnership != default) && (!ActivePartitionProcessors.ContainsKey(claimedOwnership.PartitionI |
| | 1252 | | { |
| 38 | 1253 | | await TryStartProcessingPartitionAsync(claimedOwnership.PartitionId, cancellationToken).ConfigureAwa |
| | 1254 | | } |
| | 1255 | | } |
| | 1256 | |
|
| | 1257 | | // Some ownership for some previously claimed partitions may have expired or have been stolen; stop the proc |
| | 1258 | | // which are no longer owned. |
| | 1259 | |
|
| 108 | 1260 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1261 | |
|
| 108 | 1262 | | await Task.WhenAll(ActivePartitionProcessors.Keys |
| 108 | 1263 | | .Except(LoadBalancer.OwnedPartitionIds) |
| 112 | 1264 | | .Select(partitionId => TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.OwnershipLos |
| 108 | 1265 | | .ConfigureAwait(false); |
| | 1266 | |
|
| | 1267 | | // The remaining processing tasks should be running. To ensure that is the case, validate the status of the |
| | 1268 | | // and restart processing if it has failed. It is also possible that task creation failed when the processi |
| | 1269 | | // in which case there would be no task; create them. |
| | 1270 | |
|
| 108 | 1271 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1272 | |
|
| 108 | 1273 | | await Task.WhenAll(LoadBalancer.OwnedPartitionIds |
| 108 | 1274 | | .Select(async partitionId => |
| 108 | 1275 | | { |
| 162 | 1276 | | if (!ActivePartitionProcessors.TryGetValue(partitionId, out var partitionProcessor) || partitionProc |
| 108 | 1277 | | { |
| 114 | 1278 | | await TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.OwnershipLost, cancel |
| 114 | 1279 | | await TryStartProcessingPartitionAsync(partitionId, cancellationToken).ConfigureAwait(false); |
| 108 | 1280 | | } |
| 162 | 1281 | | })) |
| 108 | 1282 | | .ConfigureAwait(false); |
| | 1283 | |
|
| | 1284 | | // If load balancing is greedy and the load balancer reports that it is not yet balanced (IsBalanced is false), signal that there should be no delay before |
| | 1285 | | // invoking the next load balancing cycle. |
| | 1286 | |
|
| 108 | 1287 | | if ((Options.LoadBalancingStrategy == LoadBalancingStrategy.Greedy) && (!LoadBalancer.IsBalanced)) |
| | 1288 | | { |
| 2 | 1289 | | return TimeSpan.Zero; |
| | 1290 | | } |
| | 1291 | |
|
| | 1292 | | // Report the time remaining in the load balancing interval, if any, so that the caller can wait before starting the next cycle. |
| | 1293 | |
|
| 106 | 1294 | | return LoadBalancer.LoadBalanceInterval.CalculateRemaining(cycleDuration.GetElapsedTime()); |
| 108 | 1295 | | } |
| | 1296 | |
|
| | 1297 | | /// <summary> |
| | 1298 | | /// Attempts to begin processing the requested partition in the background and update tracking state |
| | 1299 | | /// so that processing can be stopped. |
| | 1300 | | /// </summary> |
| | 1301 | | /// |
| | 1302 | | /// <param name="partitionId">The identifier of the Event Hub partition whose processing should be started.</par |
| | 1303 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 1304 | | /// |
| | 1305 | | /// <returns><c>true</c> if processing was successfully started; otherwise, <c>false</c>.</returns> |
| | 1306 | | /// |
| | 1307 | | /// <remarks> |
| | 1308 | | /// Exceptions encountered in this method will be logged and will result in the error handler being |
| | 1309 | | /// invoked. They will not be surfaced to callers. This is intended to be a safe operation consumed |
| | 1310 | | /// as part of the load balancing cycle, which is failure-tolerant. NOTE(review): the cancellation check made on entry sits outside of the try block, so cancellation observed there presumably does surface to callers; confirm. |
| | 1311 | | /// </remarks> |
| | 1312 | | /// |
| | 1313 | | private async Task<bool> TryStartProcessingPartitionAsync(string partitionId, |
| | 1314 | | CancellationToken cancellationToken) |
| | 1315 | | { |
| 44 | 1316 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 44 | 1317 | | Logger.EventProcessorPartitionProcessingStart(partitionId, Identifier, EventHubName, ConsumerGroup); |
| | 1318 | |
|
| 44 | 1319 | | var partition = new TPartition { PartitionId = partitionId }; |
| 44 | 1320 | | var operationDescription = Resources.OperationClaimOwnership; |
| 44 | 1321 | | var startingPosition = Options.DefaultStartingPosition; |
| 44 | 1322 | | var cancellationSource = default(CancellationTokenSource); |
| | 1323 | |
|
| | 1324 | | try |
| | 1325 | | { |
| | 1326 | | // Initialize the partition context; the handler is responsible for initializing any custom fields of the |
| | 1327 | |
|
| 44 | 1328 | | await OnInitializingPartitionAsync(partition, cancellationToken).ConfigureAwait(false); |
| | 1329 | |
|
| | 1330 | | // Query the available checkpoints; the one matching this partition, if any, is selected below. |
| | 1331 | |
|
| 44 | 1332 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 44 | 1333 | | operationDescription = Resources.OperationListCheckpoints; |
| | 1334 | |
|
| 44 | 1335 | | var checkpoints = await ListCheckpointsAsync(cancellationToken).ConfigureAwait(false); |
| 44 | 1336 | | operationDescription = Resources.OperationClaimOwnership; |
| | 1337 | |
|
| | 1338 | | // Determine the starting position for processing the partition; a checkpoint for the partition takes precedence over the default starting position from the options. |
| | 1339 | |
|
| 90 | 1340 | | foreach (var checkpoint in checkpoints) |
| | 1341 | | { |
| 2 | 1342 | | if (checkpoint.PartitionId == partitionId) |
| | 1343 | | { |
| 2 | 1344 | | startingPosition = checkpoint.StartingPosition; |
| 2 | 1345 | | break; |
| | 1346 | | } |
| | 1347 | | } |
| | 1348 | |
|
| | 1349 | | // Create and register the partition processor. Ownership of the cancellationSource is transferred |
| | 1350 | | // to the processor upon creation, including the responsibility for disposal. |
| | 1351 | |
|
| 44 | 1352 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1353 | |
|
| 44 | 1354 | | cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| 44 | 1355 | | var processor = CreatePartitionProcessor(partition, startingPosition, cancellationSource); |
| | 1356 | |
|
| 0 | 1357 | | ActivePartitionProcessors.AddOrUpdate(partitionId, processor, (key, value) => processor); |
| 40 | 1358 | | cancellationSource = null; |
| | 1359 | |
|
| 40 | 1360 | | return true; |
| | 1361 | | } |
| 4 | 1362 | | catch (Exception ex) |
| | 1363 | | { |
| | 1364 | | // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility |
| | 1365 | | // for observing or surfacing exceptions that may occur in the handler. |
| | 1366 | |
|
| 4 | 1367 | | _ = InvokeOnProcessingErrorAsync(ex, partition, operationDescription, CancellationToken.None); |
| 4 | 1368 | | Logger.EventProcessorPartitionProcessingStartError(partitionId, Identifier, EventHubName, ConsumerGroup, |
| | 1369 | |
|
| 4 | 1370 | | cancellationSource?.Cancel(); |
| 4 | 1371 | | cancellationSource?.Dispose(); |
| 4 | 1372 | | return false; |
| | 1373 | | } |
| | 1374 | | finally |
| | 1375 | | { |
| 44 | 1376 | | Logger.EventProcessorPartitionProcessingStartComplete(partitionId, Identifier, EventHubName, ConsumerGro |
| | 1377 | | } |
| 44 | 1378 | | } |
| | 1379 | |
|
| | 1380 | | /// <summary> |
| | 1381 | | /// Attempts to stop processing the requested partition, removing it from the set of active partition processors and notifying the partition stopped handler. |
| | 1382 | | /// </summary> |
| | 1383 | | /// |
| | 1384 | | /// <param name="partitionId">The identifier of the Event Hub partition whose processing should be stopped.</par |
| | 1385 | | /// <param name="reason">The reason why the processing is being stopped.</param> |
| | 1386 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 1387 | | /// |
| | 1388 | | /// <returns><c>true</c> if the <paramref name="partitionId"/> was owned and was being processed; otherwise, <c> |
| | 1389 | | /// |
| | 1390 | | /// <remarks> |
| | 1391 | | /// Exceptions encountered when stopping processing for an owned partition will be logged and will result in t |
| | 1392 | | /// being invoked. They will not be surfaced to callers. This is intended to be a safe operation consumed |
| | 1393 | | /// as part of the load balancing cycle, which is failure-tolerant. |
| | 1394 | | /// </remarks> |
| | 1395 | | /// |
| | 1396 | | private async Task<bool> TryStopProcessingPartitionAsync(string partitionId, |
| | 1397 | | ProcessingStoppedReason reason, |
| | 1398 | | CancellationToken cancellationToken) |
| | 1399 | | { |
| 48 | 1400 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 48 | 1401 | | Logger.EventProcessorPartitionProcessingStop(partitionId, Identifier, EventHubName, ConsumerGroup); |
| | 1402 | |
|
| 48 | 1403 | | var partition = default(TPartition); |
| | 1404 | |
|
| | 1405 | | try |
| | 1406 | | { |
| | 1407 | | // If the partition processor is not being tracked or could not be retrieved from the tracking items, |
| | 1408 | | // then it cannot be stopped. |
| | 1409 | |
|
| 48 | 1410 | | if (!ActivePartitionProcessors.TryRemove(partitionId, out var partitionProcessor)) |
| | 1411 | | { |
| 4 | 1412 | | return false; |
| | 1413 | | } |
| | 1414 | |
|
| | 1415 | | // Attempt to stop the processor; any exceptions should be treated as a problem with processing, not |
| | 1416 | | // associated with the attempt to stop. |
| | 1417 | |
|
| 44 | 1418 | | partition = partitionProcessor.Partition; |
| | 1419 | |
|
| | 1420 | | try |
| | 1421 | | { |
| 44 | 1422 | | partitionProcessor.CancellationSource.Cancel(); |
| 44 | 1423 | | await partitionProcessor.ProcessingTask.ConfigureAwait(false); |
| 4 | 1424 | | } |
| 38 | 1425 | | catch (TaskCanceledException) |
| | 1426 | | { |
| | 1427 | | // Cancellation is expected, since the cancellation source was just signaled; no action is needed. |
| 38 | 1428 | | } |
| 2 | 1429 | | catch |
| | 1430 | | { |
| | 1431 | | // The processing task is in a failed state; any logging and dispatching |
| | 1432 | | // to the error handler happened in the processing task before the exception |
| | 1433 | | // was thrown. All that remains is to override the reason for stopping. |
| | 1434 | |
|
| 2 | 1435 | | reason = ProcessingStoppedReason.OwnershipLost; |
| 2 | 1436 | | } |
| | 1437 | |
|
| 44 | 1438 | | partitionProcessor.Dispose(); |
| 44 | 1439 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 1440 | |
|
| | 1441 | | // Notify the handler of the now-closed partition, awaiting completion to allow for a more deterministic |
| | 1442 | | // for developers where the initialize and stop handlers will fire in a deterministic order and not inte |
| | 1443 | | // |
| | 1444 | | // Because the processor does not assume responsibility for observing or surfacing exceptions that may o |
| | 1445 | | // errors are logged but the error handler is not invoked nor does an exception in the handler constitut |
| | 1446 | | // processing the partition. This also aims to prevent an infinite loop scenario where StopProcessing i |
| | 1447 | | // error handler, which calls the partition stopped handler, which has an exception that again calls the |
| | 1448 | |
|
| | 1449 | | try |
| | 1450 | | { |
| 44 | 1451 | | await OnPartitionProcessingStoppedAsync(partition, reason, cancellationToken).ConfigureAwait(false); |
| 42 | 1452 | | } |
| 0 | 1453 | | catch (TaskCanceledException) |
| | 1454 | | { |
| | 1455 | | // This is expected; no action is needed. |
| 0 | 1456 | | } |
| 2 | 1457 | | catch (Exception ex) |
| | 1458 | | { |
| 2 | 1459 | | Logger.EventProcessorPartitionProcessingStopError(partitionId, Identifier, EventHubName, ConsumerGro |
| 2 | 1460 | | } |
| | 1461 | |
|
| 44 | 1462 | | return true; |
| | 1463 | | } |
| 0 | 1464 | | catch (Exception ex) when (ex.IsNotType<TaskCanceledException>()) |
| | 1465 | | { |
| | 1466 | | // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility |
| | 1467 | | // for observing or surfacing exceptions that may occur in the handler. |
| | 1468 | |
|
| 0 | 1469 | | _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationSurrenderOwnership, CancellationToken |
| 0 | 1470 | | Logger.EventProcessorPartitionProcessingStopError(partitionId, Identifier, EventHubName, ConsumerGroup, |
| | 1471 | |
|
| 0 | 1472 | | return false; |
| | 1473 | | } |
| | 1474 | | finally |
| | 1475 | | { |
| 48 | 1476 | | Logger.EventProcessorPartitionProcessingStopComplete(partitionId, Identifier, EventHubName, ConsumerGrou |
| | 1477 | | } |
| 48 | 1478 | | } |
| | 1479 | |
|
| | 1480 | | /// <summary> |
| | 1481 | | /// Performs the tasks needed to invoke the <see cref="OnProcessingErrorAsync" /> method in the background, |
| | 1482 | | /// as it is intended to be a fire-and-forget operation. |
| | 1483 | | /// </summary> |
| | 1484 | | /// |
| | 1485 | | /// <param name="exception">The exception that occurred during operation of the event processor.</param> |
| | 1486 | | /// <param name="partition">The context of the partition associated with the error, if any; otherwise, <c>null</ |
| | 1487 | | /// <param name="operationDescription">A short textual description of the operation during which the exception o |
| | 1488 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t |
| | 1489 | | /// |
| | 1490 | | private Task InvokeOnProcessingErrorAsync(Exception exception, |
| | 1491 | | TPartition partition, |
| | 1492 | | string operationDescription, |
| 160 | 1493 | | CancellationToken cancellationToken) => Task.Run(() => OnProcessingErr |
| | 1494 | |
|
| | 1495 | | /// <summary> |
| | 1496 | | /// A virtual <see cref="StorageManager" /> instance that delegates calls to the |
| | 1497 | | /// <see cref="EventProcessor{TPartition}" /> to which it is associated. |
| | 1498 | | /// </summary> |
| | 1499 | | /// |
| | 1500 | | private class DelegatingStorageManager : StorageManager |
| | 1501 | | { |
| | 1502 | | /// <summary> |
| | 1503 | | /// The <see cref="EventProcessor{TPartition}" /> that the storage manager is associated with. |
| | 1504 | | /// </summary> |
| | 1505 | | /// |
| 58 | 1506 | | private EventProcessor<TPartition> Processor { get; } |
| | 1507 | |
|
| | 1508 | | /// <summary> |
| | 1509 | | /// Initializes a new instance of the <see cref="DelegatingStorageManager"/> class. |
| | 1510 | | /// </summary> |
| | 1511 | | /// |
| | 1512 | | /// <param name="processor">The <see cref="EventProcessor{TPartition}" /> to associate the storage manager w |
| | 1513 | | /// |
| 332 | 1514 | | public DelegatingStorageManager(EventProcessor<TPartition> processor) => Processor = processor; |
| | 1515 | |
|
| | 1516 | | /// <summary> |
| | 1517 | | /// Retrieves a complete ownership list from the data store. |
| | 1518 | | /// </summary> |
| | 1519 | | /// |
| | 1520 | | /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associa |
| | 1521 | | /// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with. This i |
| | 1522 | | /// <param name="consumerGroup">The name of the consumer group the ownership are associated with. This is ig |
| | 1523 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc |
| | 1524 | | /// |
| | 1525 | | /// <returns>An enumerable containing all the existing ownership for the associated Event Hub and consumer g |
| | 1526 | | /// |
| | 1527 | | public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQua |
| | 1528 | | string eventHub |
| | 1529 | | string consumer |
| 2 | 1530 | | CancellationTok |
| | 1531 | | /// <summary> |
| | 1532 | | /// Attempts to claim ownership of partitions for processing. |
| | 1533 | | /// </summary> |
| | 1534 | | /// |
| | 1535 | | /// <param name="partitionOwnership">An enumerable containing all the ownership to claim.</param> |
| | 1536 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc |
| | 1537 | | /// |
| | 1538 | | /// <returns>An enumerable containing the successfully claimed ownership.</returns> |
| | 1539 | | /// |
| | 1540 | | public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<Ev |
| 54 | 1541 | | CancellationTo |
| | 1542 | |
|
| | 1543 | | /// <summary> |
| | 1544 | | /// Retrieves a list of all the checkpoints in a data store for a given namespace, Event Hub and consumer |
| | 1545 | | /// </summary> |
| | 1546 | | /// |
| | 1547 | | /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associa |
| | 1548 | | /// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with. This is |
| | 1549 | | /// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with. This is |
| | 1550 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc |
| | 1551 | | /// |
| | 1552 | | /// <returns>An enumerable containing all the existing checkpoints for the associated Event Hub and consumer |
| | 1553 | | /// |
| | 1554 | | public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(string fullyQualified |
| | 1555 | | string eventHubName, |
| | 1556 | | string consumerGroup, |
| 2 | 1557 | | CancellationToken can |
| | 1558 | |
|
| | 1559 | | /// <summary> |
| | 1560 | | /// This method is not implemented for this type; invoking it throws rather than updating a checkpoint. |
| | 1561 | | /// </summary> |
| | 1562 | | /// |
| | 1563 | | /// <param name="checkpoint">The checkpoint containing the information to be stored.</param> |
| | 1564 | | /// <param name="eventData">The event to use as the basis for the checkpoint's starting position.</param> |
| | 1565 | | /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to canc |
| | 1566 | | /// |
| | 1567 | | /// <exception cref="NotImplementedException">The method is not implemented for this type.</exception> |
| | 1568 | | /// |
| | 1569 | | public override Task UpdateCheckpointAsync(EventProcessorCheckpoint checkpoint, |
| | 1570 | | EventData eventData, |
| 2 | 1571 | | CancellationToken cancellationToken) => throw new NotImplementedE |
| | 1572 | | } |
| | 1573 | |
|
| | 1574 | | /// <summary> |
| | 1575 | | /// The set of information needed to track and manage the active processing |
| | 1576 | | /// of a partition. |
| | 1577 | | /// </summary> |
| | 1578 | | /// |
| | 1579 | | internal class PartitionProcessor : IDisposable |
| | 1580 | | { |
| | 1581 | | /// <summary>The task that is performing the processing.</summary> |
| | 1582 | | public readonly Task ProcessingTask; |
| | 1583 | |
|
| | 1584 | | /// <summary>The partition that is being processed.</summary> |
| | 1585 | | public readonly TPartition Partition; |
| | 1586 | |
|
| | 1587 | | /// <summary>The source token that can be used to cancel the processing for the associated <see cref="Proces |
| | 1588 | | public readonly CancellationTokenSource CancellationSource; |
| | 1589 | |
|
| | 1590 | | /// <summary>A function that can be used to read the information about the last enqueued event of the partit |
| | 1591 | | public readonly Func<LastEnqueuedEventProperties> ReadLastEnqueuedEventProperties; |
| | 1592 | |
|
| | 1593 | | /// <summary> |
| | 1594 | | /// Initializes a new instance of the <see cref="PartitionProcessor"/> class. |
| | 1595 | | /// </summary> |
| | 1596 | | /// |
| | 1597 | | /// <param name="processingTask">The task that is performing the processing.</param> |
| | 1598 | | /// <param name="partition">The partition that is being processed.</param> |
| | 1599 | | /// <param name="readLastEnqueuedEventProperties">A function that can be used to read the information about |
| | 1600 | | /// <param name="cancellationSource">The source token that can be used to cancel the processing.</param> |
| | 1601 | | /// |
| 92 | 1602 | | public PartitionProcessor(Task processingTask, |
| 92 | 1603 | | TPartition partition, |
| 92 | 1604 | | Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties, |
| 184 | 1605 | | CancellationTokenSource cancellationSource) => (ProcessingTask, Partition, ReadLas |
| | 1606 | |
|
| | 1607 | | /// <summary> |
| | 1608 | | /// Performs tasks needed to clean up the disposable resources used by the processor. This method does |
| | 1609 | | /// not assume responsibility for signaling the cancellation source or awaiting the <see cref="ProcessingT |
| | 1610 | | /// </summary> |
| | 1611 | | /// |
| | 1612 | | public void Dispose() |
| | 1613 | | { |
| 44 | 1614 | | CancellationSource?.Dispose(); |
| 44 | 1615 | | ProcessingTask?.Dispose(); |
| 44 | 1616 | | } |
| | 1617 | | } |
| | 1618 | | } |
| | 1619 | | } |