< Summary

Class:Azure.Messaging.EventHubs.Core.ChannelReaderExtensions
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Core\ChannelReaderExtensions.cs
Covered lines:33
Uncovered lines:0
Coverable lines:33
Total lines:119
Line coverage:100% (33 of 33)
Covered branches:55
Total branches:58
Branch coverage:94.8% (55 of 58)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
EnumerateChannel()-100%94.83%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Core\ChannelReaderExtensions.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Runtime.CompilerServices;
 7using System.Threading;
 8using System.Threading.Channels;
 9using System.Threading.Tasks;
 10using Azure.Core;
 11
 12namespace Azure.Messaging.EventHubs.Core
 13{
 14    /// <summary>
 15    ///   The set of extensions for the <see cref="ChannelReader{T}" />
 16    ///   class.
 17    /// </summary>
 18    ///
 19    internal static class ChannelReaderExtensions
 20    {
 21        /// <summary>
 22        ///   Enumerates the events as they become available in the associated channel.
 23        /// </summary>
 24        ///
 25        /// <typeparam name="T">The type of data contained in the channel.</typeparam>
 26        ///
 27        /// <param name="reader">The <see cref="ChannelReader{T}" /> instance that this method was invoked on, and from 
 28        /// <param name="maximumWaitTime">The maximum amount of time to wait to for an event to be available before emit
 29        /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> instance to signal the 
 30        ///
 31        /// <returns>An asynchronous enumerator that can be used to iterate over events as they are available.</returns>
 32        ///
 33        public static async IAsyncEnumerable<T> EnumerateChannel<T>(this ChannelReader<T> reader,
 34                                                                    TimeSpan? maximumWaitTime,
 35                                                                    [EnumeratorCancellation]CancellationToken cancellati
 36        {
 7837            Argument.AssertNotNull(reader, nameof(reader));
 38
 7639            if (maximumWaitTime.HasValue)
 40            {
 1241                Argument.AssertNotNegative(maximumWaitTime.Value, nameof(maximumWaitTime));
 42            }
 43
 44
 7445            CancellationToken waitToken = cancellationToken;
 7446            var waitSource = default(CancellationTokenSource);
 47
 48            try
 49            {
 3935050                while (!cancellationToken.IsCancellationRequested)
 51                {
 3934252                    if (reader.TryRead(out T result))
 53                    {
 546254                        waitSource?.CancelAfter(Timeout.Infinite);
 546255                        yield return result;
 56                    }
 3387257                    else if (reader.Completion.IsCompleted)
 58                    {
 59                        // If the channel was marked as final, then await the completion task to surface any exceptions.
 60
 1261                        await reader.Completion.ConfigureAwait(false);
 462                        break;
 63                    }
 64                    else
 65                    {
 66                        try
 67                        {
 3386068                            if (maximumWaitTime.HasValue)
 69                            {
 1019670                                if ((waitSource == null) || (waitSource.IsCancellationRequested))
 71                                {
 2072                                    waitSource?.Dispose();
 2073                                    waitSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 74                                }
 75
 1019676                                waitSource.CancelAfter(maximumWaitTime.Value);
 1019677                                waitToken = waitSource.Token;
 78                            }
 79
 80                            // Wait for an item to be available in the channel; if it becomes so, then
 81                            // reset the loop so that it can be read and emitted.
 82
 3386083                            if (await reader.WaitToReadAsync(waitToken).ConfigureAwait(false))
 84                            {
 2685                                waitSource?.CancelAfter(Timeout.Infinite);
 2686                                continue;
 87                            }
 3379088                        }
 2089                        catch (OperationCanceledException)
 90                        {
 91                            // This is thrown when the wait token expires.  It may be caused by the maximum wait time
 92                            // being exceeded or the main cancellation token being set.  Ignore this as an expected
 93                            // case; if the iteration was canceled, it will be detected in the body of the loop and
 94                            // appropriate action taken.
 95
 2096                            waitSource?.Dispose();
 2097                            waitSource = null;
 2098                        }
 99
 100                        // If the wait token was set, but the main cancellation token was not, then the wait time was
 101                        // exceeded and a default item needs to be emitted.
 102
 33810103                        if ((waitToken.IsCancellationRequested) && (!cancellationToken.IsCancellationRequested))
 104                        {
 18105                            yield return default;
 106                        }
 107                    }
 108                }
 34109            }
 110            finally
 111            {
 74112                waitSource?.Dispose();
 113            }
 114
 12115            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 4116            yield break;
 28117        }
 118    }
 119}

Methods/Properties

EnumerateChannel()