< Summary

Class:Azure.Storage.Blobs.ChangeFeed.ChangeFeedFactory
Assembly:Azure.Storage.Blobs.ChangeFeed
File(s):C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\ChangeFeedFactory.cs
Covered lines:96
Uncovered lines:4
Coverable lines:100
Total lines:217
Line coverage:96% (96 of 100)
Covered branches:38
Total branches:46
Branch coverage:82.6% (38 of 46)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-100%100%
.ctor(...)-100%100%
BuildChangeFeed()-96.72%89.29%
ValidateCursor(...)-60%50%
GetYearPathsInternal()-100%78.57%

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\ChangeFeedFactory.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text.Json;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Storage.Blobs.Models;
 10
 11namespace Azure.Storage.Blobs.ChangeFeed
 12{
 13    internal class ChangeFeedFactory
 14    {
 15        private readonly SegmentFactory _segmentFactory;
 16        private readonly BlobContainerClient _containerClient;
 17
 7618        public ChangeFeedFactory(
 7619            BlobServiceClient blobServiceClient)
 20        {
 7621            _containerClient = blobServiceClient.GetBlobContainerClient(Constants.ChangeFeed.ChangeFeedContainerName);
 7622            _segmentFactory = new SegmentFactory(
 7623                _containerClient,
 7624                new ShardFactory(
 7625                    _containerClient,
 7626                    new ChunkFactory(
 7627                        _containerClient,
 7628                        new LazyLoadingBlobStreamFactory(),
 7629                        new AvroReaderFactory())));
 7630        }
 31
 2032        public ChangeFeedFactory(
 2033            BlobContainerClient containerClient,
 2034            SegmentFactory segmentFactory)
 35        {
 2036            _containerClient = containerClient;
 2037            _segmentFactory = segmentFactory;
 2038        }
 39
 40        public async Task<ChangeFeed> BuildChangeFeed(
 41            DateTimeOffset? startTime,
 42            DateTimeOffset? endTime,
 43            string continuation,
 44            bool async,
 45            CancellationToken cancellationToken)
 46        {
 47            DateTimeOffset lastConsumable;
 9248            Queue<string> years = new Queue<string>();
 9249            Queue<string> segments = new Queue<string>();
 9250            ChangeFeedCursor cursor = null;
 51
 52            // Create cursor
 9253            if (continuation != null)
 54            {
 3655                cursor = JsonSerializer.Deserialize<ChangeFeedCursor>(continuation);
 3656                ValidateCursor(_containerClient, cursor);
 3657                startTime = BlobChangeFeedExtensions.ToDateTimeOffset(cursor.CurrentSegmentCursor.SegmentPath).Value;
 3658                endTime = cursor.EndTime;
 59            }
 60            // Round start and end time if we are not using the cursor.
 61            else
 62            {
 5663                startTime = startTime.RoundDownToNearestHour();
 5664                endTime = endTime.RoundUpToNearestHour();
 65            }
 66
 67            // Check if Change Feed has been abled for this account.
 68            bool changeFeedContainerExists;
 69
 9270            if (async)
 71            {
 8472                changeFeedContainerExists = await _containerClient.ExistsAsync(cancellationToken: cancellationToken).Con
 73            }
 74            else
 75            {
 876                changeFeedContainerExists = _containerClient.Exists(cancellationToken: cancellationToken);
 77            }
 78
 9279            if (!changeFeedContainerExists)
 80            {
 081                throw new ArgumentException("Change Feed hasn't been enabled on this account, or is currently being enab
 82            }
 83
 84            // Get last consumable
 9285            BlobClient blobClient = _containerClient.GetBlobClient(Constants.ChangeFeed.MetaSegmentsPath);
 86            BlobDownloadInfo blobDownloadInfo;
 9287            if (async)
 88            {
 8489                blobDownloadInfo = await blobClient.DownloadAsync(cancellationToken: cancellationToken).ConfigureAwait(f
 90            }
 91            else
 92            {
 893                blobDownloadInfo = blobClient.Download(cancellationToken: cancellationToken);
 94            }
 95
 96            JsonDocument jsonMetaSegment;
 9297            if (async)
 98            {
 8499                jsonMetaSegment = await JsonDocument.ParseAsync(
 84100                    blobDownloadInfo.Content,
 84101                    cancellationToken: cancellationToken
 84102                    ).ConfigureAwait(false);
 103            }
 104            else
 105            {
 8106                jsonMetaSegment = JsonDocument.Parse(blobDownloadInfo.Content);
 107            }
 108
 92109            lastConsumable = jsonMetaSegment.RootElement.GetProperty("lastConsumable").GetDateTimeOffset();
 110
 111            // Get year paths
 92112            years = await GetYearPathsInternal(
 92113                async,
 92114                cancellationToken).ConfigureAwait(false);
 115
 116            // Dequeue any years that occur before start time
 92117            if (startTime.HasValue)
 118            {
 108119                while (years.Count > 0
 108120                    && BlobChangeFeedExtensions.ToDateTimeOffset(years.Peek()) < startTime.RoundDownToNearestYear())
 121                {
 20122                    years.Dequeue();
 123                }
 124            }
 125
 126            // There are no years.
 92127            if (years.Count == 0)
 128            {
 4129                return ChangeFeed.Empty();
 130            }
 131
 180132            while (segments.Count == 0 && years.Count > 0)
 133            {
 134                // Get Segments for year
 92135                segments = await BlobChangeFeedExtensions.GetSegmentsInYearInternal(
 92136                    containerClient: _containerClient,
 92137                    yearPath: years.Dequeue(),
 92138                    startTime: startTime,
 92139                    endTime: BlobChangeFeedExtensions.MinDateTime(lastConsumable, endTime),
 92140                    async: async,
 92141                    cancellationToken: cancellationToken)
 92142                    .ConfigureAwait(false);
 143            }
 144
 145            // We were on the last year, and there were no more segments.
 88146            if (segments.Count == 0)
 147            {
 0148                return ChangeFeed.Empty();
 149            }
 150
 88151            Segment currentSegment = await _segmentFactory.BuildSegment(
 88152                async,
 88153                segments.Dequeue(),
 88154                cursor?.CurrentSegmentCursor)
 88155                .ConfigureAwait(false);
 156
 88157            return new ChangeFeed(
 88158                _containerClient,
 88159                _segmentFactory,
 88160                years,
 88161                segments,
 88162                currentSegment,
 88163                lastConsumable,
 88164                startTime,
 88165                endTime);
 92166        }
 167
 168        private static void ValidateCursor(
 169            BlobContainerClient containerClient,
 170            ChangeFeedCursor cursor)
 171        {
 36172            if (BlobChangeFeedExtensions.ComputeMD5(containerClient.Uri.AbsoluteUri) != cursor.UrlHash)
 173            {
 0174                throw new ArgumentException("Cursor URL does not match container URL.");
 175            }
 36176            if (cursor.CursorVersion != 1)
 177            {
 0178                throw new ArgumentException("Unsupported cursor version.");
 179            }
 36180        }
 181
 182        internal async Task<Queue<string>> GetYearPathsInternal(
 183            bool async,
 184            CancellationToken cancellationToken)
 185        {
 96186            List<string> list = new List<string>();
 187
 96188            if (async)
 189            {
 560190                await foreach (BlobHierarchyItem blobHierarchyItem in _containerClient.GetBlobsByHierarchyAsync(
 86191                    prefix: Constants.ChangeFeed.SegmentPrefix,
 86192                    delimiter: "/",
 86193                    cancellationToken: cancellationToken).ConfigureAwait(false))
 194                {
 194195                    if (blobHierarchyItem.Prefix.Contains(Constants.ChangeFeed.InitalizationSegment))
 196                        continue;
 197
 108198                    list.Add(blobHierarchyItem.Prefix);
 199                }
 200            }
 201            else
 202            {
 104203                foreach (BlobHierarchyItem blobHierarchyItem in _containerClient.GetBlobsByHierarchy(
 10204                prefix: Constants.ChangeFeed.SegmentPrefix,
 10205                delimiter: "/",
 10206                cancellationToken: cancellationToken))
 207                {
 42208                    if (blobHierarchyItem.Prefix.Contains(Constants.ChangeFeed.InitalizationSegment))
 209                        continue;
 210
 32211                    list.Add(blobHierarchyItem.Prefix);
 212                }
 213            }
 96214            return new Queue<string>(list);
 96215        }
 216    }
 217}