| | | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | | 2 | | // Licensed under the MIT License. |
| | | 3 | | |
| | | 4 | | using System; |
| | | 5 | | using System.Collections.Generic; |
| | | 6 | | using System.Runtime.CompilerServices; |
| | | 7 | | using System.Threading; |
| | | 8 | | using System.Threading.Channels; |
| | | 9 | | using System.Threading.Tasks; |
| | | 10 | | using Azure.Core; |
| | | 11 | | |
| | | 12 | | namespace Azure.Messaging.EventHubs.Core |
| | | 13 | | { |
| | | 14 | | /// <summary> |
| | | 15 | | /// The set of extensions for the <see cref="ChannelReader{T}" /> |
| | | 16 | | /// class. |
| | | 17 | | /// </summary> |
| | | 18 | | /// |
| | | 19 | | internal static class ChannelReaderExtensions |
| | | 20 | | { |
| | | 21 | | /// <summary> |
| | | 22 | | /// Enumerates the events as they become available in the associated channel; when a maximum wait time is specified and elapses with no event available, a default value is emitted instead. |
| | | 23 | | /// </summary> |
| | | 24 | | /// |
| | | 25 | | /// <typeparam name="T">The type of data contained in the channel.</typeparam> |
| | | 26 | | /// |
| | | 27 | | /// <param name="reader">The <see cref="ChannelReader{T}" /> instance that this method was invoked on, and from |
| | | 28 | | /// <param name="maximumWaitTime">The maximum amount of time to wait for an event to be available before emit |
| | | 29 | | /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> instance to signal the |
| | | 30 | | /// |
| | | 31 | | /// <returns>An asynchronous enumerator that can be used to iterate over events as they are available.</returns> |
| | | 32 | | /// |
| | | 33 | | public static async IAsyncEnumerable<T> EnumerateChannel<T>(this ChannelReader<T> reader, |
| | | 34 | | TimeSpan? maximumWaitTime, |
| | | 35 | | [EnumeratorCancellation]CancellationToken cancellati |
| | | 36 | | { |
| | 78 | 37 | | Argument.AssertNotNull(reader, nameof(reader)); |
| | | 38 | | |
| | 76 | 39 | | if (maximumWaitTime.HasValue) |
| | | 40 | | { |
| | 12 | 41 | | Argument.AssertNotNegative(maximumWaitTime.Value, nameof(maximumWaitTime)); |
| | | 42 | | } |
| | | 43 | | |
| | | 44 | | |
| | 74 | 45 | | CancellationToken waitToken = cancellationToken; |
| | 74 | 46 | | var waitSource = default(CancellationTokenSource); |
| | | 47 | | |
| | | 48 | | try |
| | | 49 | | { |
| | 39350 | 50 | | while (!cancellationToken.IsCancellationRequested) |
| | | 51 | | { |
| | 39342 | 52 | | if (reader.TryRead(out T result)) |
| | | 53 | | { |
| | 5462 | 54 | | waitSource?.CancelAfter(Timeout.Infinite); |
| | 5462 | 55 | | yield return result; |
| | | 56 | | } |
| | 33872 | 57 | | else if (reader.Completion.IsCompleted) |
| | | 58 | | { |
| | | 59 | | // If the channel has completed, then await the completion task to surface any exceptions; when there are none, stop enumerating. |
| | | 60 | | |
| | 12 | 61 | | await reader.Completion.ConfigureAwait(false); |
| | 4 | 62 | | break; |
| | | 63 | | } |
| | | 64 | | else |
| | | 65 | | { |
| | | 66 | | try |
| | | 67 | | { |
| | 33860 | 68 | | if (maximumWaitTime.HasValue) |
| | | 69 | | { |
| | 10196 | 70 | | if ((waitSource == null) || (waitSource.IsCancellationRequested)) |
| | | 71 | | { |
| | 20 | 72 | | waitSource?.Dispose(); |
| | 20 | 73 | | waitSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| | | 74 | | } |
| | | 75 | | |
| | 10196 | 76 | | waitSource.CancelAfter(maximumWaitTime.Value); |
| | 10196 | 77 | | waitToken = waitSource.Token; |
| | | 78 | | } |
| | | 79 | | |
| | | 80 | | // Wait for an item to be available in the channel; if it becomes so, then suspend the |
| | | 81 | | // wait timer and restart the loop so that the item can be read and emitted. |
| | | 82 | | |
| | 33860 | 83 | | if (await reader.WaitToReadAsync(waitToken).ConfigureAwait(false)) |
| | | 84 | | { |
| | 26 | 85 | | waitSource?.CancelAfter(Timeout.Infinite); |
| | 26 | 86 | | continue; |
| | | 87 | | } |
| | 33790 | 88 | | } |
| | 20 | 89 | | catch (OperationCanceledException) |
| | | 90 | | { |
| | | 91 | | // This is thrown when the wait token is canceled. It may be caused by the maximum wait time |
| | | 92 | | // being exceeded or the main cancellation token being set. Ignore this as an expected |
| | | 93 | | // case and discard the canceled source so that a fresh one is created for the next wait; if the |
| | | 94 | | // iteration was canceled, it will be detected by the loop condition and appropriate action taken. |
| | | 95 | | |
| | 20 | 96 | | waitSource?.Dispose(); |
| | 20 | 97 | | waitSource = null; |
| | 20 | 98 | | } |
| | | 99 | | |
| | | 100 | | // If the wait token was canceled, but the main cancellation token was not, then the wait time was |
| | | 101 | | // exceeded and a default item needs to be emitted. |
| | | 102 | | |
| | 33810 | 103 | | if ((waitToken.IsCancellationRequested) && (!cancellationToken.IsCancellationRequested)) |
| | | 104 | | { |
| | 18 | 105 | | yield return default; |
| | | 106 | | } |
| | | 107 | | } |
| | | 108 | | } |
| | 34 | 109 | | } |
| | | 110 | | finally |
| | | 111 | | { |
| | 74 | 112 | | waitSource?.Dispose(); |
| | | 113 | | } |
| | | 114 | | |
| | 12 | 115 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 4 | 116 | | yield break; |
| | 28 | 117 | | } |
| | | 118 | | } |
| | | 119 | | } |