Summary

Class: Azure.Storage.Blobs.ChangeFeed.ChangeFeed
Assembly: Azure.Storage.Blobs.ChangeFeed
File(s): C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\ChangeFeed.cs
Covered lines: 76
Uncovered lines: 4
Coverable lines: 80
Total lines: 205
Line coverage: 95% (76 of 80)
Covered branches: 30
Total branches: 34
Branch coverage: 88.2% (30 of 34)

Metrics

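| Method | Cyclomatic complexity | Line coverage | Branch coverage |
|---|---|---|---|
| get_LastConsumable() | - | 0% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor() | - | 100% | 100% |
| GetPage() | - | 85.71% | 75% |
| HasNext() | - | 100% | 91.67% |
| GetCursor() | - | 100% | 100% |
| AdvanceSegmentIfNecessary() | - | 100% | 100% |
| Empty() | - | 100% | 100% |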

File(s)

C:\Git\azure-sdk-for-net\sdk\storage\Azure.Storage.Blobs.ChangeFeed\src\ChangeFeed.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text.Json;
 7using System.Threading.Tasks;
 8using Azure.Storage.Blobs.Models;
 9using System.Threading;
 10
 11namespace Azure.Storage.Blobs.ChangeFeed
 12{
 13    internal class ChangeFeed
 14    {
 15        /// <summary>
 16        /// BlobContainerClient for making List Blob requests and creating Segments.
 17        /// </summary>
 18        private readonly BlobContainerClient _containerClient;
 19
 20        /// <summary>
 21        /// A <see cref="SegmentFactory"/> for creating new <see cref="Segment"/>s.
 22        /// </summary>
 23        private readonly SegmentFactory _segmentFactory;
 24
 25        /// <summary>
 26        /// Queue of paths to years we haven't processed yet.
 27        /// </summary>
 28        private readonly Queue<string> _years;
 29
 30        /// <summary>
 31        /// Paths to segments in the current year we haven't processed yet.
 32        /// </summary>
 33        private Queue<string> _segments;
 34
 35        /// <summary>
 36        /// The Segment we are currently processing.
 37        /// </summary>
 38        private Segment _currentSegment;
 39
 40        /// <summary>
 41        /// The latest time the Change Feed can safely be read from.
 42        /// </summary>
 043        public DateTimeOffset LastConsumable { get; private set; }
 44
 45        /// <summary>
 46        /// User-specified start time.  If the start time occurs before Change Feed was enabled
 47        /// for this account, we will start at the beginning of the Change Feed.
 48        /// </summary>
 49        private DateTimeOffset? _startTime;
 50
 51        /// <summary>
 52        /// User-specified end time.  If the end time occurs after _lastConsumable, we will
 53        /// end at _lastConsumable.
 54        /// </summary>
 55        private DateTimeOffset? _endTime;
 56
 57        /// <summary>
 58        /// If this Change Feed has no events.
 59        /// </summary>
 60        private bool _empty;
 61
 8862        public ChangeFeed(
 8863            BlobContainerClient containerClient,
 8864            SegmentFactory segmentFactory,
 8865            Queue<string> years,
 8866            Queue<string> segments,
 8867            Segment currentSegment,
 8868            DateTimeOffset lastConsumable,
 8869            DateTimeOffset? startTime,
 8870            DateTimeOffset? endTime)
 71        {
 8872            _containerClient = containerClient;
 8873            _segmentFactory = segmentFactory;
 8874            _years = years;
 8875            _segments = segments;
 8876            _currentSegment = currentSegment;
 8877            LastConsumable = lastConsumable;
 8878            _startTime = startTime;
 8879            _endTime = endTime;
 8880            _empty = false;
 8881        }
 82
 83        /// <summary>
 84        /// Constructor for mocking, and for creating a Change Feed with no Events.
 85        /// </summary>
 886        public ChangeFeed() { }
 87
 88        // The last segment may still be adding chunks.
 89        public async Task<Page<BlobChangeFeedEvent>> GetPage(
 90            bool async,
 91            int pageSize = Constants.ChangeFeed.DefaultPageSize,
 92            CancellationToken cancellationToken = default)
 93        {
 14494            if (!HasNext())
 95            {
 096                throw new InvalidOperationException("Change feed doesn't have any more events");
 97            }
 98
 14499            if (_currentSegment.DateTime >= _endTime)
 100            {
 0101                return BlobChangeFeedEventPage.Empty();
 102            }
 103
 144104            if (pageSize > Constants.ChangeFeed.DefaultPageSize)
 105            {
 0106                pageSize = Constants.ChangeFeed.DefaultPageSize;
 107            }
 108
 109            // Get next page
 144110            List<BlobChangeFeedEvent> blobChangeFeedEvents = new List<BlobChangeFeedEvent>();
 111
 144112            int remainingEvents = pageSize;
 316113            while (blobChangeFeedEvents.Count < pageSize
 316114                && HasNext())
 115            {
 172116                List<BlobChangeFeedEvent> newEvents = await _currentSegment.GetPage(
 172117                    async,
 172118                    remainingEvents,
 172119                    cancellationToken).ConfigureAwait(false);
 172120                blobChangeFeedEvents.AddRange(newEvents);
 172121                remainingEvents -= newEvents.Count;
 172122                await AdvanceSegmentIfNecessary(
 172123                    async,
 172124                    cancellationToken).ConfigureAwait(false);
 125            }
 126
 144127            return new BlobChangeFeedEventPage(blobChangeFeedEvents, JsonSerializer.Serialize<ChangeFeedCursor>(GetCurso
 144128        }
 129
 130        public bool HasNext()
 131        {
 132            // [If Change Feed is empty], or [current segment is not finalized]
 133            // or ([segment count is 0] and [year count is 0] and [current segment doesn't have next])
 556134            if (_empty
 556135                || _segments.Count == 0
 556136                    && _years.Count == 0
 556137                    && !_currentSegment.HasNext())
 138            {
 44139                return false;
 140            }
 141
 512142            if (_endTime.HasValue)
 143            {
 424144                return _currentSegment.DateTime < _endTime;
 145            }
 146
 88147            return true;
 148        }
 149
 150        internal ChangeFeedCursor GetCursor()
 148151            => new ChangeFeedCursor(
 148152                urlHash: BlobChangeFeedExtensions.ComputeMD5(_containerClient.Uri.AbsoluteUri),
 148153                endDateTime: _endTime,
 148154                currentSegmentCursor: _currentSegment.GetCursor());
 155
 156        private async Task AdvanceSegmentIfNecessary(
 157            bool async,
 158            CancellationToken cancellationToken)
 159        {
 160            // If the current segment has more Events, we don't need to do anything.
 172161            if (_currentSegment.HasNext())
 162            {
 88163                return;
 164            }
 165
 166            // If the current segment is completed, remove it
 84167            if (_segments.Count > 0)
 168            {
 56169                _currentSegment = await _segmentFactory.BuildSegment(
 56170                    async,
 56171                    _segments.Dequeue()).ConfigureAwait(false);
 172            }
 173
 174            // If _segments is empty, refill it
 28175            else if (_segments.Count == 0 && _years.Count > 0)
 176            {
 4177                string yearPath = _years.Dequeue();
 178
 179                // Get Segments for first year
 4180                _segments = await BlobChangeFeedExtensions.GetSegmentsInYearInternal(
 4181                    containerClient: _containerClient,
 4182                    yearPath: yearPath,
 4183                    startTime: _startTime,
 4184                    endTime: _endTime,
 4185                    async: async,
 4186                    cancellationToken: cancellationToken)
 4187                    .ConfigureAwait(false);
 188
 4189                if (_segments.Count > 0)
 190                {
 4191                    _currentSegment = await _segmentFactory.BuildSegment(
 4192                        async,
 4193                        _segments.Dequeue())
 4194                        .ConfigureAwait(false);
 195                }
 196            }
 172197        }
 198
 199        public static ChangeFeed Empty()
 4200             => new ChangeFeed
 4201             {
 4202                 _empty = true
 4203             };
 204    }
 205}