Summary

Class:Azure.Messaging.EventHubs.Processor.BlobsCheckpointStore
Assembly:Azure.Messaging.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\src\Storage\BlobsCheckpointStore.cs
Covered lines:166
Uncovered lines:5
Coverable lines:171
Total lines:494
Line coverage:97% (166 of 171)
Covered branches:38
Total branches:50
Branch coverage:76% (38 of 50)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.cctor() | - | 100% | 100%
get_ContainerClient() | - | 100% | 100%
get_RetryPolicy() | - | 100% | 100%
get_Logger() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
ListOwnershipAsync() | - | 100% | 62.5%
<ListOwnershipAsync() | - | 100% | 50%
ClaimOwnershipAsync() | - | 91.11% | 87.5%
<ClaimOwnershipAsync() | - | 50% | 100%
ListCheckpointsAsync() | - | 100% | 81.25%
<ListCheckpointsAsync() | - | 100% | 81.25%
UpdateCheckpointAsync() | - | 100% | 100%
<UpdateCheckpointAsync() | - | 100% | 100%
ApplyRetryPolicy() | - | 95% | 68.75%
ApplyRetryPolicy() | - | 100% | 100%
<ApplyRetryPolicy() | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\src\Storage\BlobsCheckpointStore.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.IO;
 8using System.Runtime.CompilerServices;
 9using System.Text.RegularExpressions;
 10using System.Threading;
 11using System.Threading.Tasks;
 12using Azure.Core;
 13using Azure.Messaging.EventHubs.Consumer;
 14using Azure.Messaging.EventHubs.Core;
 15using Azure.Messaging.EventHubs.Primitives;
 16using Azure.Messaging.EventHubs.Processor.Diagnostics;
 17using Azure.Storage.Blobs;
 18using Azure.Storage.Blobs.Models;
 19
 20namespace Azure.Messaging.EventHubs.Processor
 21{
 22    /// <summary>
 23    ///   A storage blob service that keeps track of checkpoints and ownership.
 24    /// </summary>
 25    ///
 26    internal sealed class BlobsCheckpointStore : StorageManager
 27    {
 28        /// <summary>A regular expression used to capture strings enclosed in double quotes.</summary>
 229        private static readonly Regex DoubleQuotesExpression = new Regex("\"(.*)\"", RegexOptions.Compiled);
 30
 31        /// <summary>An ETag value to be used for permissive matching when querying Storage.</summary>
 232        private static readonly ETag IfNoneMatchAllTag = new ETag("*");
 33
 34        /// <summary>
 35        ///   Specifies a string that filters the results to return only checkpoint blobs whose name begins
 36        ///   with the specified prefix.
 37        /// </summary>
 38        ///
 39        private const string CheckpointPrefix = "{0}/{1}/{2}/checkpoint/";
 40
 41        /// <summary>
 42        ///   Specifies a string that filters the results to return only ownership blobs whose name begins
 43        ///   with the specified prefix.
 44        /// </summary>
 45        ///
 46        private const string OwnershipPrefix = "{0}/{1}/{2}/ownership/";
 47
 48        /// <summary>
 49        ///   The client used to interact with the Azure Blob Storage service.
 50        /// </summary>
 51        ///
 12052        private BlobContainerClient ContainerClient { get; }
 53
 54        /// <summary>
 55        ///   The active policy which governs retry attempts for the
 56        ///   <see cref="BlobsCheckpointStore" />.
 57        /// </summary>
 58        ///
 28259        private EventHubsRetryPolicy RetryPolicy { get; }
 60
 61        /// <summary>
 62        ///   The instance of <see cref="BlobEventStoreEventSource" /> which can be mocked for testing.
 63        /// </summary>
 64        ///
 61065        internal BlobEventStoreEventSource Logger { get; set; } = BlobEventStoreEventSource.Log;
 66
 67        /// <summary>
 68        ///   Initializes a new instance of the <see cref="BlobsCheckpointStore"/> class.
 69        /// </summary>
 70        ///
 71        /// <param name="blobContainerClient">The client used to interact with the Azure Blob Storage service.</param>
 72        /// <param name="retryPolicy">The retry policy to use as the basis for interacting with the Storage Blobs servic
 73        ///
 13674        public BlobsCheckpointStore(BlobContainerClient blobContainerClient,
 13675                                    EventHubsRetryPolicy retryPolicy)
 76        {
 13677            Argument.AssertNotNull(blobContainerClient, nameof(blobContainerClient));
 13478            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 79
 13280            ContainerClient = blobContainerClient;
 13281            RetryPolicy = retryPolicy;
 13282            Logger.BlobsCheckpointStoreCreated(nameof(BlobsCheckpointStore), blobContainerClient.AccountName, blobContai
 13283        }
 84
 85        /// <summary>
 86        ///   Retrieves a complete ownership list from the storage blob service.
 87        /// </summary>
 88        ///
 89        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associated 
 90        /// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with, relative to
 91        /// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
 92        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 93        ///
 94        /// <returns>An enumerable containing all the existing ownership for the associated Event Hub and consumer group
 95        ///
 96        public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifi
 97                                                                                                     string eventHubName
 98                                                                                                     string consumerGrou
 99                                                                                                     CancellationToken c
 100        {
 18101            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 16102            Logger.ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
 103
 16104            List<EventProcessorPartitionOwnership> result = null;
 105
 106            try
 107            {
 16108                var prefix = string.Format(CultureInfo.InvariantCulture, OwnershipPrefix, fullyQualifiedNamespace.ToLowe
 109
 110                async Task<List<EventProcessorPartitionOwnership>> listOwnershipAsync(CancellationToken listOwnershipTok
 111                {
 20112                    var ownershipList = new List<EventProcessorPartitionOwnership>();
 113
 28114                    await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: p
 115                    {
 116                        // In case this key does not exist, ownerIdentifier is set to null.  This will force the Partiti
 117                        // to throw an exception.
 118
 2119                        blob.Metadata.TryGetValue(BlobMetadataKey.OwnerIdentifier, out var ownerIdentifier);
 120
 2121                        ownershipList.Add(new EventProcessorPartitionOwnership
 2122                        {
 2123                            FullyQualifiedNamespace = fullyQualifiedNamespace,
 2124                            EventHubName = eventHubName,
 2125                            ConsumerGroup = consumerGroup,
 2126                            OwnerIdentifier = ownerIdentifier,
 2127                            PartitionId = blob.Name.Substring(prefix.Length),
 2128                            LastModifiedTime = blob.Properties.LastModified.GetValueOrDefault(),
 2129                            Version = blob.Properties.ETag.ToString()
 2130                        });
 131                    }
 132
 4133                    return ownershipList;
 4134                };
 135
 16136                result = await ApplyRetryPolicy(listOwnershipAsync, cancellationToken).ConfigureAwait(false);
 4137                return result;
 138            }
 2139            catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
 140            {
 2141                Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
 2142                throw new RequestFailedException(Resources.BlobsResourceDoesNotExist);
 143            }
 144            finally
 145            {
 16146                Logger.ListOwnershipComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, result?.Count ?? 0);
 147            }
 4148        }
 149
 150        /// <summary>
 151        ///   Attempts to claim ownership of partitions for processing.
 152        /// </summary>
 153        ///
 154        /// <param name="partitionOwnership">An enumerable containing all the ownership to claim.</param>
 155        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 156        ///
 157        /// <returns>An enumerable containing the successfully claimed ownership.</returns>
 158        ///
 159        public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventP
 160                                                                                                      CancellationToken 
 161        {
 38162            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 163
 36164            var claimedOwnership = new List<EventProcessorPartitionOwnership>();
 36165            var metadata = new Dictionary<string, string>();
 166
 167            Response<BlobContentInfo> contentInfoResponse;
 168            Response<BlobInfo> infoResponse;
 169
 120170            foreach (EventProcessorPartitionOwnership ownership in partitionOwnership)
 171            {
 36172                Logger.ClaimOwnershipStart(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubN
 36173                metadata[BlobMetadataKey.OwnerIdentifier] = ownership.OwnerIdentifier;
 174
 36175                var blobRequestConditions = new BlobRequestConditions();
 36176                var blobName = string.Format(CultureInfo.InvariantCulture, OwnershipPrefix + ownership.PartitionId, owne
 36177                var blobClient = ContainerClient.GetBlobClient(blobName);
 178
 179                try
 180                {
 181                    // Even though documentation states otherwise, we cannot use UploadAsync when the blob already exist
 182                    // the current storage SDK.  For this reason, we are using the specified ETag as an indication of wh
 183                    // method to use.
 184
 36185                    if (ownership.Version == null)
 186                    {
 18187                        blobRequestConditions.IfNoneMatch = IfNoneMatchAllTag;
 188
 189                        async Task<Response<BlobContentInfo>> uploadBlobAsync(CancellationToken uploadToken)
 190                        {
 22191                            using var blobContent = new MemoryStream(Array.Empty<byte>());
 192
 193                            try
 194                            {
 22195                                return await blobClient.UploadAsync(blobContent, metadata: metadata, conditions: blobReq
 196                            }
 0197                            catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.BlobAlreadyExists)
 198                            {
 199                                // A blob could have just been created by another Event Processor that claimed ownership
 200                                // partition.  In this case, there's no point in retrying because we don't have the corr
 201
 0202                                Logger.OwnershipNotClaimable(ownership.PartitionId, ownership.FullyQualifiedNamespace, o
 0203                                return null;
 204                            }
 6205                        };
 206
 18207                        contentInfoResponse = await ApplyRetryPolicy(uploadBlobAsync, cancellationToken).ConfigureAwait(
 208
 6209                        if (contentInfoResponse == null)
 210                        {
 0211                            continue;
 212                        }
 213
 6214                        ownership.LastModifiedTime = contentInfoResponse.Value.LastModified;
 6215                        ownership.Version = contentInfoResponse.Value.ETag.ToString();
 216                    }
 217                    else
 218                    {
 18219                        blobRequestConditions.IfMatch = new ETag(ownership.Version);
 40220                        infoResponse = await ApplyRetryPolicy(uploadToken => blobClient.SetMetadataAsync(metadata, blobR
 221
 2222                        ownership.LastModifiedTime = infoResponse.Value.LastModified;
 2223                        ownership.Version = infoResponse.Value.ETag.ToString();
 224                    }
 225
 226                    // Small workaround to retrieve the eTag.  The current storage SDK returns it enclosed in
 227                    // double quotes ('"ETAG_VALUE"' instead of 'ETAG_VALUE').
 228
 8229                    var match = DoubleQuotesExpression.Match(ownership.Version);
 230
 8231                    if (match.Success)
 232                    {
 2233                        ownership.Version = match.Groups[1].ToString();
 234                    }
 235
 8236                    claimedOwnership.Add(ownership);
 8237                    Logger.OwnershipClaimed(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHub
 8238                }
 6239                catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ConditionNotMet)
 240                {
 4241                    Logger.OwnershipNotClaimable(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.Eve
 4242                }
 2243                catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode 
 244                {
 2245                    Logger.ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.Event
 2246                    throw new RequestFailedException(Resources.BlobsResourceDoesNotExist);
 247                }
 22248                catch (Exception ex)
 249                {
 22250                    Logger.ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.Event
 22251                    throw;
 252                }
 253                finally
 254                {
 36255                    Logger.ClaimOwnershipComplete(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.Ev
 256                }
 12257            }
 258
 12259            return claimedOwnership;
 12260        }
 261
 262        /// <summary>
 263        ///   Retrieves a list of all the checkpoints in a data store for a given namespace, Event Hub and consumer grou
 264        /// </summary>
 265        ///
 266        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associated 
 267        /// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with, relative to
 268        /// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
 269        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 270        ///
 271        /// <returns>An enumerable containing all the existing checkpoints for the associated Event Hub and consumer gro
 272        ///
 273        public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(string fullyQualifiedName
 274                                                                                               string eventHubName,
 275                                                                                               string consumerGroup,
 276                                                                                               CancellationToken cancell
 277        {
 28278            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 26279            Logger.ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
 280
 26281            var prefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerIn
 26282            var checkpointCount = 0;
 283
 284            async Task<IEnumerable<EventProcessorCheckpoint>> listCheckpointsAsync(CancellationToken listCheckpointsToke
 285            {
 30286                var checkpoints = new List<EventProcessorCheckpoint>();
 287
 62288                await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefi
 289                {
 10290                    var partitionId = blob.Name.Substring(prefix.Length);
 10291                    var startingPosition = default(EventPosition?);
 292
 10293                    if (blob.Metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyle
 294                    {
 4295                        startingPosition = EventPosition.FromOffset(result, false);
 296                    }
 6297                    else if (blob.Metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, Nu
 298                    {
 2299                        startingPosition = EventPosition.FromSequenceNumber(result, false);
 300                    }
 301
 302                    // If either the offset or the sequence number was not populated,
 303                    // this is not a valid checkpoint.
 304
 10305                    if (startingPosition.HasValue)
 306                    {
 6307                        checkpoints.Add(new EventProcessorCheckpoint
 6308                        {
 6309                            FullyQualifiedNamespace = fullyQualifiedNamespace,
 6310                            EventHubName = eventHubName,
 6311                            ConsumerGroup = consumerGroup,
 6312                            PartitionId = partitionId,
 6313                            StartingPosition = startingPosition.Value
 6314                        });
 315                    }
 316                    else
 317                    {
 4318                        Logger.InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup)
 319                    }
 320                }
 321
 12322                checkpointCount = checkpoints.Count;
 12323                return checkpoints;
 12324            };
 325
 326            try
 327            {
 26328                return await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false);
 329            }
 2330            catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
 331            {
 2332                Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
 2333                throw new RequestFailedException(Resources.BlobsResourceDoesNotExist);
 334            }
 12335            catch (Exception ex)
 336            {
 12337                Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
 12338                throw;
 339            }
 340            finally
 341            {
 26342                Logger.ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount);
 343            }
 12344        }
 345
 346        /// <summary>
 347        ///   Updates the checkpoint using the given information for the associated partition and consumer group in the 
 348        /// </summary>
 349        ///
 350        /// <param name="checkpoint">The checkpoint containing the information to be stored.</param>
 351        /// <param name="eventData">The event to use as the basis for the checkpoint's starting position.</param>
 352        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 353        ///
 354        /// <returns>A task to be resolved on when the operation has completed.</returns>
 355        ///
 356        public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkpoint,
 357                                                         EventData eventData,
 358                                                         CancellationToken cancellationToken)
 359        {
 36360            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 34361            Logger.UpdateCheckpointStart(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHub
 362
 34363            var blobName = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix + checkpoint.PartitionId, checkp
 34364            var blobClient = ContainerClient.GetBlobClient(blobName);
 365
 34366            var metadata = new Dictionary<string, string>()
 34367            {
 34368                { BlobMetadataKey.Offset, eventData.Offset.ToString(CultureInfo.InvariantCulture) },
 34369                { BlobMetadataKey.SequenceNumber, eventData.SequenceNumber.ToString(CultureInfo.InvariantCulture) }
 34370            };
 371
 372            try
 373            {
 374                try
 375                {
 376                    // Assume the blob is present and attempt to set the metadata.
 377
 72378                    await ApplyRetryPolicy(token => blobClient.SetMetadataAsync(metadata, cancellationToken: token), can
 4379                }
 18380                catch (RequestFailedException ex) when ((ex.ErrorCode == BlobErrorCode.BlobNotFound) || (ex.ErrorCode ==
 381                {
 382                    // If the blob wasn't present, fall-back to trying to create a new one.
 383
 18384                    await ApplyRetryPolicy(async token =>
 18385                    {
 40386                        using var blobContent = new MemoryStream(Array.Empty<byte>());
 40387                        await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: token).Configur
 18388
 22389                    }, cancellationToken).ConfigureAwait(false);
 390                }
 8391            }
 2392            catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
 393            {
 2394                Logger.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.Even
 2395                throw new RequestFailedException(Resources.BlobsResourceDoesNotExist);
 396            }
 24397            catch (Exception ex)
 398            {
 24399                Logger.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.Even
 24400                throw;
 401            }
 402            finally
 403            {
 34404                Logger.UpdateCheckpointComplete(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.E
 405            }
 8406        }
 407
 408        /// <summary>
 409        ///   Applies the checkpoint store's <see cref="RetryPolicy" /> to a specified function.
 410        /// </summary>
 411        ///
 412        /// <param name="functionToRetry">The function to which the retry policy should be applied.</param>
 413        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 414        ///
 415        /// <returns>The value returned by the function to which the retry policy has been applied.</returns>
 416        ///
 417        private async Task ApplyRetryPolicy(Func<CancellationToken, Task> functionToRetry,
 418                                            CancellationToken cancellationToken)
 419        {
 420            TimeSpan? retryDelay;
 421
 130422            var failedAttemptCount = 0;
 130423            var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
 130424            var timeoutTokenSource = default(CancellationTokenSource);
 130425            var linkedTokenSource = default(CancellationTokenSource);
 426
 154427            while (!cancellationToken.IsCancellationRequested)
 428            {
 429                try
 430                {
 154431                    timeoutTokenSource = new CancellationTokenSource(tryTimeout);
 154432                    linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutTokenS
 433
 154434                    await functionToRetry(linkedTokenSource.Token).ConfigureAwait(false);
 32435                    return;
 436                }
 437                catch (Exception ex)
 438                {
 439                    // Determine if there should be a retry for the next attempt; if so enforce the delay but do not qui
 440                    // Otherwise, mark the exception as active and break out of the loop.
 441
 122442                    ++failedAttemptCount;
 122443                    retryDelay = RetryPolicy.CalculateRetryDelay(ex, failedAttemptCount);
 444
 122445                    if ((retryDelay.HasValue) && (!cancellationToken.IsCancellationRequested))
 446                    {
 24447                        await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 24448                        tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
 449                    }
 450                    else
 451                    {
 98452                        timeoutTokenSource?.Token.ThrowIfCancellationRequested<TimeoutException>();
 94453                        throw;
 454                    }
 455                }
 456                finally
 457                {
 154458                    timeoutTokenSource?.Dispose();
 154459                    linkedTokenSource?.Dispose();
 460                }
 461            }
 462
 463            // If no value has been returned nor exception thrown by this point,
 464            // then cancellation has been requested.
 465
 0466            throw new TaskCanceledException();
 32467        }
 468
 469        /// <summary>
 470        ///   Applies the checkpoint store's <see cref="RetryPolicy" /> to a specified function.
 471        /// </summary>
 472        ///
 473        /// <param name="functionToRetry">The function to which the retry policy should be applied.</param>
 474        /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel t
 475        ///
 476        /// <typeparam name="T">The type returned by the function to be executed.</typeparam>
 477        ///
 478        /// <returns>The value returned by the function to which the retry policy has been applied.</returns>
 479        ///
 480        private async Task<T> ApplyRetryPolicy<T>(Func<CancellationToken, Task<T>> functionToRetry,
 481                                                  CancellationToken cancellationToken)
 482        {
 112483            var result = default(T);
 484
 485            async Task wrapper(CancellationToken token)
 486            {
 132487                result = await functionToRetry(token).ConfigureAwait(false);
 28488            };
 489
 112490            await ApplyRetryPolicy(wrapper, cancellationToken).ConfigureAwait(false);
 28491            return result;
 28492        }
 493    }
 494}