< Summary

Class:Microsoft.Azure.EventHubs.Processor.PartitionManager
Assembly:Microsoft.Azure.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionManager.cs
Covered lines:0
Uncovered lines:332
Coverable lines:332
Total lines:688
Line coverage:0% (0 of 332)
Covered branches:0
Total branches:134
Branch coverage:0% (0 of 134)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%100%
GetPartitionIdsAsync()-0%0%
StartAsync()-0%0%
StopAsync()-0%0%
RunAsync()-0%100%
InitializeStoresAsync()-0%0%
RetryAsync()-0%0%
RunLoopAsync()-0%0%
<RunLoopAsync()-0%100%
<RunLoopAsync()-0%100%
AcquireExpiredLeasesAsync()-0%0%
<AcquireExpiredLeasesAsync()-0%0%
CheckAndAddPumpAsync()-0%0%
CreateNewPumpAsync()-0%0%
TryRemovePumpAsync()-0%0%
WaitTaskTimeoutAsync()-0%0%
RemoveAllPumpsAsync(...)-0%0%
WhichLeaseToSteal(...)-0%0%
CountLeasesByOwner(...)-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionManager.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.EventHubs.Processor
 5{
 6    using System;
 7    using System.Collections.Concurrent;
 8    using System.Collections.Generic;
 9    using System.Diagnostics;
 10    using System.Linq;
 11    using System.Threading;
 12    using System.Threading.Tasks;
 13    using Microsoft.Azure.EventHubs.Primitives;
 14
 15    class PartitionManager
 16    {
 17        readonly EventProcessorHost host;
 18        readonly ConcurrentDictionary<string, PartitionPump> partitionPumps;
 19
 20        IList<string> partitionIds;
 21        CancellationTokenSource cancellationTokenSource;
 22        Task runTask;
 23
 024        internal PartitionManager(EventProcessorHost host)
 25        {
 026            this.host = host;
 027            this.cancellationTokenSource = new CancellationTokenSource();
 028            this.partitionPumps = new ConcurrentDictionary<string, PartitionPump>();
 029        }
 30
 31        public async Task<IEnumerable<string>> GetPartitionIdsAsync()
 32        {
 033            if (this.partitionIds == null)
 34            {
 035                EventHubClient eventHubClient = null;
 36                try
 37                {
 038                    eventHubClient = this.host.CreateEventHubClient();
 039                    eventHubClient.WebProxy = this.host.EventProcessorOptions.WebProxy;
 040                    var runtimeInfo = await eventHubClient.GetRuntimeInformationAsync().ConfigureAwait(false);
 041                    this.partitionIds = runtimeInfo.PartitionIds.ToList();
 042                }
 043                catch (Exception e)
 44                {
 045                    throw new EventProcessorConfigurationException("Encountered error while fetching the list of EventHu
 46                }
 47                finally
 48                {
 049                    if (eventHubClient != null)
 50                    {
 051                        await eventHubClient.CloseAsync().ConfigureAwait(false);
 52                    }
 53                }
 54
 055                ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"PartitionCount: {this.partitionIds
 056            }
 57
 058            return this.partitionIds;
 059        }
 60
 61        public async Task StartAsync()
 62        {
 063            if (this.runTask != null)
 64            {
 065                throw new InvalidOperationException("A PartitionManager cannot be started multiple times.");
 66            }
 67
 068            await this.InitializeStoresAsync().ConfigureAwait(false);
 69
 070            this.runTask = this.RunAsync();
 071        }
 72
 73        public async Task StopAsync()
 74        {
 075            this.cancellationTokenSource.Cancel();
 076            var localRunTask = this.runTask;
 077            if (localRunTask != null)
 78            {
 079                await localRunTask.ConfigureAwait(false);
 80            }
 81
 82            // once it is closed let's reset the task
 083            this.runTask = null;
 084            this.cancellationTokenSource = new CancellationTokenSource();
 085        }
 86
 87        async Task RunAsync()
 88        {
 89            try
 90            {
 091                await this.RunLoopAsync(this.cancellationTokenSource.Token).ConfigureAwait(false);
 092            }
 093            catch (TaskCanceledException) when (this.cancellationTokenSource.IsCancellationRequested)
 94            {
 95                // Expected during host shutdown.
 096            }
 097            catch (Exception e)
 98            {
 99                // Ideally RunLoop should never throw.
 0100                ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager m
 0101                this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAction
 0102            }
 103
 104            try
 105            {
 106                // Cleanup
 0107                ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Shutting down all pumps");
 0108                await this.RemoveAllPumpsAsync(CloseReason.Shutdown).ConfigureAwait(false);
 0109            }
 0110            catch (Exception e)
 111            {
 0112                ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during shutdown", e.ToStri
 0113                this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAction
 0114            }
 0115        }
 116
 117        async Task InitializeStoresAsync() //throws InterruptedException, ExecutionException, ExceptionWithAction
 118        {
 119            // Make sure the lease store exists
 0120            ILeaseManager leaseManager = this.host.LeaseManager;
 0121            if (!await leaseManager.LeaseStoreExistsAsync().ConfigureAwait(false))
 122            {
 0123                await RetryAsync(() => leaseManager.CreateLeaseStoreIfNotExistsAsync(), null, "Failure creating lease st
 0124                        "Out of retries creating lease store for this Event Hub", EventProcessorHostActionStrings.Creati
 125            }
 126            // else
 127            //  lease store already exists, no work needed
 128
 0129            var partitionIds = await this.GetPartitionIdsAsync().ConfigureAwait(false);
 130
 131            // Now make sure the leases exist
 0132            var createLeaseTasks = new List<Task>();
 0133            foreach (string id in partitionIds)
 134            {
 0135                var subjectId = id;
 0136                createLeaseTasks.Add(RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(subjectId), subjectId, $"
 0137                        $"Out of retries creating lease for partition {subjectId}", EventProcessorHostActionStrings.Crea
 138            }
 139
 0140            await Task.WhenAll(createLeaseTasks).ConfigureAwait(false);
 141
 142            // Make sure the checkpoint store exists
 0143            ICheckpointManager checkpointManager = this.host.CheckpointManager;
 0144            if (!await checkpointManager.CheckpointStoreExistsAsync().ConfigureAwait(false))
 145            {
 0146                await RetryAsync(() => checkpointManager.CreateCheckpointStoreIfNotExistsAsync(), null, "Failure creatin
 0147                        "Out of retries creating checkpoint store for this Event Hub", EventProcessorHostActionStrings.C
 148            }
 149            // else
 150            //  checkpoint store already exists, no work needed
 151
 152            // Now make sure the checkpoints exist
 0153            var createCheckpointTasks = new List<Task>();
 0154            foreach (string id in partitionIds)
 155            {
 0156                var subjectId = id;
 0157                createCheckpointTasks.Add(RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(subjectId)
 0158                        $"Out of retries creating checkpoint for partition {subjectId}", EventProcessorHostActionStrings
 159            }
 160
 0161            await Task.WhenAll(createCheckpointTasks).ConfigureAwait(false);
 0162        }
 163
 164        // Throws if it runs out of retries. If it returns, action succeeded.
 165        async Task RetryAsync(Func<Task> lambda, string partitionId, string retryMessage, string finalFailureMessage, st
 166        {
 0167            Exception finalException = null;
 0168            bool createdOK = false;
 0169            int retryCount = 0;
 170            do
 171            {
 172                try
 173                {
 0174                    await lambda().ConfigureAwait(false);
 0175                    createdOK = true;
 0176                }
 0177                catch (Exception ex)
 178                {
 0179                    if (partitionId != null)
 180                    {
 0181                        ProcessorEventSource.Log.PartitionPumpWarning(this.host.HostName, partitionId, retryMessage, ex.
 182                    }
 183                    else
 184                    {
 0185                        ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, retryMessage, ex.ToString
 186                    }
 187
 0188                    this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, partitionId, ex, action);
 189
 0190                    finalException = ex;
 0191                    retryCount++;
 0192                }
 193            }
 0194            while (!createdOK && (retryCount < maxRetries));
 195
 0196            if (!createdOK)
 197            {
 0198                if (partitionId != null)
 199                {
 0200                    ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, partitionId, finalFailureMessage);
 201                }
 202                else
 203                {
 0204                    ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, finalFailureMessage, null);
 205                }
 206
 0207                throw new EventProcessorRuntimeException(finalFailureMessage, action, finalException);
 208            }
 0209        }
 210
 211        async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception, ExceptionWithAction
 212        {
 0213            var loopStopwatch = new Stopwatch();
 214
 0215            while (!cancellationToken.IsCancellationRequested)
 216            {
 217                // Mark start time so we can use the duration taken to calculate renew interval.
 0218                loopStopwatch.Restart();
 219
 0220                ILeaseManager leaseManager = this.host.LeaseManager;
 0221                var allLeases = new ConcurrentDictionary<string, Lease>();
 0222                var leasesOwnedByOthers = new ConcurrentDictionary<string, Lease>();
 223
 224                // Inspect all leases.
 225                // Acquire any expired leases.
 226                // Renew any leases that currently belong to us.
 227                IEnumerable<Lease> downloadedLeases;
 0228                var renewLeaseTasks = new List<Task>();
 0229                int ourLeaseCount = 0;
 230
 231                try
 232                {
 233                    try
 234                    {
 0235                        downloadedLeases = await leaseManager.GetAllLeasesAsync().ConfigureAwait(false);
 0236                    }
 0237                    catch (Exception e)
 238                    {
 0239                        ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception during downloadi
 0240                        this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHo
 241
 242                        // Avoid tight spin if getallleases call keeps failing.
 0243                        await Task.Delay(1000).ConfigureAwait(false);
 244
 0245                        continue;
 246                    }
 247
 248                    // First things first, renew owned leases.
 0249                    foreach (var lease in downloadedLeases)
 250                    {
 0251                        var subjectLease = lease;
 252
 253                        try
 254                        {
 0255                            allLeases[subjectLease.PartitionId] = subjectLease;
 0256                            if (subjectLease.Owner == this.host.HostName && !(await subjectLease.IsExpired().ConfigureAw
 257                            {
 0258                                ourLeaseCount++;
 259
 260                                // Get lease from partition since we need the token at this point.
 0261                                if (!this.partitionPumps.TryGetValue(subjectLease.PartitionId, out var capturedPump))
 262                                {
 0263                                    continue;
 264                                }
 265
 0266                                var capturedLease = capturedPump.Lease;
 267
 0268                                ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, capturedLease.PartitionId
 0269                                renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(capturedLease).ContinueWith(renewResult
 0270                                {
 0271                                    if (renewResult.IsFaulted)
 0272                                    {
 0273                                        // Might have failed due to intermittent error or lease-lost.
 0274                                        // Just log here, expired leases will be picked by same or another host anyway.
 0275                                        ProcessorEventSource.Log.PartitionPumpError(
 0276                                            this.host.HostName,
 0277                                            capturedLease.PartitionId,
 0278                                            "Failed to renew lease.",
 0279                                            renewResult.Exception?.Message);
 0280
 0281                                        this.host.EventProcessorOptions.NotifyOfException(
 0282                                            this.host.HostName,
 0283                                            capturedLease.PartitionId,
 0284                                            renewResult.Exception,
 0285                                            EventProcessorHostActionStrings.RenewingLease);
 0286
 0287                                        // Nullify the owner on the lease in case this host lost it.
 0288                                        // This helps to remove pump earlier reducing duplicate receives.
 0289                                        if (renewResult.Exception?.GetBaseException() is LeaseLostException)
 0290                                        {
 0291                                            allLeases[capturedLease.PartitionId].Owner = null;
 0292                                        }
 0293                                    }
 0294                                }, cancellationToken));
 295                            }
 0296                            else if (!await subjectLease.IsExpired().ConfigureAwait(false))
 297                            {
 0298                                leasesOwnedByOthers[subjectLease.PartitionId] = subjectLease;
 299                            }
 0300                        }
 0301                        catch (Exception e)
 302                        {
 0303                            ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checkin
 0304                            this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcess
 0305                        }
 0306                    }
 307
 308                    // Wait until we are done with renewing our own leases here.
 309                    // In theory, this should never throw, error are logged and notified in the renew tasks.
 0310                    await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false);
 0311                    ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");
 312
 313                    // Check any expired leases that we can grab here.
 0314                    ourLeaseCount += await this.AcquireExpiredLeasesAsync(allLeases, leasesOwnedByOthers, ourLeaseCount,
 0315                    ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished
 316
 317                    // Grab more leases if available and needed for load balancing
 0318                    if (leasesOwnedByOthers.Count > 0)
 319                    {
 0320                        Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers.Values, ourLeaseCount);
 321
 322                        // Don't attempt to steal the lease if current host has a pump for this partition id
 323                        // This is possible when current pump is in failed state due to lease moved to some other host.
 0324                        if (stealThisLease != null && !this.partitionPumps.ContainsKey(stealThisLease.PartitionId))
 325                        {
 326                            try
 327                            {
 328                                // Get fresh content of lease subject to acquire.
 0329                                var downloadedLease = await leaseManager.GetLeaseAsync(stealThisLease.PartitionId).Confi
 0330                                allLeases[stealThisLease.PartitionId] = downloadedLease;
 331
 332                                // Don't attempt to steal if lease is already expired.
 333                                // Expired leases are picked up by other hosts quickly.
 334                                // Don't attempt to steal if owner has changed from the calculation time to refresh time
 0335                                if (!await downloadedLease.IsExpired().ConfigureAwait(false)
 0336                                    && downloadedLease.Owner == stealThisLease.Owner)
 337                                {
 0338                                    ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, downloaded
 0339                                    if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
 340                                    {
 341                                        // Succeeded in stealing lease
 0342                                        ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, downloa
 0343                                        ourLeaseCount++;
 344                                    }
 345                                    else
 346                                    {
 347                                        // Acquisition failed. Make sure we don't leave the lease as owned.
 0348                                        allLeases[stealThisLease.PartitionId].Owner = null;
 349
 0350                                        ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
 0351                                            "Failed to steal lease for partition " + downloadedLease.PartitionId, null);
 352                                    }
 353                                }
 0354                            }
 0355                            catch (Exception e)
 356                            {
 0357                                ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName,
 0358                                    "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToS
 0359                                this.host.EventProcessorOptions.NotifyOfException(this.host.HostName,
 0360                                    stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease);
 361
 362                                // Acquisition failed. Make sure we don't leave the lease as owned.
 0363                                allLeases[stealThisLease.PartitionId].Owner = null;
 0364                            }
 365                        }
 0366                    }
 367
 368                    // Update pump with new state of leases on owned partitions in parallel.
 0369                    var createRemovePumpTasks = new List<Task>();
 0370                    foreach (string partitionId in allLeases.Keys)
 371                    {
 0372                        var subjectPartitionId = partitionId;
 373
 0374                        Lease updatedLease = allLeases[subjectPartitionId];
 0375                        ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {update
 376
 0377                        if (updatedLease.Owner == this.host.HostName)
 378                        {
 0379                            createRemovePumpTasks.Add(Task.Run(async () =>
 0380                            {
 0381                                try
 0382                                {
 0383                                    await this.CheckAndAddPumpAsync(subjectPartitionId, updatedLease).ConfigureAwait(fal
 0384                                }
 0385                                catch (Exception e)
 0386                                {
 0387                                    ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception dur
 0388                                    this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartiti
 0389                                }
 0390                            }, cancellationToken));
 391                        }
 0392                        else if (this.partitionPumps.ContainsKey(partitionId))
 393                        {
 0394                            createRemovePumpTasks.Add(Task.Run(async () =>
 0395                            {
 0396                                try
 0397                                {
 0398                                    await this.TryRemovePumpAsync(subjectPartitionId, CloseReason.LeaseLost).ConfigureAw
 0399                                }
 0400                                catch (Exception e)
 0401                                {
 0402                                    ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception dur
 0403                                    this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartiti
 0404                                }
 0405                            }, cancellationToken));
 406                        }
 407                    }
 408
 0409                    await Task.WhenAll(createRemovePumpTasks).ConfigureAwait(false);
 0410                    ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Pump update is finished.");
 0411                }
 0412                catch (Exception e)
 413                {
 414                    // TaskCancelledException is expected furing host unregister.
 0415                    if (e is TaskCanceledException)
 416                    {
 0417                        continue;
 418                    }
 419
 420                    // Loop should not exit unless signalled via cancellation token. Log any failures and continue.
 0421                    ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manag
 0422                    this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostAc
 0423                }
 424                finally
 425                {
 426                    // Consider reducing the wait time with last lease-walkthrough's time taken.
 0427                    var elapsedTime = loopStopwatch.Elapsed;
 0428                    if (leaseManager.LeaseRenewInterval > elapsedTime)
 429                    {
 0430                        await Task.Delay(leaseManager.LeaseRenewInterval.Subtract(elapsedTime), cancellationToken).Confi
 431                    }
 432                }
 0433            }
 0434        }
 435
 436        async Task<int> AcquireExpiredLeasesAsync(
 437            ConcurrentDictionary<string, Lease> allLeases,
 438            ConcurrentDictionary<string, Lease> leasesOwnedByOthers,
 439            int ownedCount,
 440            CancellationToken cancellationToken)
 441        {
 0442            var totalAcquired = 0;
 0443            var hosts = new List<string>();
 444
 445            // Find distinct hosts actively owning leases.
 0446            foreach (var lease in allLeases.Values)
 447            {
 0448                if (lease.Owner != null && !hosts.Contains(lease.Owner) && !(await lease.IsExpired().ConfigureAwait(fals
 449                {
 0450                    hosts.Add(lease.Owner);
 451                }
 0452            }
 453
 454            // Calculate how many more leases we can own.
 0455            var hostCount = hosts.Count() == 0 ? 1 : hosts.Count();
 0456            var targetLeaseCount = (int)Math.Ceiling((double)allLeases.Count / (double)hostCount) - ownedCount;
 457
 458            // Attempt to acquire expired leases now up to allowed target lease count.
 0459            var tasks = new List<Task>();
 0460            foreach (var possibleLease in allLeases.Values)
 461            {
 462                // Break if we already acquired enough number of leases.
 0463                if (targetLeaseCount <= 0)
 464                {
 465                    break;
 466                }
 467
 0468                var subjectLease = possibleLease;
 469
 0470                if (await subjectLease.IsExpired().ConfigureAwait(false))
 471                {
 0472                    targetLeaseCount--;
 0473                    tasks.Add(Task.Run(async () =>
 0474                    {
 0475                        try
 0476                        {
 0477                            // Get fresh content of lease subject to acquire.
 0478                            var downloadedLease = await this.host.LeaseManager.GetLeaseAsync(subjectLease.PartitionId).C
 0479                            allLeases[subjectLease.PartitionId] = downloadedLease;
 0480
 0481                            // Check expired once more here incase another host have already leased this since we popula
 0482                            if (await downloadedLease.IsExpired().ConfigureAwait(false))
 0483                            {
 0484                                ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.Partition
 0485                                if (await this.host.LeaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false
 0486                                {
 0487                                    ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.Parti
 0488                                    leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease);
 0489                                    Interlocked.Increment(ref totalAcquired);
 0490                                }
 0491                                else
 0492                                {
 0493                                    // Acquisition failed. Make sure we don't leave the lease as owned.
 0494                                    allLeases[subjectLease.PartitionId].Owner = null;
 0495
 0496                                    ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
 0497                                        "Failed to acquire lease for partition " + downloadedLease.PartitionId, null);
 0498                                }
 0499                            }
 0500                        }
 0501                        catch (Exception e)
 0502                        {
 0503                            ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "F
 0504                            this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.Partition
 0505
 0506                            // Acquisition failed. Make sure we don't leave the lease as owned.
 0507                            allLeases[subjectLease.PartitionId].Owner = null;
 0508                        }
 0509                    }, cancellationToken));
 510                }
 0511            }
 512
 0513            await Task.WhenAll(tasks).ConfigureAwait(false);
 514
 0515            return totalAcquired;
 0516        }
 517
 518        async Task CheckAndAddPumpAsync(string partitionId, Lease lease)
 519        {
 520            PartitionPump capturedPump;
 0521            if (this.partitionPumps.TryGetValue(partitionId, out capturedPump))
 522            {
 523                // There already is a pump. Make sure the pump is working and replace the lease.
 0524                if (capturedPump.PumpStatus != PartitionPumpStatus.Running && capturedPump.PumpStatus != PartitionPumpSt
 525                {
 526                    // The existing pump is bad. Remove it.
 0527                    await TryRemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false);
 528                }
 529                else
 530                {
 531                    // Lease token can show up empty here if lease content download has failed or not recently acquired.
 532                    // Don't update the token if so.
 0533                    if (!string.IsNullOrWhiteSpace(lease.Token))
 534                    {
 535                        // Pump is working, just replace the lease token.
 0536                        ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease toke
 0537                        capturedPump.SetLeaseToken(lease.Token);
 538                    }
 539                    else
 540                    {
 0541                        ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Skipping to update 
 542                    }
 543                }
 544            }
 545            else
 546            {
 547                // No existing pump, create a new one.
 0548                await CreateNewPumpAsync(partitionId).ConfigureAwait(false);
 549            }
 0550        }
 551
 552        async Task CreateNewPumpAsync(string partitionId)
 553        {
 554            // Refresh lease content and do last minute check to reduce partition moves.
 0555            var refreshedLease = await this.host.LeaseManager.GetLeaseAsync(partitionId).ConfigureAwait(false);
 0556            if (refreshedLease.Owner != this.host.HostName || await refreshedLease.IsExpired().ConfigureAwait(false))
 557            {
 558                // Partition moved to some other node after lease acquisition.
 559                // Return w/o creating the pump.
 0560                ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, $"Partition moved to another
 0561                return;
 562            }
 563
 0564            PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, refreshedLease);
 0565            await newPartitionPump.OpenAsync().ConfigureAwait(false);
 0566            this.partitionPumps.TryAdd(partitionId, newPartitionPump); // do the put after start, if the start fails the
 0567            ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Created new PartitionPump");
 0568        }
 569
 570        async Task TryRemovePumpAsync(string partitionId, CloseReason reason)
 571        {
                // Removes the pump registered for the given partition (if any) and closes it with the given reason.
                // Does nothing when no pump is registered for the partition.
 572            PartitionPump capturedPump;
 0573            if (this.partitionPumps.TryRemove(partitionId, out capturedPump))
 574            {
 0575                if (!capturedPump.IsClosing)
 576                {
 577                    // Don't block on close call more than renew interval if close reason is lease-lost.
 578                    // Otherwise we can block indefinitely.
 0579                    var closeTask = capturedPump.CloseAsync(reason);
 0580                    if (reason == CloseReason.LeaseLost)
 581                    {
                            // Bounded wait: WaitTaskTimeoutAsync throws OperationCanceledException
                            // if the close has not finished within the lease renew interval.
 0582                        await this.WaitTaskTimeoutAsync(closeTask, this.host.LeaseManager.LeaseRenewInterval).ConfigureA
 583                    }
 584                    else
 585                    {
                            // Any other reason: wait for the close to finish, however long it takes.
 0586                        await closeTask.ConfigureAwait(false);
 587                    }
 588                }
 589                // else, pump is already closing/closed, don't need to try to shut it down again
 590            }
 0591        }
 592
 593        /// <summary>
 594        /// Awaits given task up to provided wait time.
 595        /// Throws OperationCanceledException when wait time is exhausted.
 596        /// </summary>
            /// <param name="task">The task to wait for. It is not cancelled by this method when the wait time runs out.</param>
            /// <param name="waitTime">Maximum time to wait for <paramref name="task"/> to complete.</param>
            /// <exception cref="OperationCanceledException">The delay of <paramref name="waitTime"/> completed before <paramref name="task"/> did.</exception>
 597        async Task WaitTaskTimeoutAsync(Task task, TimeSpan waitTime)
 598        {
 0599            using (var cts = new CancellationTokenSource())
 600            {
                    // Race the given task against a cancellable delay; WhenAny yields whichever finishes first.
 0601                var timeoutTask = Task.Delay(waitTime, cts.Token);
 0602                var completedTask = await Task.WhenAny(task, timeoutTask).ConfigureAwait(false);
 603
 0604                if (completedTask == task)
 605                {
                        // The task finished first: cancel the delay so its timer does not linger.
                        // NOTE(review): the completed task itself is not awaited here, so an exception it holds
                        // is not rethrown from this method - confirm that is intended.
 0606                    cts.Cancel();
 607                }
 608                else
 609                {
 610                    // Throw OperationCanceledException, caller will log the failures appropriately.
 0611                    throw new OperationCanceledException();
 612                }
 0613            }
 0614        }
 615
 616        Task RemoveAllPumpsAsync(CloseReason reason)
 617        {
                // Starts removal of every currently registered pump with the given close reason and
                // returns a task that completes when all of those removals have completed.
 0618            List<Task> tasks = new List<Task>();
                // Iterate over a copy of the keys; TryRemovePumpAsync removes entries from partitionPumps.
 0619            var keys = new List<string>(this.partitionPumps.Keys);
 0620            foreach (string partitionId in keys)
 621            {
                    // Not awaited here: each removal is started and collected so they can be awaited together.
 0622                tasks.Add(this.TryRemovePumpAsync(partitionId, reason));
 623            }
 624
 0625            return Task.WhenAll(tasks);
 626        }
 627
 628        Lease WhichLeaseToSteal(IEnumerable<Lease> stealableLeases, int haveLeaseCount)
 629        {
                // Picks one lease to steal from the owner holding the most of the given leases, or returns null
                // when none of the leases has an owner or when the biggest owner holds fewer than
                // (haveLeaseCount + 2) leases.
                // NOTE(review): stealableLeases is enumerated more than once (by CountLeasesByOwner and by First below);
                // presumably callers pass a materialized collection - confirm.
 0630            IDictionary<string, int> countsByOwner = CountLeasesByOwner(stealableLeases);
 631
 632            // Consider all leases might be already released where we won't have any entry in the return counts map.
 0633            if (countsByOwner.Count == 0)
 634            {
 0635                return null;
 636            }
 637
 0638            var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First();
 0639            Lease stealThisLease = null;
 640
 641            // If the number of leases is a multiple of the number of hosts, then the desired configuration is
 642            // that all hosts own the same number of leases, and the difference between the "biggest" owner and
 643            // any other is 0.
 644            //
 645            // If the number of leases is not a multiple of the number of hosts, then the most even configuration
 646            // possible is for some hosts to have (leases/hosts) leases and others to have ((leases/hosts) + 1).
 647            // For example, for 16 partitions distributed over five hosts, the distribution would be 4, 3, 3, 3, 3,
 648            // or any of the possible reorderings.
 649            //
 650            // In either case, if the difference between this host and the biggest owner is 2 or more, then the
 651            // system is not in the most evenly-distributed configuration, so steal one lease from the biggest.
 652            // If there is a tie for biggest, we pick whichever appears first in the list because
 653            // it doesn't really matter which "biggest" is trimmed down.
 654            //
 655            // Stealing one at a time prevents flapping because it reduces the difference between the biggest and
 656            // this host by two at a time. If the starting difference is two or greater, then the difference cannot
 657            // end up below 0. This host may become tied for biggest, but it cannot become larger than the host that
 658            // it is stealing from.
 659
 0660            if ((biggestOwner.Value - haveLeaseCount) >= 2)
 661            {
                    // Take the first lease, in enumeration order, that belongs to the biggest owner.
 0662                stealThisLease = stealableLeases.First(l => l.Owner == biggestOwner.Key);
 0663                ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Proposed to steal lease for partit
 664            }
 665
 0666            return stealThisLease;
 667        }
 668
 669        Dictionary<string, int> CountLeasesByOwner(IEnumerable<Lease> leases)
 670        {
                // Builds a map from owner name to the number of the given leases that owner holds, logging each
                // owner's count and the number of owners. Leases whose Owner is null are left out of the map.
                // NOTE(review): 'counts' is a deferred LINQ query and is enumerated three times below
                // (foreach, Count(), ToDictionary), so the grouping over 'leases' is re-evaluated each time.
 0671            var counts = leases.Where(lease => lease.Owner != null).GroupBy(lease => lease.Owner).Select(group => new
 0672            {
 0673                Owner = group.Key,
 0674                Count = group.Count()
 0675            });
 676
 677            // Log ownership mapping.
 0678            foreach (var owner in counts)
 679            {
 0680                ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Host {owner.Owner} owns {owner.Cou
 681            }
 682
 0683            ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Total hosts in list: {counts.Count()}"
 684
 0685            return counts.ToDictionary(e => e.Owner, e => e.Count);
 686        }
 687    }
 688}