| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Text.Json; |
| | 7 | | using System.Threading.Tasks; |
| | 8 | | using Azure.Storage.Blobs.Models; |
| | 9 | | using System.Threading; |
| | 10 | |
|
| | 11 | | namespace Azure.Storage.Blobs.ChangeFeed |
| | 12 | | { |
| | 13 | | internal class ChangeFeed |
| | 14 | | { |
| | 15 | | /// <summary> |
| | 16 | | /// BlobContainerClient for making List Blob requests and creating Segments. |
| | 17 | | /// </summary> |
| | 18 | | private readonly BlobContainerClient _containerClient; |
| | 19 | |
|
| | 20 | | /// <summary> |
| | 21 | | /// A <see cref="SegmentFactory"/> for creating new <see cref="Segment"/>s. |
| | 22 | | /// </summary> |
| | 23 | | private readonly SegmentFactory _segmentFactory; |
| | 24 | |
|
| | 25 | | /// <summary> |
| | 26 | | /// Queue of paths to years we haven't processed yet. |
| | 27 | | /// </summary> |
| | 28 | | private readonly Queue<string> _years; |
| | 29 | |
|
| | 30 | | /// <summary> |
| | 31 | | /// Paths to segments in the current year we haven't processed yet. |
| | 32 | | /// </summary> |
| | 33 | | private Queue<string> _segments; |
| | 34 | |
|
| | 35 | | /// <summary> |
| | 36 | | /// The Segment we are currently processing. |
| | 37 | | /// </summary> |
| | 38 | | private Segment _currentSegment; |
| | 39 | |
|
| | 40 | | /// <summary> |
| | 41 | | /// The latest time the Change Feed can safely be read from. |
| | 42 | | /// </summary> |
| 0 | 43 | | public DateTimeOffset LastConsumable { get; private set; } |
| | 44 | |
|
| | 45 | | /// <summary> |
| | 46 | | /// User-specified start time. If the start time occurs before Change Feed was enabled |
| | 47 | | /// for this account, we will start at the beginning of the Change Feed. |
| | 48 | | /// </summary> |
| | 49 | | private DateTimeOffset? _startTime; |
| | 50 | |
|
| | 51 | | /// <summary> |
| | 52 | | /// User-specified end time. If the end time occurs after _lastConsumable, we will |
| | 53 | | /// end at _lastConsumable. |
| | 54 | | /// </summary> |
| | 55 | | private DateTimeOffset? _endTime; |
| | 56 | |
|
| | 57 | | /// <summary> |
| | 58 | | /// If this Change Feed has no events. |
| | 59 | | /// </summary> |
| | 60 | | private bool _empty; |
| | 61 | |
|
| 88 | 62 | | public ChangeFeed( |
| 88 | 63 | | BlobContainerClient containerClient, |
| 88 | 64 | | SegmentFactory segmentFactory, |
| 88 | 65 | | Queue<string> years, |
| 88 | 66 | | Queue<string> segments, |
| 88 | 67 | | Segment currentSegment, |
| 88 | 68 | | DateTimeOffset lastConsumable, |
| 88 | 69 | | DateTimeOffset? startTime, |
| 88 | 70 | | DateTimeOffset? endTime) |
| | 71 | | { |
| 88 | 72 | | _containerClient = containerClient; |
| 88 | 73 | | _segmentFactory = segmentFactory; |
| 88 | 74 | | _years = years; |
| 88 | 75 | | _segments = segments; |
| 88 | 76 | | _currentSegment = currentSegment; |
| 88 | 77 | | LastConsumable = lastConsumable; |
| 88 | 78 | | _startTime = startTime; |
| 88 | 79 | | _endTime = endTime; |
| 88 | 80 | | _empty = false; |
| 88 | 81 | | } |
| | 82 | |
|
| | 83 | | /// <summary> |
| | 84 | | /// Constructor for mocking, and for creating a Change Feed with no Events. |
| | 85 | | /// </summary> |
| 8 | 86 | | public ChangeFeed() { } |
| | 87 | |
|
| | 88 | | // The last segment may still be adding chunks. |
| | 89 | | public async Task<Page<BlobChangeFeedEvent>> GetPage( |
| | 90 | | bool async, |
| | 91 | | int pageSize = Constants.ChangeFeed.DefaultPageSize, |
| | 92 | | CancellationToken cancellationToken = default) |
| | 93 | | { |
| 144 | 94 | | if (!HasNext()) |
| | 95 | | { |
| 0 | 96 | | throw new InvalidOperationException("Change feed doesn't have any more events"); |
| | 97 | | } |
| | 98 | |
|
| 144 | 99 | | if (_currentSegment.DateTime >= _endTime) |
| | 100 | | { |
| 0 | 101 | | return BlobChangeFeedEventPage.Empty(); |
| | 102 | | } |
| | 103 | |
|
| 144 | 104 | | if (pageSize > Constants.ChangeFeed.DefaultPageSize) |
| | 105 | | { |
| 0 | 106 | | pageSize = Constants.ChangeFeed.DefaultPageSize; |
| | 107 | | } |
| | 108 | |
|
| | 109 | | // Get next page |
| 144 | 110 | | List<BlobChangeFeedEvent> blobChangeFeedEvents = new List<BlobChangeFeedEvent>(); |
| | 111 | |
|
| 144 | 112 | | int remainingEvents = pageSize; |
| 316 | 113 | | while (blobChangeFeedEvents.Count < pageSize |
| 316 | 114 | | && HasNext()) |
| | 115 | | { |
| 172 | 116 | | List<BlobChangeFeedEvent> newEvents = await _currentSegment.GetPage( |
| 172 | 117 | | async, |
| 172 | 118 | | remainingEvents, |
| 172 | 119 | | cancellationToken).ConfigureAwait(false); |
| 172 | 120 | | blobChangeFeedEvents.AddRange(newEvents); |
| 172 | 121 | | remainingEvents -= newEvents.Count; |
| 172 | 122 | | await AdvanceSegmentIfNecessary( |
| 172 | 123 | | async, |
| 172 | 124 | | cancellationToken).ConfigureAwait(false); |
| | 125 | | } |
| | 126 | |
|
| 144 | 127 | | return new BlobChangeFeedEventPage(blobChangeFeedEvents, JsonSerializer.Serialize<ChangeFeedCursor>(GetCurso |
| 144 | 128 | | } |
| | 129 | |
|
| | 130 | | public bool HasNext() |
| | 131 | | { |
| | 132 | | // [If Change Feed is empty], or [current segment is not finalized] |
| | 133 | | // or ([segment count is 0] and [year count is 0] and [current segment doesn't have next]) |
| 556 | 134 | | if (_empty |
| 556 | 135 | | || _segments.Count == 0 |
| 556 | 136 | | && _years.Count == 0 |
| 556 | 137 | | && !_currentSegment.HasNext()) |
| | 138 | | { |
| 44 | 139 | | return false; |
| | 140 | | } |
| | 141 | |
|
| 512 | 142 | | if (_endTime.HasValue) |
| | 143 | | { |
| 424 | 144 | | return _currentSegment.DateTime < _endTime; |
| | 145 | | } |
| | 146 | |
|
| 88 | 147 | | return true; |
| | 148 | | } |
| | 149 | |
|
| | 150 | | internal ChangeFeedCursor GetCursor() |
| 148 | 151 | | => new ChangeFeedCursor( |
| 148 | 152 | | urlHash: BlobChangeFeedExtensions.ComputeMD5(_containerClient.Uri.AbsoluteUri), |
| 148 | 153 | | endDateTime: _endTime, |
| 148 | 154 | | currentSegmentCursor: _currentSegment.GetCursor()); |
| | 155 | |
|
| | 156 | | private async Task AdvanceSegmentIfNecessary( |
| | 157 | | bool async, |
| | 158 | | CancellationToken cancellationToken) |
| | 159 | | { |
| | 160 | | // If the current segment has more Events, we don't need to do anything. |
| 172 | 161 | | if (_currentSegment.HasNext()) |
| | 162 | | { |
| 88 | 163 | | return; |
| | 164 | | } |
| | 165 | |
|
| | 166 | | // If the current segment is completed, remove it |
| 84 | 167 | | if (_segments.Count > 0) |
| | 168 | | { |
| 56 | 169 | | _currentSegment = await _segmentFactory.BuildSegment( |
| 56 | 170 | | async, |
| 56 | 171 | | _segments.Dequeue()).ConfigureAwait(false); |
| | 172 | | } |
| | 173 | |
|
| | 174 | | // If _segments is empty, refill it |
| 28 | 175 | | else if (_segments.Count == 0 && _years.Count > 0) |
| | 176 | | { |
| 4 | 177 | | string yearPath = _years.Dequeue(); |
| | 178 | |
|
| | 179 | | // Get Segments for first year |
| 4 | 180 | | _segments = await BlobChangeFeedExtensions.GetSegmentsInYearInternal( |
| 4 | 181 | | containerClient: _containerClient, |
| 4 | 182 | | yearPath: yearPath, |
| 4 | 183 | | startTime: _startTime, |
| 4 | 184 | | endTime: _endTime, |
| 4 | 185 | | async: async, |
| 4 | 186 | | cancellationToken: cancellationToken) |
| 4 | 187 | | .ConfigureAwait(false); |
| | 188 | |
|
| 4 | 189 | | if (_segments.Count > 0) |
| | 190 | | { |
| 4 | 191 | | _currentSegment = await _segmentFactory.BuildSegment( |
| 4 | 192 | | async, |
| 4 | 193 | | _segments.Dequeue()) |
| 4 | 194 | | .ConfigureAwait(false); |
| | 195 | | } |
| | 196 | | } |
| 172 | 197 | | } |
| | 198 | |
|
| | 199 | | public static ChangeFeed Empty() |
| 4 | 200 | | => new ChangeFeed |
| 4 | 201 | | { |
| 4 | 202 | | _empty = true |
| 4 | 203 | | }; |
| | 204 | | } |
| | 205 | | } |