| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Runtime.CompilerServices; |
| | 7 | | using System.Threading; |
| | 8 | | using System.Threading.Channels; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Azure.Core; |
| | 11 | |
|
| | 12 | | namespace Azure.Messaging.EventHubs.Core |
| | 13 | | { |
| | 14 | | /// <summary> |
| | 15 | | /// The set of extensions for the <see cref="ChannelReader{T}" /> |
| | 16 | | /// class. |
| | 17 | | /// </summary> |
| | 18 | | /// |
| | 19 | | internal static class ChannelReaderExtensions |
| | 20 | | { |
| | 21 | | /// <summary> |
| | 22 | | /// Enumerates the events as they become available in the associated channel. |
| | 23 | | /// </summary> |
| | 24 | | /// |
| | 25 | | /// <typeparam name="T">The type of data contained in the channel.</typeparam> |
| | 26 | | /// |
| | 27 | | /// <param name="reader">The <see cref="ChannelReader{T}" /> instance that this method was invoked on, and from |
| | 28 | | /// <param name="maximumWaitTime">The maximum amount of time to wait to for an event to be available before emit |
| | 29 | | /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> instance to signal the |
| | 30 | | /// |
| | 31 | | /// <returns>An asynchronous enumerator that can be used to iterate over events as they are available.</returns> |
| | 32 | | /// |
| | 33 | | public static async IAsyncEnumerable<T> EnumerateChannel<T>(this ChannelReader<T> reader, |
| | 34 | | TimeSpan? maximumWaitTime, |
| | 35 | | [EnumeratorCancellation]CancellationToken cancellati |
| | 36 | | { |
| 78 | 37 | | Argument.AssertNotNull(reader, nameof(reader)); |
| | 38 | |
|
| 76 | 39 | | if (maximumWaitTime.HasValue) |
| | 40 | | { |
| 12 | 41 | | Argument.AssertNotNegative(maximumWaitTime.Value, nameof(maximumWaitTime)); |
| | 42 | | } |
| | 43 | |
|
| | 44 | |
|
| 74 | 45 | | CancellationToken waitToken = cancellationToken; |
| 74 | 46 | | var waitSource = default(CancellationTokenSource); |
| | 47 | |
|
| | 48 | | try |
| | 49 | | { |
| 39350 | 50 | | while (!cancellationToken.IsCancellationRequested) |
| | 51 | | { |
| 39342 | 52 | | if (reader.TryRead(out T result)) |
| | 53 | | { |
| 5462 | 54 | | waitSource?.CancelAfter(Timeout.Infinite); |
| 5462 | 55 | | yield return result; |
| | 56 | | } |
| 33872 | 57 | | else if (reader.Completion.IsCompleted) |
| | 58 | | { |
| | 59 | | // If the channel was marked as final, then await the completion task to surface any exceptions. |
| | 60 | |
|
| 12 | 61 | | await reader.Completion.ConfigureAwait(false); |
| 4 | 62 | | break; |
| | 63 | | } |
| | 64 | | else |
| | 65 | | { |
| | 66 | | try |
| | 67 | | { |
| 33860 | 68 | | if (maximumWaitTime.HasValue) |
| | 69 | | { |
| 10196 | 70 | | if ((waitSource == null) || (waitSource.IsCancellationRequested)) |
| | 71 | | { |
| 20 | 72 | | waitSource?.Dispose(); |
| 20 | 73 | | waitSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| | 74 | | } |
| | 75 | |
|
| 10196 | 76 | | waitSource.CancelAfter(maximumWaitTime.Value); |
| 10196 | 77 | | waitToken = waitSource.Token; |
| | 78 | | } |
| | 79 | |
|
| | 80 | | // Wait for an item to be available in the channel; if it becomes so, then |
| | 81 | | // reset the loop so that it can be read and emitted. |
| | 82 | |
|
| 33860 | 83 | | if (await reader.WaitToReadAsync(waitToken).ConfigureAwait(false)) |
| | 84 | | { |
| 26 | 85 | | waitSource?.CancelAfter(Timeout.Infinite); |
| 26 | 86 | | continue; |
| | 87 | | } |
| 33790 | 88 | | } |
| 20 | 89 | | catch (OperationCanceledException) |
| | 90 | | { |
| | 91 | | // This is thrown when the wait token expires. It may be caused by the maximum wait time |
| | 92 | | // being exceeded or the main cancellation token being set. Ignore this as an expected |
| | 93 | | // case; if the iteration was canceled, it will be detected in the body of the loop and |
| | 94 | | // appropriate action taken. |
| | 95 | |
|
| 20 | 96 | | waitSource?.Dispose(); |
| 20 | 97 | | waitSource = null; |
| 20 | 98 | | } |
| | 99 | |
|
| | 100 | | // If the wait token was set, but the main cancellation token was not, then the wait time was |
| | 101 | | // exceeded and a default item needs to be emitted. |
| | 102 | |
|
| 33810 | 103 | | if ((waitToken.IsCancellationRequested) && (!cancellationToken.IsCancellationRequested)) |
| | 104 | | { |
| 18 | 105 | | yield return default; |
| | 106 | | } |
| | 107 | | } |
| | 108 | | } |
| 34 | 109 | | } |
| | 110 | | finally |
| | 111 | | { |
| 74 | 112 | | waitSource?.Dispose(); |
| | 113 | | } |
| | 114 | |
|
| 12 | 115 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 116 | | yield break; |
| 28 | 117 | | } |
| | 118 | | } |
| | 119 | | } |