| | 1 | | // Copyright (c) Microsoft. All rights reserved. |
| | 2 | | // Licensed under the MIT license. See LICENSE file in the project root for full license information. |
| | 3 | |
|
| | 4 | | namespace Microsoft.Azure.EventHubs.Processor |
| | 5 | | { |
| | 6 | | using System; |
| | 7 | | using System.Collections.Concurrent; |
| | 8 | | using System.Collections.Generic; |
| | 9 | | using System.Diagnostics; |
| | 10 | | using System.Linq; |
| | 11 | | using System.Threading; |
| | 12 | | using System.Threading.Tasks; |
| | 13 | | using Microsoft.Azure.EventHubs.Primitives; |
| | 14 | |
|
| | 15 | | class PartitionManager |
| | 16 | | { |
| | 17 | | readonly EventProcessorHost host; |
| | 18 | | readonly ConcurrentDictionary<string, PartitionPump> partitionPumps; |
| | 19 | |
|
| | 20 | | IList<string> partitionIds; |
| | 21 | | CancellationTokenSource cancellationTokenSource; |
| | 22 | | Task runTask; |
| | 23 | |
|
/// <summary>
/// Creates a partition manager for the given host, starting with an empty pump table
/// and a fresh cancellation token source.
/// </summary>
/// <param name="host">The event processor host this manager works on behalf of.</param>
internal PartitionManager(EventProcessorHost host)
{
    // The three assignments are independent of each other.
    this.partitionPumps = new ConcurrentDictionary<string, PartitionPump>();
    this.cancellationTokenSource = new CancellationTokenSource();
    this.host = host;
}
| | 30 | |
|
/// <summary>
/// Returns the partition ids of the Event Hub, fetching them through a temporary
/// EventHubClient on first use and caching them in the partitionIds field afterwards.
/// </summary>
/// <returns>The cached list of partition ids.</returns>
/// <exception cref="EventProcessorConfigurationException">
/// Wraps any exception raised inside the try block (client creation, proxy assignment,
/// runtime information fetch).
/// </exception>
| | 31 | | public async Task<IEnumerable<string>> GetPartitionIdsAsync() |
| | 32 | | { |
// Only hit the service when nothing has been cached yet.
| 0 | 33 | | if (this.partitionIds == null) |
| | 34 | | { |
| 0 | 35 | | EventHubClient eventHubClient = null; |
| | 36 | | try |
| | 37 | | { |
| 0 | 38 | | eventHubClient = this.host.CreateEventHubClient(); |
| 0 | 39 | | eventHubClient.WebProxy = this.host.EventProcessorOptions.WebProxy; |
| 0 | 40 | | var runtimeInfo = await eventHubClient.GetRuntimeInformationAsync().ConfigureAwait(false); |
| 0 | 41 | | this.partitionIds = runtimeInfo.PartitionIds.ToList(); |
| 0 | 42 | | } |
| 0 | 43 | | catch (Exception e) |
| | 44 | | { |
| 0 | 45 | | throw new EventProcessorConfigurationException("Encountered error while fetching the list of EventHu |
| | 46 | | } |
| | 47 | | finally |
| | 48 | | { |
// The client is only needed for this one call; close it whether or not the fetch succeeded.
// An exception from CloseAsync is not wrapped by the catch above.
| 0 | 49 | | if (eventHubClient != null) |
| | 50 | | { |
| 0 | 51 | | await eventHubClient.CloseAsync().ConfigureAwait(false); |
| | 52 | | } |
| | 53 | | } |
| | 54 | |
|
| 0 | 55 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"PartitionCount: {this.partitionIds |
| 0 | 56 | | } |
| | 57 | |
|
| 0 | 58 | | return this.partitionIds; |
| 0 | 59 | | } |
| | 60 | |
|
/// <summary>
/// Prepares the lease and checkpoint stores via <c>InitializeStoresAsync</c> and then
/// launches the manager's run loop, keeping the loop task in <c>runTask</c>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Surfaced through the returned task when a run task is already present.
/// </exception>
public async Task StartAsync()
{
    bool alreadyStarted = this.runTask != null;
    if (alreadyStarted)
    {
        throw new InvalidOperationException("A PartitionManager cannot be started multiple times.");
    }

    Task storesReady = this.InitializeStoresAsync();
    await storesReady.ConfigureAwait(false);

    // Deliberately not awaited: the loop task is stored so StopAsync can wait for it later.
    this.runTask = this.RunAsync();
}
| | 72 | |
|
/// <summary>
/// Requests cancellation of the run loop, waits for the run task (if any) to finish,
/// and resets the manager's state so it can be started again.
/// </summary>
/// <remarks>
/// The cancelled <see cref="CancellationTokenSource"/> is disposed once the run task has
/// completed; previously it was simply dropped when the replacement was assigned.
/// </remarks>
public async Task StopAsync()
{
    // Capture the source being cancelled: the field is swapped for a fresh one below.
    CancellationTokenSource cancelledSource = this.cancellationTokenSource;
    cancelledSource.Cancel();

    try
    {
        var localRunTask = this.runTask;
        if (localRunTask != null)
        {
            await localRunTask.ConfigureAwait(false);
        }
    }
    finally
    {
        // Once it is closed let's reset the task. Install the replacement source before
        // disposing the old one so the field never refers to a disposed instance.
        this.runTask = null;
        this.cancellationTokenSource = new CancellationTokenSource();
        cancelledSource.Dispose();
    }
}
| | 86 | |
|
/// <summary>
/// Runs the main loop (RunLoopAsync) with the current cancellation token and, once the loop
/// ends for any reason, removes all partition pumps with CloseReason.Shutdown.
/// Exceptions from either phase are logged and reported through
/// EventProcessorOptions.NotifyOfException instead of being rethrown.
/// </summary>
| | 87 | | async Task RunAsync() |
| | 88 | | { |
| | 89 | | try |
| | 90 | | { |
| 0 | 91 | | await this.RunLoopAsync(this.cancellationTokenSource.Token).ConfigureAwait(false); |
| 0 | 92 | | } |
// The filter only swallows the cancellation when this manager's own token source was cancelled.
| 0 | 93 | | catch (TaskCanceledException) when (this.cancellationTokenSource.IsCancellationRequested) |
| | 94 | | { |
| | 95 | | // Expected during host shutdown. |
| 0 | 96 | | } |
| 0 | 97 | | catch (Exception e) |
| | 98 | | { |
| | 99 | | // Ideally RunLoop should never throw. |
| 0 | 100 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager m |
| 0 | 101 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAction |
| 0 | 102 | | } |
| | 103 | |
|
// Cleanup runs regardless of how the loop ended (normal exit, cancellation, or failure).
| | 104 | | try |
| | 105 | | { |
| | 106 | | // Cleanup |
| 0 | 107 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Shutting down all pumps"); |
| 0 | 108 | | await this.RemoveAllPumpsAsync(CloseReason.Shutdown).ConfigureAwait(false); |
| 0 | 109 | | } |
| 0 | 110 | | catch (Exception e) |
| | 111 | | { |
| 0 | 112 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during shutdown", e.ToStri |
| 0 | 113 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAction |
| 0 | 114 | | } |
| 0 | 115 | | } |
| | 116 | |
|
/// <summary>
/// Makes sure the lease store, one lease per partition, the checkpoint store and one
/// checkpoint per partition all exist, in that order. Creation calls go through RetryAsync;
/// per-partition creations within each phase are started together and awaited with Task.WhenAll.
/// </summary>
| | 117 | | async Task InitializeStoresAsync() // Propagates the exception from RetryAsync when a creation step runs out of retries. |
| | 118 | | { |
| | 119 | | // Make sure the lease store exists |
| 0 | 120 | | ILeaseManager leaseManager = this.host.LeaseManager; |
| 0 | 121 | | if (!await leaseManager.LeaseStoreExistsAsync().ConfigureAwait(false)) |
| | 122 | | { |
| 0 | 123 | | await RetryAsync(() => leaseManager.CreateLeaseStoreIfNotExistsAsync(), null, "Failure creating lease st |
| 0 | 124 | | "Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.Creati |
| | 125 | | } |
| | 126 | | // else |
| | 127 | | // lease store already exists, no work needed |
| | 128 | |
|
| 0 | 129 | | var partitionIds = await this.GetPartitionIdsAsync().ConfigureAwait(false); |
| | 130 | |
|
| | 131 | | // Now make sure the leases exist |
| 0 | 132 | | var createLeaseTasks = new List<Task>(); |
| 0 | 133 | | foreach (string id in partitionIds) |
| | 134 | | { |
// Copy the loop variable into a local that the lambda below captures.
| 0 | 135 | | var subjectId = id; |
| 0 | 136 | | createLeaseTasks.Add(RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(subjectId), subjectId, $" |
| 0 | 137 | | $"Out of retries creating lease for partition {subjectId}", EventProcessorHostActionStrings.Crea |
| | 138 | | } |
| | 139 | |
|
| 0 | 140 | | await Task.WhenAll(createLeaseTasks).ConfigureAwait(false); |
| | 141 | |
|
| | 142 | | // Make sure the checkpoint store exists |
| 0 | 143 | | ICheckpointManager checkpointManager = this.host.CheckpointManager; |
| 0 | 144 | | if (!await checkpointManager.CheckpointStoreExistsAsync().ConfigureAwait(false)) |
| | 145 | | { |
| 0 | 146 | | await RetryAsync(() => checkpointManager.CreateCheckpointStoreIfNotExistsAsync(), null, "Failure creatin |
| 0 | 147 | | "Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.C |
| | 148 | | } |
| | 149 | | // else |
| | 150 | | // checkpoint store already exists, no work needed |
| | 151 | |
|
| | 152 | | // Now make sure the checkpoints exist |
| 0 | 153 | | var createCheckpointTasks = new List<Task>(); |
| 0 | 154 | | foreach (string id in partitionIds) |
| | 155 | | { |
| 0 | 156 | | var subjectId = id; |
| 0 | 157 | | createCheckpointTasks.Add(RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(subjectId) |
| 0 | 158 | | $"Out of retries creating checkpoint for partition {subjectId}", EventProcessorHostActionStrings |
| | 159 | | } |
| | 160 | |
|
| 0 | 161 | | await Task.WhenAll(createCheckpointTasks).ConfigureAwait(false); |
| 0 | 162 | | } |
| | 163 | |
|
| | 164 | | // Throws if it runs out of retries. If it returns, action succeeded. |
// Invokes the given lambda at least once and repeats it after every failure until it
// succeeds or retryCount reaches maxRetries. Each failure is logged as a warning (as a
// partition pump warning when partitionId is non-null, as a host warning otherwise) and
// reported via NotifyOfException. When every attempt has failed, an error is logged and
// an EventProcessorRuntimeException carrying the last exception is thrown.
// NOTE(review): the tail of the signature is cut off in this listing; maxRetries is
// presumably one of the remaining parameters - confirm against the full source.
| | 165 | | async Task RetryAsync(Func<Task> lambda, string partitionId, string retryMessage, string finalFailureMessage, st |
| | 166 | | { |
| 0 | 167 | | Exception finalException = null; |
| 0 | 168 | | bool createdOK = false; |
| 0 | 169 | | int retryCount = 0; |
| | 170 | | do |
| | 171 | | { |
| | 172 | | try |
| | 173 | | { |
| 0 | 174 | | await lambda().ConfigureAwait(false); |
| 0 | 175 | | createdOK = true; |
| 0 | 176 | | } |
| 0 | 177 | | catch (Exception ex) |
| | 178 | | { |
| 0 | 179 | | if (partitionId != null) |
| | 180 | | { |
| 0 | 181 | | ProcessorEventSource.Log.PartitionPumpWarning(this.host.HostName, partitionId, retryMessage, ex. |
| | 182 | | } |
| | 183 | | else |
| | 184 | | { |
| 0 | 185 | | ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, retryMessage, ex.ToString |
| | 186 | | } |
| | 187 | |
|
| 0 | 188 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, partitionId, ex, action); |
| | 189 | |
|
// Remember the most recent failure; it becomes the inner exception if all attempts fail.
| 0 | 190 | | finalException = ex; |
| 0 | 191 | | retryCount++; |
| 0 | 192 | | } |
| | 193 | | } |
| 0 | 194 | | while (!createdOK && (retryCount < maxRetries)); |
| | 195 | |
|
| 0 | 196 | | if (!createdOK) |
| | 197 | | { |
| 0 | 198 | | if (partitionId != null) |
| | 199 | | { |
| 0 | 200 | | ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, partitionId, finalFailureMessage); |
| | 201 | | } |
| | 202 | | else |
| | 203 | | { |
| 0 | 204 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, finalFailureMessage, null); |
| | 205 | | } |
| | 206 | |
|
| 0 | 207 | | throw new EventProcessorRuntimeException(finalFailureMessage, action, finalException); |
| | 208 | | } |
| 0 | 209 | | } |
| | 210 | |
|
/// <summary>
/// Main lease-management loop. Until cancellation is requested, each iteration:
/// downloads all leases, renews the ones owned by this host that have a pump, tries to
/// acquire expired leases, optionally steals one lease from another owner, and finally
/// creates or removes partition pumps so they match the resulting lease ownership.
/// Each iteration is padded with a delay so it lasts at least LeaseRenewInterval.
/// </summary>
| | 211 | | async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception, ExceptionWithAction |
| | 212 | | { |
| 0 | 213 | | var loopStopwatch = new Stopwatch(); |
| | 214 | |
|
| 0 | 215 | | while (!cancellationToken.IsCancellationRequested) |
| | 216 | | { |
| | 217 | | // Mark start time so we can use the duration taken to calculate renew interval. |
| 0 | 218 | | loopStopwatch.Restart(); |
| | 219 | |
|
| 0 | 220 | | ILeaseManager leaseManager = this.host.LeaseManager; |
| 0 | 221 | | var allLeases = new ConcurrentDictionary<string, Lease>(); |
| 0 | 222 | | var leasesOwnedByOthers = new ConcurrentDictionary<string, Lease>(); |
| | 223 | |
|
| | 224 | | // Inspect all leases. |
| | 225 | | // Acquire any expired leases. |
| | 226 | | // Renew any leases that currently belong to us. |
| | 227 | | IEnumerable<Lease> downloadedLeases; |
| 0 | 228 | | var renewLeaseTasks = new List<Task>(); |
| 0 | 229 | | int ourLeaseCount = 0; |
| | 230 | |
|
| | 231 | | try |
| | 232 | | { |
| | 233 | | try |
| | 234 | | { |
| 0 | 235 | | downloadedLeases = await leaseManager.GetAllLeasesAsync().ConfigureAwait(false); |
| 0 | 236 | | } |
| 0 | 237 | | catch (Exception e) |
| | 238 | | { |
| 0 | 239 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception during downloadi |
| 0 | 240 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHo |
| | 241 | |
|
| | 242 | | // Avoid tight spin if getallleases call keeps failing. |
| 0 | 243 | | await Task.Delay(1000).ConfigureAwait(false); |
| | 244 | |
|
// This continue sits inside the outer try, so the finally block below (the
// renew-interval delay) still runs before the next iteration starts.
| 0 | 245 | | continue; |
| | 246 | | } |
| | 247 | |
|
| | 248 | | // First things first, renew owned leases. |
| 0 | 249 | | foreach (var lease in downloadedLeases) |
| | 250 | | { |
| 0 | 251 | | var subjectLease = lease; |
| | 252 | |
|
| | 253 | | try |
| | 254 | | { |
| 0 | 255 | | allLeases[subjectLease.PartitionId] = subjectLease; |
| 0 | 256 | | if (subjectLease.Owner == this.host.HostName && !(await subjectLease.IsExpired().ConfigureAw |
| | 257 | | { |
| 0 | 258 | | ourLeaseCount++; |
| | 259 | |
|
| | 260 | | // Get lease from partition since we need the token at this point. |
| 0 | 261 | | if (!this.partitionPumps.TryGetValue(subjectLease.PartitionId, out var capturedPump)) |
| | 262 | | { |
// No pump exists for this partition yet, so there is no pump-held lease to renew;
// the lease is still counted as ours above.
| 0 | 263 | | continue; |
| | 264 | | } |
| | 265 | |
|
| 0 | 266 | | var capturedLease = capturedPump.Lease; |
| | 267 | |
|
| 0 | 268 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, capturedLease.PartitionId |
| 0 | 269 | | renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(capturedLease).ContinueWith(renewResult |
| 0 | 270 | | { |
| 0 | 271 | | if (renewResult.IsFaulted) |
| 0 | 272 | | { |
| 0 | 273 | | // Might have failed due to intermittent error or lease-lost. |
| 0 | 274 | | // Just log here, expired leases will be picked by same or another host anyway. |
| 0 | 275 | | ProcessorEventSource.Log.PartitionPumpError( |
| 0 | 276 | | this.host.HostName, |
| 0 | 277 | | capturedLease.PartitionId, |
| 0 | 278 | | "Failed to renew lease.", |
| 0 | 279 | | renewResult.Exception?.Message); |
| 0 | 280 | |
|
| 0 | 281 | | this.host.EventProcessorOptions.NotifyOfException( |
| 0 | 282 | | this.host.HostName, |
| 0 | 283 | | capturedLease.PartitionId, |
| 0 | 284 | | renewResult.Exception, |
| 0 | 285 | | EventProcessorHostActionStrings.RenewingLease); |
| 0 | 286 | |
|
| 0 | 287 | | // Nullify the owner on the lease in case this host lost it. |
| 0 | 288 | | // This helps to remove pump earlier reducing duplicate receives. |
| 0 | 289 | | if (renewResult.Exception?.GetBaseException() is LeaseLostException) |
| 0 | 290 | | { |
| 0 | 291 | | allLeases[capturedLease.PartitionId].Owner = null; |
| 0 | 292 | | } |
| 0 | 293 | | } |
| 0 | 294 | | }, cancellationToken)); |
| | 295 | | } |
| 0 | 296 | | else if (!await subjectLease.IsExpired().ConfigureAwait(false)) |
| | 297 | | { |
| 0 | 298 | | leasesOwnedByOthers[subjectLease.PartitionId] = subjectLease; |
| | 299 | | } |
| 0 | 300 | | } |
| 0 | 301 | | catch (Exception e) |
| | 302 | | { |
| 0 | 303 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checkin |
| 0 | 304 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcess |
| 0 | 305 | | } |
| 0 | 306 | | } |
| | 307 | |
|
| | 308 | | // Wait until we are done with renewing our own leases here. |
| | 309 | | // In theory, this should never throw, error are logged and notified in the renew tasks. |
| 0 | 310 | | await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false); |
| 0 | 311 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished."); |
| | 312 | |
|
| | 313 | | // Check any expired leases that we can grab here. |
| 0 | 314 | | ourLeaseCount += await this.AcquireExpiredLeasesAsync(allLeases, leasesOwnedByOthers, ourLeaseCount, |
| 0 | 315 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished |
| | 316 | |
|
| | 317 | | // Grab more leases if available and needed for load balancing |
| 0 | 318 | | if (leasesOwnedByOthers.Count > 0) |
| | 319 | | { |
| 0 | 320 | | Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers.Values, ourLeaseCount); |
| | 321 | |
|
| | 322 | | // Don't attempt to steal the lease if current host has a pump for this partition id |
| | 323 | | // This is possible when current pump is in failed state due to lease moved to some other host. |
| 0 | 324 | | if (stealThisLease != null && !this.partitionPumps.ContainsKey(stealThisLease.PartitionId)) |
| | 325 | | { |
| | 326 | | try |
| | 327 | | { |
| | 328 | | // Get fresh content of lease subject to acquire. |
| 0 | 329 | | var downloadedLease = await leaseManager.GetLeaseAsync(stealThisLease.PartitionId).Confi |
| 0 | 330 | | allLeases[stealThisLease.PartitionId] = downloadedLease; |
| | 331 | |
|
| | 332 | | // Don't attempt to steal if lease is already expired. |
| | 333 | | // Expired leases are picked up by other hosts quickly. |
| | 334 | | // Don't attempt to steal if owner has changed from the calculation time to refresh time |
| 0 | 335 | | if (!await downloadedLease.IsExpired().ConfigureAwait(false) |
| 0 | 336 | | && downloadedLease.Owner == stealThisLease.Owner) |
| | 337 | | { |
| 0 | 338 | | ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, downloaded |
| 0 | 339 | | if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false)) |
| | 340 | | { |
| | 341 | | // Succeeded in stealing lease |
| 0 | 342 | | ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, downloa |
| 0 | 343 | | ourLeaseCount++; |
| | 344 | | } |
| | 345 | | else |
| | 346 | | { |
| | 347 | | // Acquisition failed. Make sure we don't leave the lease as owned. |
| 0 | 348 | | allLeases[stealThisLease.PartitionId].Owner = null; |
| | 349 | |
|
| 0 | 350 | | ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, |
| 0 | 351 | | "Failed to steal lease for partition " + downloadedLease.PartitionId, null); |
| | 352 | | } |
| | 353 | | } |
| 0 | 354 | | } |
| 0 | 355 | | catch (Exception e) |
| | 356 | | { |
| 0 | 357 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, |
| 0 | 358 | | "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToS |
| 0 | 359 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, |
| 0 | 360 | | stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease); |
| | 361 | |
|
| | 362 | | // Acquisition failed. Make sure we don't leave the lease as owned. |
| 0 | 363 | | allLeases[stealThisLease.PartitionId].Owner = null; |
| 0 | 364 | | } |
| | 365 | | } |
| 0 | 366 | | } |
| | 367 | |
|
| | 368 | | // Update pump with new state of leases on owned partitions in parallel. |
| 0 | 369 | | var createRemovePumpTasks = new List<Task>(); |
| 0 | 370 | | foreach (string partitionId in allLeases.Keys) |
| | 371 | | { |
| 0 | 372 | | var subjectPartitionId = partitionId; |
| | 373 | |
|
| 0 | 374 | | Lease updatedLease = allLeases[subjectPartitionId]; |
| 0 | 375 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {update |
| | 376 | |
|
// Leases owned by this host get a pump created or refreshed; partitions that still have a
// pump but are no longer owned by this host get the pump removed with CloseReason.LeaseLost.
| 0 | 377 | | if (updatedLease.Owner == this.host.HostName) |
| | 378 | | { |
| 0 | 379 | | createRemovePumpTasks.Add(Task.Run(async () => |
| 0 | 380 | | { |
| 0 | 381 | | try |
| 0 | 382 | | { |
| 0 | 383 | | await this.CheckAndAddPumpAsync(subjectPartitionId, updatedLease).ConfigureAwait(fal |
| 0 | 384 | | } |
| 0 | 385 | | catch (Exception e) |
| 0 | 386 | | { |
| 0 | 387 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception dur |
| 0 | 388 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartiti |
| 0 | 389 | | } |
| 0 | 390 | | }, cancellationToken)); |
| | 391 | | } |
| 0 | 392 | | else if (this.partitionPumps.ContainsKey(partitionId)) |
| | 393 | | { |
| 0 | 394 | | createRemovePumpTasks.Add(Task.Run(async () => |
| 0 | 395 | | { |
| 0 | 396 | | try |
| 0 | 397 | | { |
| 0 | 398 | | await this.TryRemovePumpAsync(subjectPartitionId, CloseReason.LeaseLost).ConfigureAw |
| 0 | 399 | | } |
| 0 | 400 | | catch (Exception e) |
| 0 | 401 | | { |
| 0 | 402 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception dur |
| 0 | 403 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartiti |
| 0 | 404 | | } |
| 0 | 405 | | }, cancellationToken)); |
| | 406 | | } |
| | 407 | | } |
| | 408 | |
|
| 0 | 409 | | await Task.WhenAll(createRemovePumpTasks).ConfigureAwait(false); |
| 0 | 410 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Pump update is finished."); |
| 0 | 411 | | } |
| 0 | 412 | | catch (Exception e) |
| | 413 | | { |
| | 414 | | // TaskCanceledException is expected during host unregister. |
| 0 | 415 | | if (e is TaskCanceledException) |
| | 416 | | { |
| 0 | 417 | | continue; |
| | 418 | | } |
| | 419 | |
|
| | 420 | | // Loop should not exit unless signalled via cancellation token. Log any failures and continue. |
| 0 | 421 | | ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manag |
| 0 | 422 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAc |
| 0 | 423 | | } |
| | 424 | | finally |
| | 425 | | { |
| | 426 | | // Consider reducing the wait time with last lease-walkthrough's time taken. |
// Sleep only for what is left of the renew interval after this iteration's work.
// The delay observes the cancellation token, so it can throw out of the loop on cancellation.
| 0 | 427 | | var elapsedTime = loopStopwatch.Elapsed; |
| 0 | 428 | | if (leaseManager.LeaseRenewInterval > elapsedTime) |
| | 429 | | { |
| 0 | 430 | | await Task.Delay(leaseManager.LeaseRenewInterval.Subtract(elapsedTime), cancellationToken).Confi |
| | 431 | | } |
| | 432 | | } |
| 0 | 433 | | } |
| 0 | 434 | | } |
| | 435 | |
|
/// <summary>
/// Tries to acquire expired leases, up to this host's fair share: the ceiling of
/// (lease count / number of distinct active owners), minus the leases already owned.
/// Each candidate lease is re-downloaded and re-checked for expiry before acquisition.
/// </summary>
/// <param name="allLeases">All known leases keyed by partition id; entries are replaced with the refreshed lease.</param>
/// <param name="leasesOwnedByOthers">Leases held by other hosts; an entry is removed when its lease is acquired here.</param>
/// <param name="ownedCount">Number of leases this host already owns.</param>
/// <param name="cancellationToken">Token passed to Task.Run for the acquisition tasks.</param>
/// <returns>The number of leases acquired by this call.</returns>
| | 436 | | async Task<int> AcquireExpiredLeasesAsync( |
| | 437 | | ConcurrentDictionary<string, Lease> allLeases, |
| | 438 | | ConcurrentDictionary<string, Lease> leasesOwnedByOthers, |
| | 439 | | int ownedCount, |
| | 440 | | CancellationToken cancellationToken) |
| | 441 | | { |
| 0 | 442 | | var totalAcquired = 0; |
| 0 | 443 | | var hosts = new List<string>(); |
| | 444 | |
|
| | 445 | | // Find distinct hosts actively owning leases. |
| 0 | 446 | | foreach (var lease in allLeases.Values) |
| | 447 | | { |
| 0 | 448 | | if (lease.Owner != null && !hosts.Contains(lease.Owner) && !(await lease.IsExpired().ConfigureAwait(fals |
| | 449 | | { |
| 0 | 450 | | hosts.Add(lease.Owner); |
| | 451 | | } |
| 0 | 452 | | } |
| | 453 | |
|
| | 454 | | // Calculate how many more leases we can own. |
// With no active owner found, treat the host count as 1 to avoid dividing by zero.
| 0 | 455 | | var hostCount = hosts.Count() == 0 ? 1 : hosts.Count(); |
| 0 | 456 | | var targetLeaseCount = (int)Math.Ceiling((double)allLeases.Count / (double)hostCount) - ownedCount; |
| | 457 | |
|
| | 458 | | // Attempt to acquire expired leases now up to allowed target lease count. |
| 0 | 459 | | var tasks = new List<Task>(); |
| 0 | 460 | | foreach (var possibleLease in allLeases.Values) |
| | 461 | | { |
| | 462 | | // Break if we already acquired enough number of leases. |
| 0 | 463 | | if (targetLeaseCount <= 0) |
| | 464 | | { |
| | 465 | | break; |
| | 466 | | } |
| | 467 | |
|
| 0 | 468 | | var subjectLease = possibleLease; |
| | 469 | |
|
| 0 | 470 | | if (await subjectLease.IsExpired().ConfigureAwait(false)) |
| | 471 | | { |
// The budget is consumed per attempt, not per successful acquisition.
| 0 | 472 | | targetLeaseCount--; |
| 0 | 473 | | tasks.Add(Task.Run(async () => |
| 0 | 474 | | { |
| 0 | 475 | | try |
| 0 | 476 | | { |
| 0 | 477 | | // Get fresh content of lease subject to acquire. |
| 0 | 478 | | var downloadedLease = await this.host.LeaseManager.GetLeaseAsync(subjectLease.PartitionId).C |
| 0 | 479 | | allLeases[subjectLease.PartitionId] = downloadedLease; |
| 0 | 480 | |
|
| 0 | 481 | | // Check expired once more here incase another host have already leased this since we popula |
| 0 | 482 | | if (await downloadedLease.IsExpired().ConfigureAwait(false)) |
| 0 | 483 | | { |
| 0 | 484 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.Partition |
| 0 | 485 | | if (await this.host.LeaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false |
| 0 | 486 | | { |
| 0 | 487 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.Parti |
| 0 | 488 | | leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease); |
// Acquisition tasks run concurrently, hence the interlocked increment.
| 0 | 489 | | Interlocked.Increment(ref totalAcquired); |
| 0 | 490 | | } |
| 0 | 491 | | else |
| 0 | 492 | | { |
| 0 | 493 | | // Acquisition failed. Make sure we don't leave the lease as owned. |
| 0 | 494 | | allLeases[subjectLease.PartitionId].Owner = null; |
| 0 | 495 | |
|
| 0 | 496 | | ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, |
| 0 | 497 | | "Failed to acquire lease for partition " + downloadedLease.PartitionId, null); |
| 0 | 498 | | } |
| 0 | 499 | | } |
| 0 | 500 | | } |
| 0 | 501 | | catch (Exception e) |
| 0 | 502 | | { |
| 0 | 503 | | ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "F |
| 0 | 504 | | this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.Partition |
| 0 | 505 | |
|
| 0 | 506 | | // Acquisition failed. Make sure we don't leave the lease as owned. |
| 0 | 507 | | allLeases[subjectLease.PartitionId].Owner = null; |
| 0 | 508 | | } |
| 0 | 509 | | }, cancellationToken)); |
| | 510 | | } |
| 0 | 511 | | } |
| | 512 | |
|
| 0 | 513 | | await Task.WhenAll(tasks).ConfigureAwait(false); |
| | 514 | |
|
| 0 | 515 | | return totalAcquired; |
| 0 | 516 | | } |
| | 517 | |
|
/// <summary>
/// Brings the pump for a partition owned by this host up to date: removes an existing pump
/// whose status fails the check below, refreshes the lease token of a healthy pump, or
/// creates a new pump when none is registered for the partition.
/// </summary>
/// <param name="partitionId">Partition whose pump is being checked.</param>
/// <param name="lease">Latest lease for the partition; its Token is pushed to the existing pump when non-blank.</param>
| | 518 | | async Task CheckAndAddPumpAsync(string partitionId, Lease lease) |
| | 519 | | { |
| | 520 | | PartitionPump capturedPump; |
| 0 | 521 | | if (this.partitionPumps.TryGetValue(partitionId, out capturedPump)) |
| | 522 | | { |
| | 523 | | // There already is a pump. Make sure the pump is working and replace the lease. |
| 0 | 524 | | if (capturedPump.PumpStatus != PartitionPumpStatus.Running && capturedPump.PumpStatus != PartitionPumpSt |
| | 525 | | { |
| | 526 | | // The existing pump is bad. Remove it. |
// This branch only removes the pump; no replacement is created in the same call.
| 0 | 527 | | await TryRemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false); |
| | 528 | | } |
| | 529 | | else |
| | 530 | | { |
| | 531 | | // Lease token can show up empty here if lease content download has failed or not recently acquired. |
| | 532 | | // Don't update the token if so. |
| 0 | 533 | | if (!string.IsNullOrWhiteSpace(lease.Token)) |
| | 534 | | { |
| | 535 | | // Pump is working, just replace the lease token. |
| 0 | 536 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease toke |
| 0 | 537 | | capturedPump.SetLeaseToken(lease.Token); |
| | 538 | | } |
| | 539 | | else |
| | 540 | | { |
| 0 | 541 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Skipping to update |
| | 542 | | } |
| | 543 | | } |
| | 544 | | } |
| | 545 | | else |
| | 546 | | { |
| | 547 | | // No existing pump, create a new one. |
| 0 | 548 | | await CreateNewPumpAsync(partitionId).ConfigureAwait(false); |
| | 549 | | } |
| 0 | 550 | | } |
| | 551 | |
|
/// <summary>
/// Re-reads the lease for the partition and, if this host still owns it and it has not
/// expired, opens a new EventHubPartitionPump and registers it in partitionPumps.
/// Returns without creating a pump otherwise.
/// </summary>
/// <param name="partitionId">Partition to create the pump for.</param>
| | 552 | | async Task CreateNewPumpAsync(string partitionId) |
| | 553 | | { |
| | 554 | | // Refresh lease content and do last minute check to reduce partition moves. |
| 0 | 555 | | var refreshedLease = await this.host.LeaseManager.GetLeaseAsync(partitionId).ConfigureAwait(false); |
| 0 | 556 | | if (refreshedLease.Owner != this.host.HostName || await refreshedLease.IsExpired().ConfigureAwait(false)) |
| | 557 | | { |
| | 558 | | // Partition moved to some other node after lease acquisition. |
| | 559 | | // Return w/o creating the pump. |
| 0 | 560 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, $"Partition moved to another |
| 0 | 561 | | return; |
| | 562 | | } |
| | 563 | |
|
| 0 | 564 | | PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, refreshedLease); |
| 0 | 565 | | await newPartitionPump.OpenAsync().ConfigureAwait(false); |
// NOTE(review): the result of TryAdd is ignored; if an entry for this partition already
// exists, the newly opened pump is not registered here - confirm this cannot happen.
| 0 | 566 | | this.partitionPumps.TryAdd(partitionId, newPartitionPump); // do the put after start, if the start fails the |
| 0 | 567 | | ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Created new PartitionPump"); |
| 0 | 568 | | } |
| | 569 | |
|
/// <summary>
/// Removes the pump registered for the partition, if any, and closes it unless it reports
/// that it is already closing. For CloseReason.LeaseLost the wait on the close call is
/// bounded by the lease renew interval; for any other reason the close is awaited fully.
/// </summary>
/// <param name="partitionId">Partition whose pump should be removed.</param>
/// <param name="reason">Reason handed to the pump's CloseAsync.</param>
| | 570 | | async Task TryRemovePumpAsync(string partitionId, CloseReason reason) |
| | 571 | | { |
| | 572 | | PartitionPump capturedPump; |
// The pump is taken out of the dictionary first, so it is unregistered even if closing fails.
| 0 | 573 | | if (this.partitionPumps.TryRemove(partitionId, out capturedPump)) |
| | 574 | | { |
| 0 | 575 | | if (!capturedPump.IsClosing) |
| | 576 | | { |
| | 577 | | // Don't block on close call more than renew interval if close reason is lease-lost. |
| | 578 | | // Otherwise we can block indefinitely. |
| 0 | 579 | | var closeTask = capturedPump.CloseAsync(reason); |
| 0 | 580 | | if (reason == CloseReason.LeaseLost) |
| | 581 | | { |
| 0 | 582 | | await this.WaitTaskTimeoutAsync(closeTask, this.host.LeaseManager.LeaseRenewInterval).ConfigureA |
| | 583 | | } |
| | 584 | | else |
| | 585 | | { |
| 0 | 586 | | await closeTask.ConfigureAwait(false); |
| | 587 | | } |
| | 588 | | } |
| | 589 | | // else, pump is already closing/closed, don't need to try to shut it down again |
| | 590 | | } |
| 0 | 591 | | } |
| | 592 | |
|
/// <summary>
/// Awaits the given task for at most the provided wait time.
/// </summary>
/// <param name="task">The task to wait on.</param>
/// <param name="waitTime">Maximum time to wait for <paramref name="task"/> to complete.</param>
/// <exception cref="OperationCanceledException">
/// Thrown when the wait time is exhausted before <paramref name="task"/> completes.
/// </exception>
/// <remarks>
/// When <paramref name="task"/> completes in time but faulted or was canceled, its exception
/// is rethrown to the caller rather than being left unobserved.
/// </remarks>
async Task WaitTaskTimeoutAsync(Task task, TimeSpan waitTime)
{
    using (var cts = new CancellationTokenSource())
    {
        var timeoutTask = Task.Delay(waitTime, cts.Token);
        var completedTask = await Task.WhenAny(task, timeoutTask).ConfigureAwait(false);

        if (completedTask != task)
        {
            // Throw OperationCanceledException, caller will log the failures appropriately.
            throw new OperationCanceledException();
        }

        // The task won the race; cancel the pending delay so its timer is released promptly.
        cts.Cancel();
    }

    // Task.WhenAny never rethrows the winner's exception. Awaiting the already-completed
    // task surfaces a fault or cancellation to the caller.
    await task.ConfigureAwait(false);
}
| | 615 | |
|
/// <summary>
/// Starts removal of every pump currently registered in <c>partitionPumps</c> and returns a
/// task that completes when all of those removals have completed.
/// </summary>
/// <param name="reason">Close reason forwarded to <c>TryRemovePumpAsync</c> for each pump.</param>
Task RemoveAllPumpsAsync(CloseReason reason)
{
    // Work from a snapshot of the keys; the removals modify the dictionary.
    string[] snapshot = this.partitionPumps.Keys.ToArray();
    var pendingRemovals = new Task[snapshot.Length];

    for (int index = 0; index < snapshot.Length; index++)
    {
        pendingRemovals[index] = this.TryRemovePumpAsync(snapshot[index], reason);
    }

    return Task.WhenAll(pendingRemovals);
}
| | 627 | |
|
/// <summary>
/// Picks a lease to steal for load balancing: when the owner holding the most stealable
/// leases has at least two more leases than this host, returns the first lease in
/// <paramref name="stealableLeases"/> that belongs to that owner.
/// </summary>
/// <param name="stealableLeases">Candidate leases held by other hosts.</param>
/// <param name="haveLeaseCount">Number of leases this host currently owns.</param>
/// <returns>The lease proposed for stealing, or null when no steal is warranted.</returns>
| | 628 | | Lease WhichLeaseToSteal(IEnumerable<Lease> stealableLeases, int haveLeaseCount) |
| | 629 | | { |
| 0 | 630 | | IDictionary<string, int> countsByOwner = CountLeasesByOwner(stealableLeases); |
| | 631 | |
|
| | 632 | | // Consider all leases might be already released where we won't have any entry in the return counts map. |
| 0 | 633 | | if (countsByOwner.Count == 0) |
| | 634 | | { |
| 0 | 635 | | return null; |
| | 636 | | } |
| | 637 | |
|
| 0 | 638 | | var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First(); |
| 0 | 639 | | Lease stealThisLease = null; |
| | 640 | |
|
| | 641 | | // If the number of leases is a multiple of the number of hosts, then the desired configuration is |
| | 642 | | // that all hosts own the same number of leases, and the difference between the "biggest" owner and |
| | 643 | | // any other is 0. |
| | 644 | | // |
| | 645 | | // If the number of leases is not a multiple of the number of hosts, then the most even configuration |
| | 646 | | // possible is for some hosts to have (leases/hosts) leases and others to have ((leases/hosts) + 1). |
| | 647 | | // For example, for 16 partitions distributed over five hosts, the distribution would be 4, 3, 3, 3, 3, |
| | 648 | | // or any of the possible reorderings. |
| | 649 | | // |
| | 650 | | // In either case, if the difference between this host and the biggest owner is 2 or more, then the |
| | 651 | | // system is not in the most evenly-distributed configuration, so steal one lease from the biggest. |
| | 652 | | // If there is a tie for biggest, we pick whichever appears first in the list because |
| | 653 | | // it doesn't really matter which "biggest" is trimmed down. |
| | 654 | | // |
| | 655 | | // Stealing one at a time prevents flapping because it reduces the difference between the biggest and |
| | 656 | | // this host by two at a time. If the starting difference is two or greater, then the difference cannot |
| | 657 | | // end up below 0. This host may become tied for biggest, but it cannot become larger than the host that |
| | 658 | | // it is stealing from. |
| | 659 | |
|
| 0 | 660 | | if ((biggestOwner.Value - haveLeaseCount) >= 2) |
| | 661 | | { |
| 0 | 662 | | stealThisLease = stealableLeases.First(l => l.Owner == biggestOwner.Key); |
| 0 | 663 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Proposed to steal lease for partit |
| | 664 | | } |
| | 665 | |
|
| 0 | 666 | | return stealThisLease; |
| | 667 | | } |
| | 668 | |
|
/// <summary>
/// Groups the given leases by owner, ignoring leases without an owner, logs the per-owner
/// counts, and returns them as a dictionary keyed by owner name.
/// </summary>
/// <param name="leases">Leases to count.</param>
/// <returns>Map from owner name to the number of leases that owner holds in <paramref name="leases"/>.</returns>
| | 669 | | Dictionary<string, int> CountLeasesByOwner(IEnumerable<Lease> leases) |
| | 670 | | { |
// NOTE(review): counts is a deferred LINQ query. It is enumerated by the foreach, by
// Count() and by ToDictionary below, so the grouping is recomputed each time; consider
// materializing it once.
| 0 | 671 | | var counts = leases.Where(lease => lease.Owner != null).GroupBy(lease => lease.Owner).Select(group => new |
| 0 | 672 | | { |
| 0 | 673 | | Owner = group.Key, |
| 0 | 674 | | Count = group.Count() |
| 0 | 675 | | }); |
| | 676 | |
|
| | 677 | | // Log ownership mapping. |
| 0 | 678 | | foreach (var owner in counts) |
| | 679 | | { |
| 0 | 680 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Host {owner.Owner} owns {owner.Cou |
| | 681 | | } |
| | 682 | |
|
| 0 | 683 | | ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Total hosts in list: {counts.Count()}" |
| | 684 | |
|
| 0 | 685 | | return counts.ToDictionary(e => e.Owner, e => e.Count); |
| | 686 | | } |
| | 687 | | } |
| | 688 | | } |