< Summary

Class:Azure.Messaging.EventHubs.Primitives.PartitionLoadBalancer
Assembly:Azure.Messaging.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Shared\src\Processor\PartitionLoadBalancer.cs
Covered lines:0
Uncovered lines:170
Coverable lines:170
Total lines:533
Line coverage:0% (0 of 170)
Covered branches:0
Total branches:46
Branch coverage:0% (0 of 46)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-0%100%
.ctor(...)-0%100%
get_FullyQualifiedNamespace()-0%100%
get_EventHubName()-0%100%
get_ConsumerGroup()-0%100%
get_OwnerIdentifier()-0%100%
get_LoadBalanceInterval()-0%100%
get_IsBalanced()-0%100%
get_OwnedPartitionIds()-0%100%
get_Logger()-0%100%
get_InstanceOwnership()-0%100%
.ctor()-0%100%
RunLoadBalancingAsync()-0%0%
RelinquishOwnershipAsync()-0%100%
FindAndClaimOwnershipAsync(...)-0%0%
RenewOwnershipAsync()-0%100%
ClaimOwnershipAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Shared\src\Processor\PartitionLoadBalancer.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Linq;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Core;
 10using Azure.Messaging.EventHubs.Core;
 11using Azure.Messaging.EventHubs.Diagnostics;
 12
 13namespace Azure.Messaging.EventHubs.Primitives
 14{
 15    /// <summary>
 16    ///   Handles all load balancing concerns for an event processor including claiming, stealing, and relinquishing own
 17    /// </summary>
 18    ///
 19    internal class PartitionLoadBalancer
 20    {
 21        /// <summary>The random number generator to use for a specific thread.</summary>
 022        private static readonly ThreadLocal<Random> RandomNumberGenerator = new ThreadLocal<Random>(() => new Random(Int
 23
 24        /// <summary>The seed to use for initializing random number generated for a given thread-specific instance.</sum
 025        private static int s_randomSeed = Environment.TickCount;
 26
 27        /// <summary>
 28        ///   Responsible for creation of checkpoints and for ownership claim.
 29        /// </summary>
 30        ///
 31        private readonly StorageManager StorageManager;
 32
 33        /// <summary>
 34        ///   A partition distribution dictionary, mapping an owner's identifier to the amount of partitions it owns and
 35        /// </summary>
 36        ///
 037        private readonly Dictionary<string, List<EventProcessorPartitionOwnership>> ActiveOwnershipWithDistribution = ne
 38
 39        /// <summary>
 40        ///   The minimum amount of time for an ownership to be considered expired without further updates.
 41        /// </summary>
 42        ///
 43        private TimeSpan OwnershipExpiration;
 44
 45        /// <summary>
 46        ///   The fully qualified Event Hubs namespace that the processor is associated with.  This is likely
 47        ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 48        /// </summary>
 49        ///
 050        public string FullyQualifiedNamespace { get; private set; }
 51
 52        /// <summary>
 53        ///   The name of the Event Hub that the processor is connected to, specific to the
 54        ///   Event Hubs namespace that contains it.
 55        /// </summary>
 56        ///
 057        public string EventHubName { get; private set; }
 58
 59        /// <summary>
 60        ///   The name of the consumer group this load balancer is associated with.  Events will be
 61        ///   read only in the context of this group.
 62        /// </summary>
 63        ///
 064        public string ConsumerGroup { get; private set; }
 65
 66        /// <summary>
 67        ///   The identifier of the EventProcessorClient that owns this load balancer.
 68        /// </summary>
 69        ///
 070        public string OwnerIdentifier { get; private set; }
 71
 72        /// <summary>
 73        ///   The minimum amount of time to be elapsed between two load balancing verifications.
 74        /// </summary>
 75        ///
 076        public TimeSpan LoadBalanceInterval { get; set; } = TimeSpan.FromSeconds(10);
 77
 78        /// <summary>
 79        ///   Indicates whether the load balancer believes itself to be in a balanced state
 80        ///   when considering its fair share of partitions and whether any partitions
 81        ///   remain unclaimed.
 82        /// </summary>
 83        ///
 084        public virtual bool IsBalanced { get; private set; }
 85
 86        /// <summary>
 87        ///   The partitionIds currently owned by the associated event processor.
 88        /// </summary>
 89        ///
 090        public virtual IEnumerable<string> OwnedPartitionIds => InstanceOwnership.Keys;
 91
 92        /// <summary>
 93        ///   The instance of <see cref="PartitionLoadBalancerEventSource" /> which can be mocked for testing.
 94        /// </summary>
 95        ///
 096        internal PartitionLoadBalancerEventSource Logger { get; set; } = PartitionLoadBalancerEventSource.Log;
 97
 98        /// <summary>
 99        ///   The set of partition ownership the associated event processor owns.  Partition ids are used as keys.
 100        /// </summary>
 101        ///
 0102        private Dictionary<string, EventProcessorPartitionOwnership> InstanceOwnership { get; set; } = new Dictionary<st
 103
 104        /// <summary>
 105        ///   Initializes a new instance of the <see cref="PartitionLoadBalancer"/> class.
 106        /// </summary>
 107        ///
 108        /// <param name="storageManager">Responsible for creation of checkpoints and for ownership claim.</param>
 109        /// <param name="identifier">The identifier of the EventProcessorClient that owns this load balancer.</param>
 110        /// <param name="consumerGroup">The name of the consumer group this load balancer is associated with.</param>
 111        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace that the processor is associa
 112        /// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
 113        /// <param name="ownershipExpiration">The minimum amount of time for an ownership to be considered expired witho
 114        ///
 0115        public PartitionLoadBalancer(StorageManager storageManager,
 0116                                     string identifier,
 0117                                     string consumerGroup,
 0118                                     string fullyQualifiedNamespace,
 0119                                     string eventHubName,
 0120                                     TimeSpan ownershipExpiration)
 121        {
 0122            Argument.AssertNotNull(storageManager, nameof(storageManager));
 0123            Argument.AssertNotNullOrEmpty(identifier, nameof(identifier));
 0124            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 0125            Argument.AssertNotNullOrEmpty(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
 0126            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 127
 0128            StorageManager = storageManager;
 0129            OwnerIdentifier = identifier;
 0130            FullyQualifiedNamespace = fullyQualifiedNamespace;
 0131            EventHubName = eventHubName;
 0132            ConsumerGroup = consumerGroup;
 0133            OwnershipExpiration = ownershipExpiration;
 0134        }
 135
 136        /// <summary>
 137        ///   Initializes a new instance of the <see cref="PartitionLoadBalancer"/> class.
 138        /// </summary>
 139        ///
 0140        protected PartitionLoadBalancer()
 141        {
 0142        }
 143
 144        /// <summary>
 145        ///   Performs load balancing between multiple EventProcessorClient instances, claiming others' partitions to en
 146        ///   a more equal distribution when necessary.  It also manages its own partition processing tasks and ownershi
 147        /// </summary>
 148        ///
 149        /// <param name="partitionIds">The set of partitionIds available for ownership balancing.</param>
 150        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 151        ///
 152        /// <returns>The claimed ownership. <c>null</c> if this instance is not eligible, if no claimable ownership was 
 153        ///
 154        public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancingAsync(string[] partitionIds,
 155                                                                                               CancellationToken cancell
 156        {
 157            // Renew this instance's ownership so they don't expire.
 158
 0159            await RenewOwnershipAsync(cancellationToken).ConfigureAwait(false);
 160
 161            // From the storage service, obtain a complete list of ownership, including expired ones.  We may still need
 162            // their eTags to claim orphan partitions.
 163
 164            IEnumerable<EventProcessorPartitionOwnership> completeOwnershipList;
 165
 166            try
 167            {
 0168                completeOwnershipList = (await StorageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, 
 0169                    .ConfigureAwait(false))
 0170                    .ToList();
 0171            }
 0172            catch (Exception ex)
 173            {
 0174                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 175
 176                // If ownership list retrieval fails, give up on the current cycle.  There's nothing more we can do
 177                // without an updated ownership list.  Set the EventHubName to null so it doesn't modify the exception
 178                // message.  This exception message is used so the processor can retrieve the raw Operation string, and
 179                // adding the EventHubName would append unwanted info to it.
 180
 0181                throw new EventHubsException(true, null, Resources.OperationListOwnership, ex);
 182            }
 183
 184            // There's no point in continuing the current cycle if we failed to fetch the completeOwnershipList.
 185
 0186            if (completeOwnershipList == default)
 187            {
 0188                return default;
 189            }
 190
 0191            var unclaimedPartitions = new HashSet<string>(partitionIds);
 192
 193            // Create a partition distribution dictionary from the complete ownership list we have, mapping an owner's i
 194            // partitions it owns.  When an event processor goes down and it has only expired ownership, it will not be 
 195            // by others.  The expiration time defaults to 30 seconds, but it may be overridden by a derived class.
 196
 0197            var utcNow = DateTimeOffset.UtcNow;
 0198            var activeOwnership = default(EventProcessorPartitionOwnership);
 199
 0200            ActiveOwnershipWithDistribution.Clear();
 0201            ActiveOwnershipWithDistribution[OwnerIdentifier] = new List<EventProcessorPartitionOwnership>();
 202
 0203            foreach (EventProcessorPartitionOwnership ownership in completeOwnershipList)
 204            {
 0205                if (utcNow.Subtract(ownership.LastModifiedTime) < OwnershipExpiration && !string.IsNullOrEmpty(ownership
 206                {
 0207                    activeOwnership = ownership;
 208
 209                    // If a processor crashes and restarts, then it is possible for it to own partitions that it is not 
 210                    // tracking as owned.  Test for this case and ensure that ownership is tracked and extended.
 211
 0212                    if ((string.Equals(ownership.OwnerIdentifier, OwnerIdentifier, StringComparison.OrdinalIgnoreCase)) 
 213                    {
 0214                        (_, activeOwnership) = await ClaimOwnershipAsync(ownership.PartitionId, new[] { ownership }, can
 215
 216                        // If the claim failed, then the ownership period was not extended.  Since the original ownershi
 217                        // yet expired prior to the claim attempt, consider the original to be the active ownership for 
 218
 0219                        if (activeOwnership == default)
 220                        {
 0221                            activeOwnership = ownership;
 222                        }
 223
 0224                        InstanceOwnership[activeOwnership.PartitionId] = activeOwnership;
 225                    }
 226
 227                    // Update active ownership and trim the unclaimed partitions.
 228
 0229                    if (ActiveOwnershipWithDistribution.ContainsKey(activeOwnership.OwnerIdentifier))
 230                    {
 0231                        ActiveOwnershipWithDistribution[activeOwnership.OwnerIdentifier].Add(activeOwnership);
 232                    }
 233                    else
 234                    {
 0235                        ActiveOwnershipWithDistribution[activeOwnership.OwnerIdentifier] = new List<EventProcessorPartit
 236                    }
 237
 0238                    unclaimedPartitions.Remove(activeOwnership.PartitionId);
 239                }
 0240            }
 241
 242            // Find an ownership to claim and try to claim it.  The method will return null if this instance was not eli
 243            // increase its ownership list, if no claimable ownership could be found or if a claim attempt has failed.
 244
 0245            var (claimAttempted, claimedOwnership) = await FindAndClaimOwnershipAsync(completeOwnershipList, unclaimedPa
 246
 0247            if (claimedOwnership != null)
 248            {
 0249                InstanceOwnership[claimedOwnership.PartitionId] = claimedOwnership;
 250            }
 251
 252            // Update the balanced state.  Consider the load balanced if this processor has its minimum share of partiti
 253            // attempt to claim a partition.
 254
 0255            var minimumDesiredPartitions = partitionIds.Length / ActiveOwnershipWithDistribution.Keys.Count;
 0256            IsBalanced = ((InstanceOwnership.Count >= minimumDesiredPartitions) && (!claimAttempted));
 257
 0258            return claimedOwnership;
 0259        }
 260
 261        /// <summary>
 262        ///   Relinquishes this instance's ownership so they can be claimed by other processors and clears the OwnedPart
 263        /// </summary>
 264        ///
 265        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 266        ///
 267        public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellationToken)
 268        {
 0269            IEnumerable<EventProcessorPartitionOwnership> ownershipToRelinquish = InstanceOwnership.Values
 0270                .Select(ownership => new EventProcessorPartitionOwnership
 0271                {
 0272                    FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
 0273                    EventHubName = ownership.EventHubName,
 0274                    ConsumerGroup = ownership.ConsumerGroup,
 0275                    OwnerIdentifier = string.Empty, //set ownership to Empty so that it is treated as available to claim
 0276                    PartitionId = ownership.PartitionId,
 0277                    LastModifiedTime = ownership.LastModifiedTime,
 0278                    Version = ownership.Version
 0279                });
 280
 0281            await StorageManager.ClaimOwnershipAsync(ownershipToRelinquish, cancellationToken).ConfigureAwait(false);
 282
 0283            InstanceOwnership.Clear();
 0284        }
 285
 286        /// <summary>
 287        ///   Finds and tries to claim an ownership if this processor instance is eligible to increase its ownership lis
 288        /// </summary>
 289        ///
 290        /// <param name="completeOwnershipEnumerable">A complete enumerable of ownership obtained from the storage servi
 291        /// <param name="unclaimedPartitions">The set of partitionIds that are currently unclaimed.</param>
 292        /// <param name="partitionCount">The count of partitions.</param>
 293        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 294        ///
 295        /// <returns>A tuple indicating whether a claim was attempted and any ownership that was claimed.  The claimed o
 296        ///
 297        private ValueTask<(bool wasClaimAttempted, EventProcessorPartitionOwnership claimedPartition)> FindAndClaimOwner
 298                                                                                                                        
 299                                                                                                                        
 300                                                                                                                        
 301        {
 0302            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 303
 304            // The minimum owned partitions count is the minimum amount of partitions every event processor needs to own
 305            // is balanced.  If n = minimumOwnedPartitionsCount, a balanced distribution will only have processors that 
 306            // each.  We can guarantee the partition distribution has at least one key, which corresponds to this event 
 307            // if it owns no partitions.
 308
 0309            var minimumOwnedPartitionsCount = partitionCount / ActiveOwnershipWithDistribution.Keys.Count;
 0310            Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount);
 311
 0312            var ownedPartitionsCount = ActiveOwnershipWithDistribution[OwnerIdentifier].Count;
 0313            Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier);
 314
 315            // There are two possible situations in which we may need to claim a partition ownership:
 316            //
 317            //   - The first one is when we are below the minimum amount of owned partitions.  There's nothing more to c
 318            //     partitions to enforce balancing.
 319            //
 320            //   - The second case is a bit tricky.  Sometimes the claim must be performed by an event processor that al
 321            //     amount of ownership.  This may happen, for instance, when we have 13 partitions and 3 processors, eac
 322            //     The minimum amount of partitions per processor is, in fact, 4, but in this example we still have 1 or
 323            //     avoid overlooking this kind of situation, we may want to claim an ownership when we have exactly the 
 324            //     but we are making sure there are no better candidates among the other event processors.
 325
 0326            if (ownedPartitionsCount < minimumOwnedPartitionsCount
 0327                || (ownedPartitionsCount == minimumOwnedPartitionsCount && !ActiveOwnershipWithDistribution.Values.Any(p
 328            {
 329                // Look for unclaimed partitions.  If any, randomly pick one of them to claim.
 330
 0331                Logger.UnclaimedPartitions(unclaimedPartitions);
 332
 0333                if (unclaimedPartitions.Count > 0)
 334                {
 0335                    var index = RandomNumberGenerator.Value.Next(unclaimedPartitions.Count);
 0336                    var returnTask = ClaimOwnershipAsync(unclaimedPartitions.ElementAt(index), completeOwnershipEnumerab
 337
 0338                    return new ValueTask<(bool, EventProcessorPartitionOwnership)>(returnTask);
 339                }
 340
 341                // Only try to steal partitions if there are no unclaimed partitions left.  At first, only processors th
 342                // maximum owned partition count should be targeted.
 343
 0344                Logger.ShouldStealPartition(OwnerIdentifier);
 345
 0346                var maximumOwnedPartitionsCount = minimumOwnedPartitionsCount + 1;
 0347                var partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount = new List<string>();
 0348                var partitionsOwnedByProcessorWithExactlyMaximumOwnedPartitionsCount = new List<string>();
 349
 350                // Build a list of partitions owned by processors owning exactly maximumOwnedPartitionsCount partitions
 351                // and a list of partitions owned by processors owning more than maximumOwnedPartitionsCount partitions.
 352                // Ignore the partitions already owned by this processor even though the current processor should never 
 353
 0354                foreach (var key in ActiveOwnershipWithDistribution.Keys)
 355                {
 0356                    var ownedPartitions = ActiveOwnershipWithDistribution[key];
 357
 0358                    if (ownedPartitions.Count < maximumOwnedPartitionsCount || string.Equals(key, OwnerIdentifier, Strin
 359                    {
 360                        // Skip if the common case is true.
 361
 362                        continue;
 363                    }
 0364                    if (ownedPartitions.Count == maximumOwnedPartitionsCount)
 365                    {
 0366                        ownedPartitions
 0367                            .ForEach(ownership => partitionsOwnedByProcessorWithExactlyMaximumOwnedPartitionsCount.Add(o
 368                    }
 369                    else
 370                    {
 0371                        ownedPartitions
 0372                            .ForEach(ownership => partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.A
 373                    }
 374                }
 375
 376                // Here's the important part.  If there are no processors that have exceeded the maximum owned partition
 377                // need to steal from the processors that have exactly the maximum amount.  If this instance is below th
 378                // we have no choice as we need to enforce balancing.  Otherwise, leave it as it is because the distribu
 379
 0380                if (partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0)
 381                {
 382                    // If any stealable partitions were found, randomly pick one of them to claim.
 383
 0384                    Logger.StealPartition(OwnerIdentifier);
 385
 0386                    var index = RandomNumberGenerator.Value.Next(partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPa
 387
 0388                    var returnTask = ClaimOwnershipAsync(
 0389                        partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount[index],
 0390                        completeOwnershipEnumerable,
 0391                        cancellationToken);
 392
 0393                    return new ValueTask<(bool, EventProcessorPartitionOwnership)>(returnTask);
 394                }
 0395                else if (ownedPartitionsCount < minimumOwnedPartitionsCount)
 396                {
 397                    // If any stealable partitions were found, randomly pick one of them to claim.
 398
 0399                    Logger.StealPartition(OwnerIdentifier);
 400
 0401                    var index = RandomNumberGenerator.Value.Next(partitionsOwnedByProcessorWithExactlyMaximumOwnedPartit
 402
 0403                    var returnTask = ClaimOwnershipAsync(
 0404                        partitionsOwnedByProcessorWithExactlyMaximumOwnedPartitionsCount[index],
 0405                        completeOwnershipEnumerable,
 0406                        cancellationToken);
 407
 0408                    return new ValueTask<(bool, EventProcessorPartitionOwnership)>(returnTask);
 409                }
 410            }
 411
 412            // No ownership has been claimed.
 413
 0414            return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default(EventProcessorPartitionOwners
 415        }
 416
 417        /// <summary>
 418        ///   Renews this instance's ownership so they don't expire.
 419        /// </summary>
 420        ///
 421        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 422        ///
 423        private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
 424        {
 0425            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 426
 0427            Logger.RenewOwnershipStart(OwnerIdentifier);
 428
 0429            IEnumerable<EventProcessorPartitionOwnership> ownershipToRenew = InstanceOwnership.Values
 0430                .Select(ownership => new EventProcessorPartitionOwnership
 0431                {
 0432                    FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
 0433                    EventHubName = ownership.EventHubName,
 0434                    ConsumerGroup = ownership.ConsumerGroup,
 0435                    OwnerIdentifier = ownership.OwnerIdentifier,
 0436                    PartitionId = ownership.PartitionId,
 0437                    LastModifiedTime = DateTimeOffset.UtcNow,
 0438                    Version = ownership.Version
 0439                });
 440
 441            try
 442            {
 443                // Dispose of all previous partition ownership instances and get a whole new dictionary.
 444
 0445                InstanceOwnership = (await StorageManager.ClaimOwnershipAsync(ownershipToRenew, cancellationToken)
 0446                    .ConfigureAwait(false))
 0447                    .ToDictionary(ownership => ownership.PartitionId);
 0448            }
 0449            catch (Exception ex)
 450            {
 0451                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 452
 453                // If ownership renewal fails just give up and try again in the next cycle.  The processor may
 454                // end up losing some of its ownership.
 455
 0456                Logger.RenewOwnershipError(OwnerIdentifier, ex.Message);
 457
 458                // Set the EventHubName to null so it doesn't modify the exception message. This exception message is
 459                // used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
 460                // unwanted info to it.
 461
 0462                throw new EventHubsException(true, null, Resources.OperationRenewOwnership, ex);
 463            }
 464            finally
 465            {
 0466                Logger.RenewOwnershipComplete(OwnerIdentifier);
 467            }
 0468        }
 469
 470        /// <summary>
 471        ///   Tries to claim ownership of the specified partition.
 472        /// </summary>
 473        ///
 474        /// <param name="partitionId">The identifier of the Event Hub partition the ownership is associated with.</param
 475        /// <param name="completeOwnershipEnumerable">A complete enumerable of ownership obtained from the stored servic
 476        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 477        ///
 478        /// <returns>A tuple indicating whether a claim was attempted and the claimed ownership. The claimed ownership w
 479        ///
 480        private async Task<(bool wasClaimAttempted, EventProcessorPartitionOwnership claimedPartition)> ClaimOwnershipAs
 481                                                                                                                        
 482                                                                                                                        
 483        {
 0484            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 485
 0486            Logger.ClaimOwnershipStart(partitionId);
 487
 488            // We need the eTag from the most recent ownership of this partition, even if it's expired.  We want to keep
 489            // the sequence number as well.
 490
 0491            var oldOwnership = completeOwnershipEnumerable.FirstOrDefault(ownership => ownership.PartitionId == partitio
 492
 0493            var newOwnership = new EventProcessorPartitionOwnership
 0494            {
 0495                FullyQualifiedNamespace = FullyQualifiedNamespace,
 0496                EventHubName = EventHubName,
 0497                ConsumerGroup = ConsumerGroup,
 0498                OwnerIdentifier = OwnerIdentifier,
 0499                PartitionId = partitionId,
 0500                LastModifiedTime = DateTimeOffset.UtcNow,
 0501                Version = oldOwnership?.Version
 0502            };
 503
 0504            var claimedOwnership = default(IEnumerable<EventProcessorPartitionOwnership>);
 505
 506            try
 507            {
 0508                claimedOwnership = await StorageManager.ClaimOwnershipAsync(new List<EventProcessorPartitionOwnership> {
 0509            }
 0510            catch (Exception ex)
 511            {
 0512                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 513
 514                // If ownership claim fails, just treat it as a usual ownership claim failure.
 515
 0516                Logger.ClaimOwnershipError(partitionId, ex.Message);
 517
 518                // Set the EventHubName to null so it doesn't modify the exception message. This exception message is
 519                // used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
 520                // unwanted info to it. This exception also communicates the PartitionId to the caller.
 521
 0522                var exception = new EventHubsException(true, null, Resources.OperationClaimOwnership, ex);
 0523                exception.SetFailureOperation(exception.Message);
 0524                exception.SetFailureData(partitionId);
 0525                throw exception;
 526            }
 527
 528            // We are expecting an enumerable with a single element if the claim attempt succeeds.
 529
 0530            return (true, claimedOwnership.FirstOrDefault());
 0531        }
 532    }
 533}