< Summary

Class:Microsoft.Azure.EventHubs.Processor.AzureStorageCheckpointLeaseManager
Assembly:Microsoft.Azure.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\AzureStorageCheckpointLeaseManager.cs
Covered lines:0
Uncovered lines:276
Coverable lines:276
Total lines:586
Line coverage:0% (0 of 276)
Covered branches:0
Total branches:62
Branch coverage:0% (0 of 62)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-0%100%
.ctor(...)-0%100%
.ctor(...)-0%0%
Initialize(...)-0%100%
CheckpointStoreExistsAsync()-0%100%
CreateCheckpointStoreIfNotExistsAsync()-0%100%
GetCheckpointAsync()-0%0%
CreateCheckpointIfNotExistsAsync(...)-0%100%
UpdateCheckpointAsync()-0%100%
DeleteCheckpointAsync(...)-0%100%
get_LeaseRenewInterval()-0%100%
get_LeaseDuration()-0%100%
LeaseStoreExistsAsync()-0%100%
CreateLeaseStoreIfNotExistsAsync()-0%100%
DeleteLeaseStoreAsync()-0%0%
GetLeaseAsync()-0%100%
GetAllLeasesAsync()-0%0%
CreateLeaseIfNotExistsAsync()-0%0%
DeleteLeaseAsync(...)-0%100%
AcquireLeaseAsync(...)-0%100%
AcquireLeaseCoreAsync()-0%0%
RenewLeaseAsync(...)-0%100%
RenewLeaseCoreAsync()-0%100%
ReleaseLeaseAsync(...)-0%100%
ReleaseLeaseCoreAsync()-0%100%
UpdateLeaseAsync(...)-0%100%
UpdateLeaseCoreAsync()-0%0%
DownloadLeaseAsync()-0%100%
HandleStorageException(...)-0%0%
GetBlockBlobReference(...)-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\AzureStorageCheckpointLeaseManager.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.EventHubs.Processor
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Linq;
 9    using System.Threading;
 10    using System.Threading.Tasks;
 11    using Microsoft.Azure.EventHubs.Primitives;
 12    using Microsoft.Azure.Storage;
 13    using Microsoft.Azure.Storage.Blob;
 14    using Newtonsoft.Json;
 15
 16    class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager
 17    {
 018        static string MetaDataOwnerName = "OWNINGHOST";
 19
 20        EventProcessorHost host;
 21        TimeSpan leaseDuration;
 22        TimeSpan leaseRenewInterval;
 23
 024        static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2);
 25        readonly CloudStorageAccount cloudStorageAccount;
 26        readonly string leaseContainerName;
 27        readonly string storageBlobPrefix;
 28        BlobRequestOptions defaultRequestOptions;
 29        OperationContext operationContext = null;
 30        CloudBlobContainer eventHubContainer;
 31        CloudBlobDirectory consumerGroupDirectory;
 32
 33        internal AzureStorageCheckpointLeaseManager(string storageConnectionString, string leaseContainerName, string st
 034            : this(CloudStorageAccount.Parse(storageConnectionString), leaseContainerName, storageBlobPrefix)
 35        {
 036        }
 37
 038        internal AzureStorageCheckpointLeaseManager(CloudStorageAccount cloudStorageAccount, string leaseContainerName, 
 39        {
 040            Guard.ArgumentNotNull(nameof(cloudStorageAccount), cloudStorageAccount);
 41
 42            try
 43            {
 044                NameValidator.ValidateContainerName(leaseContainerName);
 045            }
 046            catch (ArgumentException)
 47            {
 048                throw new ArgumentException(
 049                    "Azure Storage lease container name is invalid. Please check naming conventions at https://msdn.micr
 050                    nameof(leaseContainerName));
 51            }
 52
 053            this.cloudStorageAccount = cloudStorageAccount;
 054            this.leaseContainerName = leaseContainerName;
 55
 56            // Convert all-whitespace prefix to empty string. Convert null prefix to empty string.
 57            // Then the rest of the code only has one case to worry about.
 058            this.storageBlobPrefix = (storageBlobPrefix != null) ? storageBlobPrefix.Trim() : "";
 059        }
 60
 61        // The EventProcessorHost can't pass itself to the AzureStorageCheckpointLeaseManager constructor
 62        // because it is still being constructed. Do other initialization here also because it might throw and
 63        // hence we don't want it in the constructor.
 64        internal void Initialize(EventProcessorHost host)
 65        {
 066            this.host = host;
 67
 68            // Assign partition manager options.
 069            this.leaseDuration = host.PartitionManagerOptions.LeaseDuration;
 070            this.leaseRenewInterval = host.PartitionManagerOptions.RenewInterval;
 71
 72            // Create storage default request options.
 073            this.defaultRequestOptions = new BlobRequestOptions()
 074            {
 075                // Gets or sets the server timeout interval for a single HTTP request.
 076                ServerTimeout = TimeSpan.FromSeconds(this.LeaseRenewInterval.TotalSeconds / 2),
 077
 078                // Gets or sets the maximum execution time across all potential retries for the request.
 079                MaximumExecutionTime = TimeSpan.FromSeconds(this.LeaseRenewInterval.TotalSeconds)
 080            };
 81
 82#if FullNetFx
 83            // Proxy enabled?
 84            if (this.host.EventProcessorOptions != null && this.host.EventProcessorOptions.WebProxy != null)
 85            {
 86                this.operationContext = new OperationContext
 87                {
 88                    Proxy = this.host.EventProcessorOptions.WebProxy
 89                };
 90            }
 91#endif
 92
 93            // Create storage client and configure max execution time.
 94            // Max execution time will apply to any storage call unless otherwise specified by custom request options.
 095            var storageClient = this.cloudStorageAccount.CreateCloudBlobClient();
 096            storageClient.DefaultRequestOptions = new BlobRequestOptions
 097            {
 098                MaximumExecutionTime = AzureStorageCheckpointLeaseManager.storageMaximumExecutionTime
 099            };
 100
 0101            this.eventHubContainer = storageClient.GetContainerReference(this.leaseContainerName);
 0102            this.consumerGroupDirectory = this.eventHubContainer.GetDirectoryReference(this.storageBlobPrefix + this.hos
 0103        }
 104
 105        //
 106        // In this implementation, checkpoints are data that's actually in the lease blob, so checkpoint operations
 107        // turn into lease operations under the covers.
 108        //
 109        public Task<bool> CheckpointStoreExistsAsync()
 110        {
 0111            return LeaseStoreExistsAsync();
 112        }
 113
 114        public Task<bool> CreateCheckpointStoreIfNotExistsAsync()
 115        {
 116            // Because we control the caller, we know that this method will only be called after createLeaseStoreIfNotEx
 117            // In this implementation, it's the same store, so the store will always exist if execution reaches here.
 0118            return Task.FromResult(true);
 119        }
 120
 121        public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
 122        {
 0123            AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
 0124            Checkpoint checkpoint = null;
 0125            if (lease != null && !string.IsNullOrEmpty(lease.Offset))
 126            {
 0127                checkpoint = new Checkpoint(partitionId)
 0128                {
 0129                    Offset = lease.Offset,
 0130                    SequenceNumber = lease.SequenceNumber
 0131                };
 132            }
 133
 0134            return checkpoint;
 0135        }
 136
 137        public Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
 138        {
 139            // Normally the lease will already be created, checkpoint store is initialized after lease store.
 0140            return Task.FromResult<Checkpoint>(null);
 141        }
 142
 143        public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
 144        {
 0145            AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease) lease)
 0146            {
 0147                Offset = checkpoint.Offset,
 0148                SequenceNumber = checkpoint.SequenceNumber
 0149            };
 150
 0151            await this.UpdateLeaseAsync(newLease).ConfigureAwait(false);
 0152        }
 153
 154        public Task DeleteCheckpointAsync(string partitionId)
 155        {
 156            // Make this a no-op to avoid deleting leases by accident.
 0157            return Task.FromResult(0);
 158        }
 159
 160        //
 161        // Lease operations.
 162        //
 0163        public TimeSpan LeaseRenewInterval => this.leaseRenewInterval;
 164
 0165        public TimeSpan LeaseDuration => this.leaseDuration;
 166
 167        public Task<bool> LeaseStoreExistsAsync()
 168        {
 0169            return this.eventHubContainer.ExistsAsync(null, this.operationContext);
 170        }
 171
 172        public Task<bool> CreateLeaseStoreIfNotExistsAsync()
 173        {
 0174            return this.eventHubContainer.CreateIfNotExistsAsync(null, this.operationContext);
 175        }
 176
 177        public async Task<bool> DeleteLeaseStoreAsync()
 178        {
 0179            bool retval = true;
 180
 0181            BlobContinuationToken outerContinuationToken = null;
 182            do
 183            {
 0184                BlobResultSegment outerResultSegment = await this.eventHubContainer.ListBlobsSegmentedAsync(outerContinu
 0185                outerContinuationToken = outerResultSegment.ContinuationToken;
 0186                foreach (IListBlobItem blob in outerResultSegment.Results)
 187                {
 0188                    if (blob is CloudBlobDirectory)
 189                    {
 0190                        BlobContinuationToken innerContinuationToken = null;
 191                        do
 192                        {
 0193                            BlobResultSegment innerResultSegment = await ((CloudBlobDirectory)blob).ListBlobsSegmentedAs
 0194                            innerContinuationToken = innerResultSegment.ContinuationToken;
 0195                            foreach (IListBlobItem subBlob in innerResultSegment.Results)
 196                            {
 197                                try
 198                                {
 0199                                    await ((CloudBlockBlob)subBlob).DeleteIfExistsAsync().ConfigureAwait(false);
 0200                                }
 0201                                catch (StorageException e)
 202                                {
 0203                                    ProcessorEventSource.Log.AzureStorageManagerWarning(this.host.HostName, "N/A", "Fail
 0204                                    retval = false;
 0205                                }
 206                            }
 207                        }
 0208                        while (innerContinuationToken != null);
 0209                    }
 0210                    else if (blob is CloudBlockBlob)
 211                    {
 212                        try
 213                        {
 0214                            await ((CloudBlockBlob)blob).DeleteIfExistsAsync().ConfigureAwait(false);
 0215                        }
 0216                        catch (StorageException e)
 217                        {
 0218                            ProcessorEventSource.Log.AzureStorageManagerWarning(this.host.HostName, "N/A", "Failure whil
 0219                            retval = false;
 0220                        }
 221                    }
 0222                }
 223            }
 0224            while (outerContinuationToken != null);
 225
 0226            return retval;
 0227        }
 228
 229        public async Task<Lease> GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageExc
 230        {
 0231            CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
 232
 0233            await leaseBlob.FetchAttributesAsync(null, defaultRequestOptions, this.operationContext).ConfigureAwait(fals
 234
 0235            return await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
 0236        }
 237
 238        public async Task<IEnumerable<Lease>> GetAllLeasesAsync()
 239        {
 0240            var leaseList = new List<Lease>();
 0241            BlobContinuationToken continuationToken = null;
 242
 243            do
 244            {
 0245                var listBlobsTask = this.consumerGroupDirectory.ListBlobsSegmentedAsync(
 0246                    true,
 0247                    BlobListingDetails.Metadata,
 0248                    null,
 0249                    continuationToken,
 0250                    this.defaultRequestOptions,
 0251                    this.operationContext);
 252
 253                // ListBlobsSegmentedAsync honors neither timeout settings in request options nor cancellation token and
 254                // This provides a workaround until we have storage.blob library fixed.
 255                BlobResultSegment leaseBlobsResult;
 0256                using (var cts = new CancellationTokenSource())
 257                {
 0258                    var delayTask = Task.Delay(this.LeaseRenewInterval, cts.Token);
 0259                    var completedTask = await Task.WhenAny(listBlobsTask, delayTask).ConfigureAwait(false);
 260
 0261                    if (completedTask == listBlobsTask)
 262                    {
 0263                        cts.Cancel();
 0264                        leaseBlobsResult = await listBlobsTask;
 265                    }
 266                    else
 267                    {
 268                        // Throw OperationCanceledException, caller will log the failures appropriately.
 0269                        throw new OperationCanceledException();
 270                    }
 0271                }
 272
 0273                foreach (CloudBlockBlob leaseBlob in leaseBlobsResult.Results)
 274                {
 275                    // Try getting owner name from existing blob.
 276                    // This might return null when run on the existing lease after SDK upgrade.
 0277                    leaseBlob.Metadata.TryGetValue(MetaDataOwnerName, out var owner);
 278
 279                    // Discover partition id from URI path of the blob.
 0280                    var partitionId = leaseBlob.Uri.AbsolutePath.Split('/').Last();
 281
 0282                    leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob));
 283                }
 284
 0285                continuationToken = leaseBlobsResult.ContinuationToken;
 286
 0287            } while (continuationToken != null);
 288
 0289            return leaseList;
 0290        }
 291
 292        public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOExcepti
 293        {
 294            AzureBlobLease returnLease;
 295            try
 296            {
 0297                CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
 0298                returnLease = new AzureBlobLease(partitionId, leaseBlob);
 0299                string jsonLease = JsonConvert.SerializeObject(returnLease);
 300
 0301                ProcessorEventSource.Log.AzureStorageManagerInfo(
 0302                    this.host.HostName,
 0303                    partitionId,
 0304                    "CreateLeaseIfNotExist - leaseContainerName: " + this.leaseContainerName +
 0305                    " consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlobPref
 306
 307                // Don't provide default request options for upload call.
 308                // This request will respect client's default options.
 0309                await leaseBlob.UploadTextAsync(
 0310                    jsonLease,
 0311                    null,
 0312                    AccessCondition.GenerateIfNoneMatchCondition("*"),
 0313                    null,
 0314                    this.operationContext).ConfigureAwait(false);
 0315            }
 0316            catch (StorageException se)
 317            {
 0318                if (se.RequestInformation.ErrorCode == BlobErrorCodeStrings.BlobAlreadyExists ||
 0319                     se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMissing) // occurs when somebody els
 320                {
 321                    // The blob already exists.
 0322                    ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Lease already exi
 0323                    returnLease = (AzureBlobLease)await GetLeaseAsync(partitionId).ConfigureAwait(false);
 324                }
 325                else
 326                {
 0327                    ProcessorEventSource.Log.AzureStorageManagerError(
 0328                        this.host.HostName,
 0329                        partitionId,
 0330                        "CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.leaseContainerName +
 0331                        " consumerGroupName: " + this.host.ConsumerGroupName + " storageBlobPrefix: " + this.storageBlob
 0332                        se.ToString());
 0333                    throw;
 334                }
 335            }
 336
 0337            return returnLease;
 0338        }
 339
 340        public Task DeleteLeaseAsync(Lease lease)
 341        {
 0342            var azureBlobLease = (AzureBlobLease)lease;
 0343            ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, azureBlobLease.PartitionId, "Deleting l
 0344            return azureBlobLease.Blob.DeleteIfExistsAsync();
 345        }
 346
 347        public Task<bool> AcquireLeaseAsync(Lease lease)
 348        {
 0349            return AcquireLeaseCoreAsync((AzureBlobLease)lease);
 350        }
 351
 352        async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
 353        {
 0354            CloudBlockBlob leaseBlob = lease.Blob;
 0355            bool retval = true;
 0356            string newLeaseId = Guid.NewGuid().ToString();
 0357            string partitionId = lease.PartitionId;
 358            try
 359            {
 0360                bool renewLease = false;
 361                string newToken;
 362
 0363                await leaseBlob.FetchAttributesAsync(null, this.defaultRequestOptions, this.operationContext).ConfigureA
 364
 0365                if (leaseBlob.Properties.LeaseState == LeaseState.Leased)
 366                {
 0367                    if (string.IsNullOrEmpty(lease.Token))
 368                    {
 369                        // We reach here in a race condition: when this instance of EventProcessorHost scanned the
 370                        // lease blobs, this partition was unowned (token is empty) but between then and now, another
 371                        // instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
 372                        // that we only steal the lease if it is still owned by the instance which owned it when we
 373                        // scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
 374                        // fail the acquisition. If that means that one EPH instance gets more partitions than it should
 375                        // rebalancing will take care of that quickly enough.
 0376                        return false;
 377                    }
 378
 0379                    ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to Cha
 0380                    renewLease = true;
 0381                    newToken = await leaseBlob.ChangeLeaseAsync(
 0382                        newLeaseId,
 0383                        AccessCondition.GenerateLeaseCondition(lease.Token),
 0384                        this.defaultRequestOptions,
 0385                        this.operationContext).ConfigureAwait(false);
 386                }
 387                else
 388                {
 0389                    ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to Acq
 0390                    newToken = await leaseBlob.AcquireLeaseAsync(
 0391                        leaseDuration,
 0392                        newLeaseId,
 0393                        null,
 0394                        this.defaultRequestOptions,
 0395                        this.operationContext).ConfigureAwait(false);
 396                }
 397
 0398                lease.Token = newToken;
 0399                lease.Owner = this.host.HostName;
 0400                lease.IncrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
 401
 402                // Renew lease here if needed?
 403                // ChangeLease doesn't renew so we should avoid lease expiring before next renew interval.
 0404                if (renewLease)
 405                {
 0406                    await this.RenewLeaseCoreAsync(lease).ConfigureAwait(false);
 407                }
 408
 409                // Update owner in the metadata first since clients get ownership information by looking at metadata.
 0410                lease.Blob.Metadata[MetaDataOwnerName] = lease.Owner;
 0411                await lease.Blob.SetMetadataAsync(
 0412                    AccessCondition.GenerateLeaseCondition(lease.Token),
 0413                    this.defaultRequestOptions,
 0414                    this.operationContext).ConfigureAwait(false);
 415
 416                // Then update deserialized lease content.
 0417                await leaseBlob.UploadTextAsync(
 0418                    JsonConvert.SerializeObject(lease),
 0419                    null,
 0420                    AccessCondition.GenerateLeaseCondition(lease.Token),
 0421                    this.defaultRequestOptions,
 0422                    this.operationContext).ConfigureAwait(false);
 0423            }
 0424            catch (StorageException se)
 425            {
 0426                throw HandleStorageException(partitionId, se);
 427            }
 428
 0429            return retval;
 0430        }
 431
 432        public Task<bool> RenewLeaseAsync(Lease lease)
 433        {
 0434            return RenewLeaseCoreAsync((AzureBlobLease)lease);
 435        }
 436
 437        async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
 438        {
 0439            CloudBlockBlob leaseBlob = lease.Blob;
 0440            string partitionId = lease.PartitionId;
 441
 442            try
 443            {
 0444                await leaseBlob.RenewLeaseAsync(
 0445                    AccessCondition.GenerateLeaseCondition(lease.Token),
 0446                    this.defaultRequestOptions,
 0447                    this.operationContext).ConfigureAwait(false);
 0448            }
 0449            catch (StorageException se)
 450            {
 0451                throw HandleStorageException(partitionId, se);
 452            }
 453
 0454            return true;
 0455        }
 456
 457        public Task<bool> ReleaseLeaseAsync(Lease lease)
 458        {
 0459            return ReleaseLeaseCoreAsync((AzureBlobLease)lease);
 460        }
 461
 462        async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
 463        {
 0464            ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Releasing lease");
 465
 0466            CloudBlockBlob leaseBlob = lease.Blob;
 0467            string partitionId = lease.PartitionId;
 468
 469            try
 470            {
 0471                string leaseId = lease.Token;
 0472                AzureBlobLease releasedCopy = new AzureBlobLease(lease)
 0473                {
 0474                    Token = string.Empty,
 0475                    Owner = string.Empty
 0476                };
 477
 478                // Remove owner in the metadata.
 0479                leaseBlob.Metadata.Remove(MetaDataOwnerName);
 480
 0481                await leaseBlob.SetMetadataAsync(
 0482                    AccessCondition.GenerateLeaseCondition(leaseId),
 0483                    this.defaultRequestOptions,
 0484                    this.operationContext).ConfigureAwait(false);
 485
 0486                await leaseBlob.UploadTextAsync(
 0487                    JsonConvert.SerializeObject(releasedCopy),
 0488                    null,
 0489                    AccessCondition.GenerateLeaseCondition(leaseId),
 0490                    this.defaultRequestOptions,
 0491                    this.operationContext).ConfigureAwait(false);
 492
 0493                await leaseBlob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId), this.defaultRequestOp
 0494            }
 0495            catch (StorageException se)
 496            {
 0497                throw HandleStorageException(partitionId, se);
 498            }
 499
 0500            return true;
 0501        }
 502
 503        public Task<bool> UpdateLeaseAsync(Lease lease)
 504        {
 0505            return UpdateLeaseCoreAsync((AzureBlobLease)lease);
 506        }
 507
 508        async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
 509        {
 0510            if (lease == null)
 511            {
 0512                return false;
 513            }
 514
 0515            string partitionId = lease.PartitionId;
 0516            ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Updating lease");
 517
 0518            string token = lease.Token;
 0519            if (string.IsNullOrEmpty(token))
 520            {
 0521                return false;
 522            }
 523
 0524            CloudBlockBlob leaseBlob = lease.Blob;
 525            try
 526            {
 0527                string jsonToUpload = JsonConvert.SerializeObject(lease);
 0528                ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, $"Raw JSON uploa
 529
 530                // This is on the code path of checkpoint call thus don't provide default request options for upload cal
 531                // This request will respect client's default options.
 0532                await leaseBlob.UploadTextAsync(
 0533                    jsonToUpload,
 0534                    null,
 0535                    AccessCondition.GenerateLeaseCondition(token),
 0536                    null,
 0537                    this.operationContext).ConfigureAwait(false);
 0538            }
 0539            catch (StorageException se)
 540            {
 0541                throw HandleStorageException(partitionId, se);
 542            }
 543
 0544            return true;
 0545        }
 546
 547        async Task<Lease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOExce
 548        {
 0549            string jsonLease = await blob.DownloadTextAsync(null, null, this.defaultRequestOptions, this.operationContex
 550
 0551            ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Raw JSON downloaded: " + 
 0552            AzureBlobLease rehydrated = (AzureBlobLease)JsonConvert.DeserializeObject(jsonLease, typeof(AzureBlobLease))
 0553            AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
 554
 0555            return blobLease;
 0556        }
 557
 558        Exception HandleStorageException(string partitionId, StorageException se)
 559        {
 0560            ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "HandleStorageException - 
 0561            if (se.RequestInformation.HttpStatusCode == 409 || // conflict
 0562                se.RequestInformation.HttpStatusCode == 412) // precondition failed
 563            {
 0564                ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId,
 0565                    "HandleStorageException - Error code: " + se.RequestInformation.ErrorCode);
 0566                ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId,
 0567                    "HandleStorageException - Error message: " + se.RequestInformation.ExtendedErrorInformation?.ErrorMe
 568
 0569                if (se.RequestInformation.ErrorCode == null ||
 0570                    se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseLost ||
 0571                    se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMismatchWithLeaseOperation ||
 0572                    se.RequestInformation.ErrorCode == BlobErrorCodeStrings.LeaseIdMismatchWithBlobOperation)
 573                {
 0574                    return new LeaseLostException(partitionId, se);
 575                }
 576            }
 577
 0578            return se;
 579        }
 580
 581        CloudBlockBlob GetBlockBlobReference(string partitionId)
 582        {
 0583            return this.consumerGroupDirectory.GetBlockBlobReference(partitionId);
 584        }
 585    }
 586}