< Summary

Class: Azure.Messaging.EventHubs.Consumer.EventHubConsumerClient
Assembly: Azure.Messaging.EventHubs
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Consumer\EventHubConsumerClient.cs
Covered lines: 289
Uncovered lines: 41
Coverable lines: 330
Total lines: 1034
Line coverage: 87.5% (289 of 330)
Covered branches: 116
Total branches: 144
Branch coverage: 80.5% (116 of 144)

Metrics

| Method | Cyclomatic complexity | Line coverage | Branch coverage |
| --- | --- | --- | --- |
| .ctor(...) | - | 89.29% | 100% |
| get_FullyQualifiedNamespace() | - | 100% | 100% |
| get_EventHubName() | - | 100% | 100% |
| get_ConsumerGroup() | - | 100% | 100% |
| get_IsClosed() | - | 100% | 100% |
| set_IsClosed(...) | - | 100% | 100% |
| get_OwnsConnection() | - | 0% | 100% |
| get_RetryPolicy() | - | 100% | 100% |
| get_Connection() | - | 100% | 100% |
| get_ActiveConsumers() | - | 0% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| .ctor() | - | 0% | 100% |
| GetEventHubPropertiesAsync() | - | 100% | 100% |
| GetPartitionIdsAsync() | - | 100% | 100% |
| GetPartitionPropertiesAsync() | - | 100% | 100% |
| ReadEventsFromPartitionAsync(...) | - | 100% | 100% |
| ReadEventsFromPartitionAsync() | - | 82.61% | 90.48% |
| ReadEventsAsync(...) | - | 100% | 100% |
| ReadEventsAsync(...) | - | 100% | 100% |
| ReadEventsAsync() | - | 74.19% | 71.88% |
| CloseAsync() | - | 82.14% | 87.5% |
| DisposeAsync() | - | 0% | 100% |
| Equals(...) | - | 0% | 100% |
| GetHashCode() | - | 100% | 100% |
| ToString() | - | 0% | 100% |
| PublishPartitionEventsToChannelAsync() | - | 85.11% | 72.22% |
| <PublishPartitionEventsToChannelAsync() | - | 100% | 87.5% |
| StartBackgroundChannelPublishingAsync(...) | - | 95.96% | 100% |
| CreateEventChannel(...) | - | 100% | 100% |

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Consumer\EventHubConsumerClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.ComponentModel;
 8using System.Diagnostics.CodeAnalysis;
 9using System.Globalization;
 10using System.Linq;
 11using System.Runtime.CompilerServices;
 12using System.Runtime.ExceptionServices;
 13using System.Threading;
 14using System.Threading.Channels;
 15using System.Threading.Tasks;
 16using Azure.Core;
 17using Azure.Messaging.EventHubs.Core;
 18using Azure.Messaging.EventHubs.Diagnostics;
 19
 20namespace Azure.Messaging.EventHubs.Consumer
 21{
 22    /// <summary>
 23    ///   A client responsible for reading <see cref="EventData" /> from a specific Event Hub
 24    ///   as a member of a specific consumer group.
 25    ///
 26    ///   A consumer may be exclusive, which asserts ownership over associated partitions for the consumer
 27    ///   group to ensure that only one consumer from that group is reading from the partition.
 28    ///   These exclusive consumers are sometimes referred to as "Epoch Consumers."
 29    ///
 30    ///   A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
 31    ///   group to be actively reading events from a given partition.  These non-exclusive consumers are
 32    ///   sometimes referred to as "Non-Epoch Consumers."
 33    /// </summary>
 34    ///
 35    public class EventHubConsumerClient : IAsyncDisposable
 36    {
 37        /// <summary>The name of the default consumer group in the Event Hubs service.</summary>
 38        public const string DefaultConsumerGroupName = "$Default";
 39
 40        /// <summary>The maximum wait time for receiving an event batch for the background publishing operation used for
 041        private readonly TimeSpan BackgroundPublishingWaitTime = TimeSpan.FromMilliseconds(250);
 42
 43        /// <summary>Indicates whether or not this instance has been closed.</summary>
 44        private volatile bool _closed = false;
 45
 46        /// <summary>
 47        ///   The fully qualified Event Hubs namespace that the consumer is associated with.  This is likely
 48        ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 49        /// </summary>
 50        ///
 251        public string FullyQualifiedNamespace => Connection.FullyQualifiedNamespace;
 52
 53        /// <summary>
 54        ///   The name of the Event Hub that the consumer is connected to, specific to the
 55        ///   Event Hubs namespace that contains it.
 56        /// </summary>
 57        ///
 43458        public string EventHubName => Connection.EventHubName;
 59
 60        /// <summary>
 61        ///   The name of the consumer group that this consumer is associated with.  Events will be read
 62        ///   only in the context of this group.
 63        /// </summary>
 64        ///
 14865        public string ConsumerGroup { get; }
 66
 67        /// <summary>
 68        ///   Indicates whether or not this <see cref="EventHubConsumerClient"/> has been closed.
 69        /// </summary>
 70        ///
 71        /// <value>
 72        ///   <c>true</c> if the client is closed; otherwise, <c>false</c>.
 73        /// </value>
 74        ///
 75        public bool IsClosed
 76        {
 18077            get => _closed;
 1278            protected set => _closed = value;
 79        }
 80
 81        /// <summary>
 82        ///   Indicates whether the client has ownership of the associated <see cref="EventHubConnection" />
 83        ///   and should take responsibility for managing its lifespan.
 84        /// </summary>
 85        ///
 086        private bool OwnsConnection { get; } = true;
 87
 88        /// <summary>
 89        ///   The policy to use for determining retry behavior for when an operation fails.
 90        /// </summary>
 91        ///
 28692        private EventHubsRetryPolicy RetryPolicy { get; }
 93
 94        /// <summary>
 95        ///   The active connection to the Azure Event Hubs service, enabling client communications for metadata
 96        ///   about the associated Event Hub and access to transport-aware consumers.
 97        /// </summary>
 98        ///
 65299        private EventHubConnection Connection { get; }
 100
 101        /// <summary>
 102        ///   The set of active Event Hub transport-specific consumers created by this client for use with
 103        ///   delegated operations.
 104        /// </summary>
 105        ///
 0106        private ConcurrentDictionary<string, TransportConsumer> ActiveConsumers { get; } = new ConcurrentDictionary<stri
 107
 108        /// <summary>
 109        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 110        /// </summary>
 111        ///
 112        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 113        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 114        ///
 115        /// <remarks>
 116        ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t
 117        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 118        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 119        ///
 120        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 121        ///   Event Hub will result in a connection string that contains the name.
 122        /// </remarks>
 123        ///
 124        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 125        ///
 126        public EventHubConsumerClient(string consumerGroup,
 22127                                      string connectionString) : this(consumerGroup, connectionString, null, null)
 128        {
 6129        }
 130
 131        /// <summary>
 132        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 133        /// </summary>
 134        ///
 135        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 136        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 137        /// <param name="clientOptions">The set of options to use for this consumer.</param>
 138        ///
 139        /// <remarks>
 140        ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t
 141        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 142        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 143        ///
 144        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 145        ///   Event Hub will result in a connection string that contains the name.
 146        /// </remarks>
 147        ///
 148        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 149        ///
 150        public EventHubConsumerClient(string consumerGroup,
 151                                      string connectionString,
 6152                                      EventHubConsumerClientOptions clientOptions) : this(consumerGroup, connectionStrin
 153        {
 2154        }
 155
 156        /// <summary>
 157        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 158        /// </summary>
 159        ///
 160        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 161        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 162        /// <param name="eventHubName">The name of the specific Event Hub to associate the consumer with.</param>
 163        ///
 164        /// <remarks>
 165        ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even
 166        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 167        ///   passed only once, either as part of the connection string or separately.
 168        /// </remarks>
 169        ///
 170        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 171        ///
 172        public EventHubConsumerClient(string consumerGroup,
 173                                      string connectionString,
 4174                                      string eventHubName) : this(consumerGroup, connectionString, eventHubName, null)
 175        {
 2176        }
 177
 178        /// <summary>
 179        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 180        /// </summary>
 181        ///
 182        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 183        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 184        /// <param name="eventHubName">The name of the specific Event Hub to associate the consumer with.</param>
 185        /// <param name="clientOptions">A set of options to apply when configuring the consumer.</param>
 186        ///
 187        /// <remarks>
 188        ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even
 189        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 190        ///   passed only once, either as part of the connection string or separately.
 191        /// </remarks>
 192        ///
 193        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 194        ///
 36195        public EventHubConsumerClient(string consumerGroup,
 36196                                      string connectionString,
 36197                                      string eventHubName,
 36198                                      EventHubConsumerClientOptions clientOptions)
 199        {
 36200            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 32201            Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
 202
 24203            clientOptions = clientOptions?.Clone() ?? new EventHubConsumerClientOptions();
 204
 24205            OwnsConnection = true;
 24206            Connection = new EventHubConnection(connectionString, eventHubName, clientOptions.ConnectionOptions);
 10207            ConsumerGroup = consumerGroup;
 10208            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 10209        }
 210
 211        /// <summary>
 212        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 213        /// </summary>
 214        ///
 215        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 216        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 217        /// <param name="eventHubName">The name of the specific Event Hub to associate the consumer with.</param>
 218        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 219        /// <param name="clientOptions">A set of options to apply when configuring the consumer.</param>
 220        ///
 22221        public EventHubConsumerClient(string consumerGroup,
 22222                                      string fullyQualifiedNamespace,
 22223                                      string eventHubName,
 22224                                      TokenCredential credential,
 22225                                      EventHubConsumerClientOptions clientOptions = default)
 226        {
 22227            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 18228            Argument.AssertWellFormedEventHubsNamespace(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
 12229            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 8230            Argument.AssertNotNull(credential, nameof(credential));
 231
 6232            clientOptions = clientOptions?.Clone() ?? new EventHubConsumerClientOptions();
 233
 6234            OwnsConnection = true;
 6235            Connection = new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, clientOptions.Connect
 6236            ConsumerGroup = consumerGroup;
 6237            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 6238        }
 239
 240        /// <summary>
 241        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 242        /// </summary>
 243        ///
 244        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 245        /// <param name="connection">The <see cref="EventHubConnection" /> connection to use for communication with the 
 246        /// <param name="clientOptions">A set of options to apply when configuring the consumer.</param>
 247        ///
 128248        public EventHubConsumerClient(string consumerGroup,
 128249                                      EventHubConnection connection,
 128250                                      EventHubConsumerClientOptions clientOptions = default)
 251        {
 128252            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 128253            Argument.AssertNotNull(connection, nameof(connection));
 126254            clientOptions = clientOptions?.Clone() ?? new EventHubConsumerClientOptions();
 255
 126256            OwnsConnection = false;
 126257            Connection = connection;
 126258            ConsumerGroup = consumerGroup;
 126259            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 126260        }
 261
 262        /// <summary>
 263        ///   Initializes a new instance of the <see cref="EventHubConsumerClient"/> class.
 264        /// </summary>
 265        ///
 0266        protected EventHubConsumerClient()
 267        {
 0268            OwnsConnection = false;
 0269        }
 270
 271        /// <summary>
 272        ///   Retrieves information about the Event Hub that the connection is associated with, including
 273        ///   the number of partitions present and their identifiers.
 274        /// </summary>
 275        ///
 276        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 277        ///
 278        /// <returns>The set of information for the Event Hub that this client is associated with.</returns>
 279        ///
 280        public virtual async Task<EventHubProperties> GetEventHubPropertiesAsync(CancellationToken cancellationToken = d
 281        {
 2282            Argument.AssertNotClosed(IsClosed, nameof(EventHubConsumerClient));
 2283            return await Connection.GetPropertiesAsync(RetryPolicy, cancellationToken).ConfigureAwait(false);
 2284        }
 285
 286        /// <summary>
 287        ///   Retrieves the set of identifiers for the partitions of an Event Hub.
 288        /// </summary>
 289        ///
 290        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 291        ///
 292        /// <returns>The set of identifiers for the partitions within the Event Hub that this client is associated with.
 293        ///
 294        /// <remarks>
 295        ///   This method is synonymous with invoking <see cref="GetEventHubPropertiesAsync(CancellationToken)" /> and r
 296        ///   property that is returned. It is offered as a convenience for quick access to the set of partition identif
 297        ///   No new or extended information is presented.
 298        /// </remarks>
 299        ///
 300        public virtual async Task<string[]> GetPartitionIdsAsync(CancellationToken cancellationToken = default)
 301        {
 302
 66303            Argument.AssertNotClosed(IsClosed, nameof(EventHubConsumerClient));
 66304            return await Connection.GetPartitionIdsAsync(RetryPolicy, cancellationToken).ConfigureAwait(false);
 66305        }
 306
 307        /// <summary>
 308        ///   Retrieves information about a specific partition for an Event Hub, including elements that describe the av
 309        ///   events in the partition event stream.
 310        /// </summary>
 311        ///
 312        /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 313        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 314        ///
 315        /// <returns>The set of information for the requested partition under the Event Hub this client is associated wi
 316        ///
 317        public virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId,
 318                                                                             CancellationToken cancellationToken = defau
 319        {
 2320            Argument.AssertNotClosed(IsClosed, nameof(EventHubConsumerClient));
 2321            return await Connection.GetPartitionPropertiesAsync(partitionId, RetryPolicy, cancellationToken).ConfigureAw
 2322        }
 323
 324        /// <summary>
 325        ///   Reads events from the requested partition as an asynchronous enumerable, allowing events to be iterated as
 326        ///   become available on the partition, waiting as necessary should there be no events available.
 327        ///
 328        ///   This enumerator may block for an indeterminate amount of time for an <c>await</c> if events are not availa
 329        ///   cancellation via the <paramref name="cancellationToken"/> to be requested in order to return control.  It 
 330        ///   which accepts a set of options for configuring read behavior for scenarios where a more deterministic maxi
 331        /// </summary>
 332        ///
 333        /// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</par
 334        /// <param name="startingPosition">The position within the partition where the consumer should begin reading eve
 335        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 336        ///
 337        /// <returns>An <see cref="IAsyncEnumerable{T}"/> to be used for iterating over events in the partition.</return
 338        ///
 339        /// <remarks>
 340        ///   Each reader of events is presented with an independent iterator; if there are multiple readers, each recei
 341        ///   process, rather than competing for them.
 342        /// </remarks>
 343        ///
 344        /// <seealso cref="ReadEventsFromPartitionAsync(string, EventPosition, ReadEventOptions, CancellationToken)"/>
 345        ///
 346        public virtual IAsyncEnumerable<PartitionEvent> ReadEventsFromPartitionAsync(string partitionId,
 347                                                                                     EventPosition startingPosition,
 28348                                                                                     CancellationToken cancellationToken
 349
 350        /// <summary>
 351        ///   Reads events from the requested partition as an asynchronous enumerable, allowing events to be iterated as
 352        ///   become available on the partition, waiting as necessary should there be no events available.
 353        ///
 354        ///   This enumerator may block for an indeterminate amount of time for an <c>await</c> if events are not availa
 355        ///   cancellation via the <paramref name="cancellationToken"/> to be requested in order to return control.  It 
 356        ///   <see cref="ReadEventOptions.MaximumWaitTime" /> for scenarios where a more deterministic maximum waiting p
 357        /// </summary>
 358        ///
 359        /// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</par
 360        /// <param name="startingPosition">The position within the partition where the consumer should begin reading eve
 361        /// <param name="readOptions">The set of options to use for configuring read behavior; if not specified the defa
 362        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 363        ///
 364        /// <returns>An <see cref="IAsyncEnumerable{T}"/> to be used for iterating over events in the partition.</return
 365        ///
 366        /// <remarks>
 367        ///   Each reader of events is presented with an independent iterator; if there are multiple readers, each recei
 368        ///   process, rather than competing for them.
 369        /// </remarks>
 370        ///
 371        /// <seealso cref="ReadEventsFromPartitionAsync(string, EventPosition, CancellationToken)"/>
 372        ///
 373        public virtual async IAsyncEnumerable<PartitionEvent> ReadEventsFromPartitionAsync(string partitionId,
 374                                                                                           EventPosition startingPositio
 375                                                                                           ReadEventOptions readOptions,
 376                                                                                           [EnumeratorCancellation] Canc
 377        {
 42378            Argument.AssertNotClosed(IsClosed, nameof(EventHubConsumerClient));
 40379            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 380
 38381            EventHubsEventSource.Log.ReadEventsFromPartitionStart(EventHubName, partitionId);
 38382            readOptions = readOptions?.Clone() ?? new ReadEventOptions();
 383
 38384            var transportConsumer = default(TransportConsumer);
 38385            var partitionContext = default(PartitionContext);
 38386            var emptyPartitionEvent = default(PartitionEvent);
 38387            var closeException = default(Exception);
 388
 389            try
 390            {
 391                // Attempt to initialize the common objects. These will be used during the read and emit operations
 392                // that follow.  Because catch blocks cannot be used when yielding items, this step is wrapped individua
 393                // to ensure that any exceptions are logged.
 394
 395                try
 396                {
 38397                    transportConsumer = Connection.CreateTransportConsumer(ConsumerGroup, partitionId, startingPosition,
 38398                    partitionContext = new PartitionContext(partitionId, transportConsumer);
 38399                    emptyPartitionEvent = new PartitionEvent(partitionContext, null);
 38400                }
 0401                catch (Exception ex)
 402                {
 0403                    EventHubsEventSource.Log.ReadEventsFromPartitionError(EventHubName, partitionId, ex.Message);
 0404                    throw;
 405                }
 406
 407                // Process items.  Because catch blocks cannot be used when yielding items, ensure that any exceptions d
 408                // the receive operation are caught in an independent try/catch block.
 409
 38410                var emptyBatch = true;
 38411                var eventBatch = default(IReadOnlyList<EventData>);
 412
 66413                while (!cancellationToken.IsCancellationRequested)
 414                {
 415                    try
 416                    {
 64417                        emptyBatch = true;
 64418                        eventBatch = await transportConsumer.ReceiveAsync(readOptions.CacheEventCount, readOptions.Maxim
 52419                    }
 2420                    catch (TaskCanceledException)
 421                    {
 2422                        throw;
 423                    }
 2424                    catch (OperationCanceledException ex)
 425                    {
 2426                        throw new TaskCanceledException(ex.Message, ex);
 427                    }
 8428                    catch (Exception ex)
 429                    {
 8430                        EventHubsEventSource.Log.ReadEventsFromPartitionError(EventHubName, partitionId, ex.Message);
 8431                        throw;
 432                    }
 433
 434                    // The batch will be available after the receive operation, regardless of whether there were events
 435                    // available or not; if there are any events in the set to yield, then mark the batch as non-empty.
 436
 3414437                    foreach (var eventData in eventBatch)
 438                    {
 1664439                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 440
 1662441                        emptyBatch = false;
 1662442                        yield return new PartitionEvent(partitionContext, eventData);
 443                    }
 444
 445                    // If there were no events received, only emit an empty event if there was a maximum wait time
 446                    // explicitly requested, otherwise, continue attempting to receive without emitting any value
 447                    // to the enumerable.
 448
 34449                    if ((emptyBatch) && (readOptions.MaximumWaitTime.HasValue))
 450                    {
 8451                        yield return emptyPartitionEvent;
 452                    }
 453                }
 454            }
 455            finally
 456            {
 38457                if (transportConsumer != null)
 458                {
 459                    try
 460                    {
 38461                        await transportConsumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
 38462                    }
 0463                    catch (Exception ex)
 464                    {
 0465                        EventHubsEventSource.Log.ReadEventsFromPartitionError(EventHubName, partitionId, ex.Message);
 0466                        closeException = ex;
 0467                    }
 468                }
 469
 38470                EventHubsEventSource.Log.ReadEventsFromPartitionComplete(EventHubName, partitionId);
 471            }
 472
 473            // If an exception was captured when closing the transport consumer, surface it.
 474
 2475            if (closeException != default)
 476            {
 0477                ExceptionDispatchInfo.Capture(closeException).Throw();
 478            }
 479
 480            // If cancellation was requested, then surface the expected exception.
 481
 2482            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 26483        }
 484
 485        /// <summary>
 486        ///   Reads events from all partitions of the event hub as an asynchronous enumerable, allowing events to be ite
 487        ///   become available on the partition, waiting as necessary should there be no events available.
 488        ///
 489        ///   This enumerator may block for an indeterminate amount of time for an <c>await</c> if events are not availa
 490        ///   cancellation via the <paramref name="cancellationToken"/> to be requested in order to return control.  It 
 491        ///   <see cref="ReadEventOptions.MaximumWaitTime" /> for scenarios where a more deterministic maximum waiting p
 492        /// </summary>
 493        ///
 494        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 495        ///
 496        /// <returns>An <see cref="IAsyncEnumerable{T}"/> to be used for iterating over events in the partition.</return
 497        ///
 498        /// <remarks>
 499        ///   This method is not recommended for production use; the <c>EventProcessorClient</c> should be used for read
 500        ///   production scenario, as it offers a much more robust experience with higher throughput.
 501        ///
 502        ///   It is important to note that this method does not guarantee fairness amongst the partitions during iterati
 503        ///   events to be read by the enumerator.  Depending on service communication, there may be a clustering of eve
 504        ///   bias for a given partition or subset of partitions.
 505        ///
 506        ///   Each reader of events is presented with an independent iterator; if there are multiple readers, each recei
 507        ///   process, rather than competing for them.
 508        /// </remarks>
 509        ///
 510        /// <seealso href="https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor" />
 511        /// <seealso cref="ReadEventsAsync(ReadEventOptions, CancellationToken)"/>
 512        ///
 52513        public virtual IAsyncEnumerable<PartitionEvent> ReadEventsAsync(CancellationToken cancellationToken = default) =
 514
 515        /// <summary>
 516        ///   Reads events from all partitions of the event hub as an asynchronous enumerable, allowing events to be ite
 517        ///   become available on the partition, waiting as necessary should there be no events available.
 518        ///
 519        ///   This enumerator may block for an indeterminate amount of time for an <c>await</c> if events are not availa
 520        ///   cancellation via the <paramref name="cancellationToken"/> to be requested in order to return control.  It 
 521        ///   <see cref="ReadEventOptions.MaximumWaitTime" /> for scenarios where a more deterministic maximum waiting p
 522        /// </summary>
 523        ///
 524        /// <param name="readOptions">The set of options to use for configuring read behavior; if not specified the defa
 525        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 526        ///
 527        /// <returns>An <see cref="IAsyncEnumerable{T}"/> to be used for iterating over events in the partition.</return
 528        ///
 529        /// <remarks>
 530        ///   This method is not recommended for production use; the <c>EventProcessorClient</c> should be used for read
 531        ///   production scenario, as it offers a much more robust experience with higher throughput.
 532        ///
 533        ///   It is important to note that this method does not guarantee fairness amongst the partitions during iterati
 534        ///   events to be read by the enumerator.  Depending on service communication, there may be a clustering of eve
 535        ///   bias for a given partition or subset of partitions.
 536        ///
 537        ///   Each reader of events is presented with an independent iterator; if there are multiple readers, each recei
 538        ///   process, rather than competing for them.
 539        /// </remarks>
 540        ///
 541        /// <seealso href="https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor" />
 542        /// <seealso cref="ReadEventsAsync(CancellationToken)"/>
 543        ///
 544        public virtual IAsyncEnumerable<PartitionEvent> ReadEventsAsync(ReadEventOptions readOptions,
 60545                                                                        CancellationToken cancellationToken = default) =
 546
 547        /// <summary>
 548        ///   Reads events from all partitions of the event hub as an asynchronous enumerable, allowing events to be ite
 549        ///   become available on any partition, waiting as necessary should there be no events available.
 550        ///
 551        ///   This enumerator may block for an indeterminate amount of time for an <c>await</c> if events are not availa
 552        ///   cancellation via the <paramref name="cancellationToken"/> to be requested in order to return control.  It 
 553        ///   <see cref="ReadEventOptions.MaximumWaitTime" /> for scenarios where a more deterministic maximum waiting p
 554        /// </summary>
 555        ///
 556        /// <param name="startReadingAtEarliestEvent"><c>true</c> to begin reading at the first events available in each
 557        /// <param name="readOptions">The set of options to use for configuring read behavior; if not specified the defa
 558        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 559        ///
 560        /// <returns>An <see cref="IAsyncEnumerable{T}"/> to be used for iterating over events in the partition.</return
 561        ///
 562        /// <remarks>
 563        ///   This method is not recommended for production use; the <c>EventProcessorClient</c> should be used for read
 564        ///   production scenario, as it offers a much more robust experience with higher throughput.
 565        ///
 566        ///   It is important to note that this method does not guarantee fairness amongst the partitions during iterati
 567        ///   events to be read by the enumerator.  Depending on service communication, there may be a clustering of eve
 568        ///   bias for a given partition or subset of partitions.
 569        ///
 570        ///   Each reader of events is presented with an independent iterator; if there are multiple readers, each recei
 571        ///   process, rather than competing for them.
 572        /// </remarks>
 573        ///
 574        /// <seealso href="https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor" />
 575        /// <seealso cref="ReadEventsAsync(CancellationToken)"/>
 576        /// <seealso cref="ReadEventsAsync(ReadEventOptions, CancellationToken)"/>
 577        ///
 578        public virtual async IAsyncEnumerable<PartitionEvent> ReadEventsAsync(bool startReadingAtEarliestEvent,
 579                                                                              ReadEventOptions readOptions = default,
 580                                                                              [EnumeratorCancellation] CancellationToken
 581        {
 56582            Argument.AssertNotClosed(IsClosed, nameof(EventHubConsumerClient));
 54583            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 584
 52585            EventHubsEventSource.Log.ReadAllEventsStart(EventHubName);
 586
 52587            var cancelPublishingAsync = default(Func<Task>);
 52588            var eventChannel = default(Channel<PartitionEvent>);
 52589            var options = readOptions?.Clone() ?? new ReadEventOptions();
 52590            var startingPosition = startReadingAtEarliestEvent ? EventPosition.Earliest : EventPosition.Latest;
 591
 52592            using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 593
 594            try
 595            {
 596                // Determine the partitions for the Event Hub and create the shared channel.
 597
 52598                var partitions = await GetPartitionIdsAsync(cancellationToken).ConfigureAwait(false);
 599
 52600                var channelSize = options.CacheEventCount * partitions.Length * 2L;
 52601                eventChannel = CreateEventChannel((int)Math.Min(channelSize, int.MaxValue));
 602
 603                // Start publishing for all partitions.
 604
 52605                var publishingTasks = new Task<Func<Task>>[partitions.Length];
 606
 312607                for (var index = 0; index < partitions.Length; ++index)
 608                {
 104609                    publishingTasks[index] = PublishPartitionEventsToChannelAsync(partitions[index], startingPosition, o
 610                }
 611
 612                // Capture the callbacks to cancel publishing for all events.
 613
 52614                var cancelPublishingCallbacks = await Task.WhenAll(publishingTasks).ConfigureAwait(false);
 208615                cancelPublishingAsync = () => Task.WhenAll(cancelPublishingCallbacks.Select(cancelCallback => cancelCall
 52616            }
 0617            catch (Exception ex)
 618            {
 0619                EventHubsEventSource.Log.ReadAllEventsError(EventHubName, ex.Message);
 0620                cancellationSource?.Cancel();
 621
 0622                if (cancelPublishingAsync != null)
 623                {
 0624                    await cancelPublishingAsync().ConfigureAwait(false);
 625                }
 626
 0627                EventHubsEventSource.Log.ReadAllEventsComplete(EventHubName);
 0628                throw;
 629            }
 630
 631            // Iterate the events from the channel.
 632
 633            try
 634            {
 10962635                await foreach (var partitionEvent in eventChannel.Reader.EnumerateChannel(options.MaximumWaitTime, cance
 636                {
 5438637                    yield return partitionEvent;
 638                }
 639            }
 640            finally
 641            {
 52642                cancellationSource?.Cancel();
 52643                await cancelPublishingAsync().ConfigureAwait(false);
 28644                EventHubsEventSource.Log.ReadAllEventsComplete(EventHubName);
 645            }
 646
 647            // If cancellation was requested, then surface the expected exception.
 648
 0649            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 22650        }
 651
 652        /// <summary>
 653        ///   Closes the consumer.
 654        /// </summary>
 655        ///
 656        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 657        ///
 658        /// <returns>A task to be resolved on when the operation has completed.</returns>
 659        ///
 660        public virtual async Task CloseAsync(CancellationToken cancellationToken = default)
 661        {
 12662            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 663
 12664            if (IsClosed)
 665            {
 0666                return;
 667            }
 668
 12669            IsClosed = true;
 670
 12671            var clientHash = GetHashCode().ToString(CultureInfo.InvariantCulture);
 12672            EventHubsEventSource.Log.ClientCloseStart(nameof(EventHubConsumerClient), EventHubName, clientHash);
 673
 674            // Attempt to close the active transport consumers.  In the event that an exception is encountered,
 675            // it should not impact the attempt to close the connection, assuming ownership.
 676
 12677            var transportConsumerException = default(Exception);
 678
 679            try
 680            {
 12681                var pendingCloses = new List<Task>();
 682
 32683                foreach (var consumer in ActiveConsumers.Values)
 684                {
 4685                    pendingCloses.Add(consumer.CloseAsync(CancellationToken.None));
 686                }
 687
 12688                ActiveConsumers.Clear();
 12689                await Task.WhenAll(pendingCloses).ConfigureAwait(false);
 10690            }
 2691            catch (Exception ex)
 692            {
 2693                EventHubsEventSource.Log.ClientCloseError(nameof(EventHubConsumerClient), EventHubName, clientHash, ex.M
 2694                transportConsumerException = ex;
 2695            }
 696
 697            // An exception when closing the connection supersedes one observed when closing the
 698            // individual transport clients.
 699
 700            try
 701            {
 12702                if (OwnsConnection)
 703                {
 2704                    await Connection.CloseAsync().ConfigureAwait(false);
 705                }
 12706            }
 0707            catch (Exception ex)
 708            {
 0709                EventHubsEventSource.Log.ClientCloseError(nameof(EventHubConsumerClient), EventHubName, clientHash, ex.M
 0710                transportConsumerException = null;
 0711                throw;
 712            }
 713            finally
 714            {
 12715                EventHubsEventSource.Log.ClientCloseComplete(nameof(EventHubConsumerClient), EventHubName, clientHash);
 716            }
 717
 718            // If there was an active exception pending from closing the individual
 719            // transport consumers, surface it now.
 720
 12721            if (transportConsumerException != default)
 722            {
 2723                ExceptionDispatchInfo.Capture(transportConsumerException).Throw();
 724            }
 10725        }
 726
 727        /// <summary>
 728        ///   Performs the task needed to clean up resources used by the <see cref="EventHubConsumerClient" />,
 729        ///   including ensuring that the client itself has been closed.
 730        /// </summary>
 731        ///
 732        /// <returns>A task to be resolved on when the operation has completed.</returns>
 733        ///
 734        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 0735        public virtual async ValueTask DisposeAsync() => await CloseAsync().ConfigureAwait(false);
 736
 737        /// <summary>
 738        ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
 739        /// </summary>
 740        ///
 741        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
 742        ///
 743        /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>
 744        ///
 745        [EditorBrowsable(EditorBrowsableState.Never)]
 0746        public override bool Equals(object obj) => base.Equals(obj);
 747
 748        /// <summary>
 749        ///   Returns a hash code for this instance.
 750        /// </summary>
 751        ///
 752        /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha
 753        ///
 754        [EditorBrowsable(EditorBrowsableState.Never)]
 12755        public override int GetHashCode() => base.GetHashCode();
 756
 757        /// <summary>
 758        ///   Converts the instance to string representation.
 759        /// </summary>
 760        ///
 761        /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 762        ///
 763        [EditorBrowsable(EditorBrowsableState.Never)]
 0764        public override string ToString() => base.ToString();
 765
 766        /// <summary>
 767        ///   Publishes events for the requested <paramref name="partitionId"/> to the provided
 768        ///   <paramref name="channel" /> in the background, using a dedicated transport consumer
 769        ///   instance.
 770        /// </summary>
 771        ///
 772        /// <param name="partitionId">The identifier of the partition from which events should be read.</param>
 773        /// <param name="startingPosition">The position within the partition's event stream that reading should begin fr
 774        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 775        /// <param name="receiveBatchSize">The batch size to use when receiving events.</param>
 776        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 777        /// <param name="prefetchCount">The count of events requested eagerly and queued without regard to whether a rea
 778        /// <param name="channel">The channel to which events should be published.</param>
 779        /// <param name="publishingCancellationSource">A cancellation source which can be used for signaling publication
 780        ///
 781        /// <returns>A function to invoke when publishing should stop; once complete, background publishing is no longer
 782        ///
 783        /// <remarks>
 784        ///   This method assumes co-ownership of the <paramref name="channel" />, marking its writer as completed
 785        ///   when publishing is complete or when an exception is encountered.
 786        /// </remarks>
 787        ///
 788        private async Task<Func<Task>> PublishPartitionEventsToChannelAsync(string partitionId,
 789                                                                            EventPosition startingPosition,
 790                                                                            bool trackLastEnqueuedEventProperties,
 791                                                                            int receiveBatchSize,
 792                                                                            long? ownerLevel,
 793                                                                            uint prefetchCount,
 794                                                                            Channel<PartitionEvent> channel,
 795                                                                            CancellationTokenSource publishingCancellati
 796        {
 104797            publishingCancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();
 104798            EventHubsEventSource.Log.PublishPartitionEventsToChannelStart(EventHubName, partitionId);
 799
 104800            var transportConsumer = default(TransportConsumer);
 104801            var publishingTask = default(Task);
 104802            var observedException = default(Exception);
 104803            var publisherId = Guid.NewGuid().ToString();
 804
 805            // Termination must take place in multiple places due to a catch block being
 806            // disallowed with the use of "yield return".  Create a local function that encapsulates
 807            // the cleanup tasks needed.
 808
 809            async Task performCleanup()
 810            {
 104811                publishingCancellationSource?.Cancel();
 812
 104813                if (publishingTask != null)
 814                {
 815                    try
 816                    {
 104817                        await publishingTask.ConfigureAwait(false);
 84818                        channel.Writer.TryComplete();
 84819                    }
 20820                    catch (Exception ex) when ((ex is TaskCanceledException) || (ex is ChannelClosedException))
 821                    {
 822                        // Due to the non-determinism when requesting cancellation of the background
 823                        // publishing, it may surface as the expected cancellation or may result in
 824                        // an attempt to write to the shared channel after another publisher has
 825                        // marked it as final.
 826                        //
 827                        // These are expected scenarios; no action is needed.
 20828                    }
 829                }
 830
 104831                if (transportConsumer != null)
 832                {
 104833                    ActiveConsumers.TryRemove(publisherId, out var _);
 104834                    await transportConsumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
 835                }
 836
 837                try
 838                {
 104839                    if (observedException != default)
 840                    {
 34841                        EventHubsEventSource.Log.PublishPartitionEventsToChannelError(EventHubName, partitionId, observe
 34842                        ExceptionDispatchInfo.Capture(observedException).Throw();
 843                    }
 70844                }
 845                finally
 846                {
 104847                    EventHubsEventSource.Log.PublishPartitionEventsToChannelComplete(EventHubName, partitionId);
 848                }
 70849            }
 850
 851            // Setup the publishing context and begin publishing to the channel in the background.
 852
 853            try
 854            {
 104855                transportConsumer = Connection.CreateTransportConsumer(ConsumerGroup, partitionId, startingPosition, Ret
 856
 104857                if (!ActiveConsumers.TryAdd(publisherId, transportConsumer))
 858                {
 0859                    await transportConsumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
 0860                    transportConsumer = null;
 0861                    throw new EventHubsException(false, EventHubName, string.Format(CultureInfo.CurrentCulture, Resource
 862                }
 863
 864                void exceptionCallback(Exception ex)
 865                {
 866                    // Ignore the known exception cases that present during cancellation across
 867                    // background publishing for multiple partitions.
 868
 48869                    if ((ex is ChannelClosedException) || (ex is TaskCanceledException))
 870                    {
 14871                        return;
 872                    }
 873
 34874                    observedException = ex;
 34875                }
 876
 104877                publishingTask = StartBackgroundChannelPublishingAsync
 104878                (
 104879                    transportConsumer,
 104880                    channel,
 104881                    new PartitionContext(partitionId, transportConsumer),
 104882                    receiveBatchSize,
 104883                    exceptionCallback,
 104884                    publishingCancellationSource.Token
 104885                );
 104886            }
 0887            catch (Exception ex)
 888            {
 0889                EventHubsEventSource.Log.PublishPartitionEventsToChannelError(EventHubName, partitionId, ex.Message);
 0890                await performCleanup().ConfigureAwait(false);
 891
 0892                throw;
 893            }
 894
 104895            return performCleanup;
 104896        }
 897
 898        /// <summary>
 899        ///   Begins the background process responsible for receiving from the specified <see cref="TransportConsumer" /
 900        ///   and publishing to the requested <see cref="Channel{PartitionEvent}" />.
 901        /// </summary>
 902        ///
 903        /// <param name="transportConsumer">The consumer to use for receiving events.</param>
 904        /// <param name="channel">The channel to which received events should be published.</param>
 905        /// <param name="partitionContext">The context that represents the partition from which events are being received.</
 906        /// <param name="receiveBatchSize">The batch size to use when receiving events.</param>
 907        /// <param name="notifyException">An action to be invoked when an exception is encountered during publishing.</p
 908        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to signal the request to cancel the back
 909        ///
 910        /// <returns>A task to be resolved on when the operation has completed.</returns>
 911        ///
 912        private Task StartBackgroundChannelPublishingAsync(TransportConsumer transportConsumer,
 913                                                           Channel<PartitionEvent> channel,
 914                                                           PartitionContext partitionContext,
 915                                                           int receiveBatchSize,
 916                                                           Action<Exception> notifyException,
 917                                                           CancellationToken cancellationToken) =>
 108918            Task.Run(async () =>
 108919            {
 198920                var failedAttemptCount = 0;
 198921                var writtenItems = 0;
 198922                var receivedItems = default(IReadOnlyList<EventData>);
 198923                var retryDelay = default(TimeSpan?);
 198924                var activeException = default(Exception);
 108925
 51802926                while (!cancellationToken.IsCancellationRequested)
 108927                {
 108928                    try
 108929                    {
 108930                        // Receive items in batches and then write them to the subscribed channels.  The channels will n
 108931                        // block if they reach their maximum queue size, so there is no need to throttle publishing.
 108932
 51768933                        if (receivedItems == default)
 108934                        {
 51762935                            receivedItems = await transportConsumer.ReceiveAsync(receiveBatchSize, BackgroundPublishingW
 108936                        }
 108937
 118792938                        foreach (EventData item in receivedItems)
 108939                        {
 7858940                            await channel.Writer.WriteAsync(new PartitionEvent(partitionContext, item), cancellationToke
 7842941                            ++writtenItems;
 108942                        }
 108943
 51692944                        receivedItems = default;
 51692945                        writtenItems = 0;
 51692946                        failedAttemptCount = 0;
 51692947                    }
 116948                    catch (TaskCanceledException ex)
 108949                    {
 108950                        // In the case that a task was canceled, if cancellation was requested, then publishing should
 108951                        // already be terminating.  Otherwise, something unexpected canceled the operation and it should
 108952                        // be treated as an exception to ensure that the channels are marked final and consumers are mad
 108953                        // aware.
 108954
 116955                        activeException = (cancellationToken.IsCancellationRequested) ? null : ex;
 116956                        break;
 108957                    }
 114958                    catch (OperationCanceledException ex)
 108959                    {
 114960                        activeException = new TaskCanceledException(ex.Message, ex);
 114961                        break;
 108962                    }
 132963                    catch (EventHubsException ex) when
 132964                        (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected
 132965                        || ex.Reason == EventHubsException.FailureReason.ClientClosed)
 108966                    {
 108967                        // If the consumer was disconnected or closed, it is known to be unrecoverable; do not offer the
 108968
 0969                        activeException = ex;
 0970                        break;
 108971                    }
 170972                    catch (Exception ex) when (ex.IsFatalException())
 108973                    {
 0974                        channel.Writer.TryComplete(ex);
 0975                        throw;
 108976                    }
 170977                    catch (Exception ex)
 108978                    {
 108979                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 108980                        // Otherwise, mark the exception as active and break out of the loop.
 108981
 170982                        ++failedAttemptCount;
 170983                        retryDelay = RetryPolicy.CalculateRetryDelay(ex, failedAttemptCount);
 108984
 170985                        if (retryDelay.HasValue)
 108986                        {
 108987                            // If items were being emitted at the time of the exception, skip to the
 108988                            // last active item that was published to the channel so that publishing
 108989                            // resumes at the next event in sequence and duplicates are not written.
 108990
 130991                            if ((receivedItems != default) && (writtenItems > 0))
 108992                            {
 114993                                receivedItems = receivedItems.Skip(writtenItems).ToList();
 108994                            }
 108995
 130996                            await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 128997                            activeException = null;
 108998                        }
 108999                        else
 1081000                        {
 1481001                            activeException = ex;
 1481002                            break;
 1081003                        }
 1081004                    }
 1081005                }
 1081006
 1081007                // Publishing has terminated; if there was an active exception, then take the necessary steps to mark pu
 1081008                // than completed normally.
 1081009
 1961010                if (activeException != null)
 1081011                {
 1581012                    channel.Writer.TryComplete(activeException);
 1581013                    notifyException(activeException);
 1081014                }
 1081015
 1961016            }, cancellationToken);
 1017
 1018        /// <summary>
 1019        ///   Creates a channel for publishing events to local subscribers.
 1020        /// </summary>
 1021        ///
 1022        /// <param name="capacity">The maximum amount of events that can be queued in the channel.</param>
 1023        ///
 1024        /// <returns>A bounded channel, configured for 1:many read/write usage.</returns>
 1025        ///
 1026        private Channel<PartitionEvent> CreateEventChannel(int capacity) =>
 521027            Channel.CreateBounded<PartitionEvent>(new BoundedChannelOptions(capacity)
 521028            {
 521029                FullMode = BoundedChannelFullMode.Wait,
 521030                SingleWriter = false,
 521031                SingleReader = true
 521032            });
 1033    }
 1034}