< Summary

Class:Azure.Messaging.EventHubs.Amqp.AmqpConsumer
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpConsumer.cs
Covered lines:107
Uncovered lines:39
Coverable lines:146
Total lines:435
Line coverage:73.2% (107 of 146)
Covered branches:21
Total branches:50
Branch coverage:42% (21 of 50)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-0%100%
get_IsClosed()-100%100%
get_EventHubName()-100%100%
get_ConsumerGroup()-100%100%
get_PartitionId()-100%100%
get_CurrentEventPosition()-100%100%
get_TrackLastEnqueuedEventProperties()-0%100%
get_RetryPolicy()-100%100%
get_MessageConverter()-0%100%
get_ConnectionScope()-100%100%
get_ReceiveLink()-100%100%
.ctor(...)-92.68%25%
ReceiveAsync()-56.67%41.67%
CloseAsync()-83.33%50%
CreateConsumerLinkAsync()-73.33%100%
UseMinimum(...)-100%50%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpConsumer.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.Runtime.ExceptionServices;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Core;
 11using Azure.Core.Diagnostics;
 12using Azure.Messaging.EventHubs.Consumer;
 13using Azure.Messaging.EventHubs.Core;
 14using Azure.Messaging.EventHubs.Diagnostics;
 15using Microsoft.Azure.Amqp;
 16
 17namespace Azure.Messaging.EventHubs.Amqp
 18{
 19    /// <summary>
 20    ///   A transport client abstraction responsible for brokering operations for AMQP-based connections.
 21    ///   It is intended that the public <see cref="EventHubConsumerClient" /> make use of an instance
 22    ///   via containment and delegate operations to it.
 23    /// </summary>
 24    ///
 25    /// <seealso cref="Azure.Messaging.EventHubs.Core.TransportConsumer" />
 26    ///
 27    internal class AmqpConsumer : TransportConsumer
 28    {
 29        /// <summary>The default prefetch count to use for the consumer.</summary>
 30        private const uint DefaultPrefetchCount = 300;
 31
 32        /// <summary>An empty set of events which can be dispatched when no events are available.</summary>
 033        private static readonly IReadOnlyList<EventData> EmptyEventSet = Array.Empty<EventData>();
 34
 35        /// <summary>Indicates whether or not this instance has been closed.</summary>
 36        private volatile bool _closed = false;
 37
 38        /// <summary>
 39        ///   Indicates whether or not this consumer has been closed.
 40        /// </summary>
 41        ///
 42        /// <value>
 43        ///   <c>true</c> if the consumer is closed; otherwise, <c>false</c>.
 44        /// </value>
 45        ///
 646        public override bool IsClosed => _closed;
 47
 48        /// <summary>
 49        ///   The name of the Event Hub to which the client is bound.
 50        /// </summary>
 51        ///
 19252        private string EventHubName { get; }
 53
 54        /// <summary>
 55        ///   The name of the consumer group that this consumer is associated with.  Events will be read
 56        ///   only in the context of this group.
 57        /// </summary>
 58        ///
 8659        private string ConsumerGroup { get; }
 60
 61        /// <summary>
 62        ///   The identifier of the Event Hub partition that this consumer is associated with.  Events will be read
 63        ///   only from this partition.
 64        /// </summary>
 65        ///
 8666        private string PartitionId { get; }
 67
 68        /// <summary>
 69        ///   The current position for the consumer, updated as events are received from the
 70        ///   partition.
 71        /// </summary>
 72        ///
 73        /// <remarks>
 74        ///   When creating or recovering the associated AMQP link, this value is used
 75        ///   to set the position.  It is intended to primarily support recreating links
 76        ///   transparently to callers, allowing progress in the stream to be remembered.
 77        /// </remarks>
 78        ///
 12679        private EventPosition CurrentEventPosition { get; set; }
 80
 81        /// <summary>
 82        ///   Indicates whether or not the consumer should request information on the last enqueued event on the partiti
 83        ///   associated with a given event, and track that information as events are received.
 84        /// </summary>
 85        ///
 86        /// <value><c>true</c> if information about a partition's last event should be requested and tracked; otherwise,
 87        ///
 088        private bool TrackLastEnqueuedEventProperties { get; }
 89
 90        /// <summary>
 91        ///   The policy to use for determining retry behavior for when an operation fails.
 92        /// </summary>
 93        ///
 7094        private EventHubsRetryPolicy RetryPolicy { get; }
 95
 96        /// <summary>
 97        ///   The converter to use for translating between AMQP messages and client library
 98        ///   types.
 99        /// </summary>
 100        ///
 0101        private AmqpMessageConverter MessageConverter { get; }
 102
 103        /// <summary>
 104        ///   The AMQP connection scope responsible for managing transport constructs for this instance.
 105        /// </summary>
 106        ///
 86107        private AmqpConnectionScope ConnectionScope { get; }
 108
 109        /// <summary>
 110        ///   The AMQP link intended for use with receiving operations.
 111        /// </summary>
 112        ///
 66113        private FaultTolerantAmqpObject<ReceivingAmqpLink> ReceiveLink { get; }
 114
 115        /// <summary>
 116        ///   Initializes a new instance of the <see cref="AmqpConsumer"/> class.
 117        /// </summary>
 118        ///
 119        /// <param name="eventHubName">The name of the Event Hub from which events will be consumed.</param>
 120        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 121        /// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</par
 122        /// <param name="eventPosition">The position of the event in the partition where the consumer should begin readi
 123        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 124        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 125        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 126        /// <param name="connectionScope">The AMQP connection context for operations .</param>
 127        /// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.<
 128        /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
 129        ///
 130        /// <remarks>
 131        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 132        ///   is assumed that callers are trusted and have performed deep validation.
 133        ///
 134        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 135        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 136        ///   caller.
 137        /// </remarks>
 138        ///
 110139        public AmqpConsumer(string eventHubName,
 110140                            string consumerGroup,
 110141                            string partitionId,
 110142                            EventPosition eventPosition,
 110143                            bool trackLastEnqueuedEventProperties,
 110144                            long? ownerLevel,
 110145                            uint? prefetchCount,
 110146                            AmqpConnectionScope connectionScope,
 110147                            AmqpMessageConverter messageConverter,
 110148                            EventHubsRetryPolicy retryPolicy)
 149        {
 110150            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 106151            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 102152            Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
 98153            Argument.AssertNotNull(connectionScope, nameof(connectionScope));
 96154            Argument.AssertNotNull(messageConverter, nameof(messageConverter));
 94155            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 156
 92157            EventHubName = eventHubName;
 92158            ConsumerGroup = consumerGroup;
 92159            PartitionId = partitionId;
 92160            CurrentEventPosition = eventPosition;
 92161            TrackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
 92162            ConnectionScope = connectionScope;
 92163            RetryPolicy = retryPolicy;
 92164            MessageConverter = messageConverter;
 165
 92166            ReceiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(
 92167                timeout =>
 126168                   CreateConsumerLinkAsync(
 126169                        consumerGroup,
 126170                        partitionId,
 126171                        CurrentEventPosition,
 126172                        prefetchCount ?? DefaultPrefetchCount,
 126173                        ownerLevel,
 126174                        trackLastEnqueuedEventProperties,
 126175                        timeout,
 126176                        CancellationToken.None),
 92177                link =>
 92178                {
 0179                    link.Session?.SafeClose();
 0180                    link.SafeClose();
 0181                });
 92182        }
 183
 184        /// <summary>
 185        ///   Receives a batch of <see cref="EventData" /> from the Event Hub partition.
 186        /// </summary>
 187        ///
 188        /// <param name="maximumMessageCount">The maximum number of messages to receive in this batch.</param>
 189        /// <param name="maximumWaitTime">The maximum amount of time to wait to build up the requested message count for
 190        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 191        ///
 192        /// <returns>The batch of <see cref="EventData" /> from the Event Hub partition this consumer is associated with
 193        ///
 194        public override async Task<IReadOnlyList<EventData>> ReceiveAsync(int maximumMessageCount,
 195                                                                          TimeSpan? maximumWaitTime,
 196                                                                          CancellationToken cancellationToken)
 197        {
 26198            Argument.AssertNotClosed(_closed, nameof(AmqpConsumer));
 24199            Argument.AssertAtLeast(maximumMessageCount, 1, nameof(maximumMessageCount));
 200
 18201            var receivedEventCount = 0;
 18202            var failedAttemptCount = 0;
 18203            var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
 18204            var waitTime = (maximumWaitTime ?? tryTimeout);
 18205            var operationId = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture);
 18206            var link = default(ReceivingAmqpLink);
 18207            var retryDelay = default(TimeSpan?);
 18208            var amqpMessages = default(IEnumerable<AmqpMessage>);
 18209            var receivedEvents = default(List<EventData>);
 18210            var lastReceivedEvent = default(EventData);
 211
 18212            var stopWatch = ValueStopwatch.StartNew();
 213
 214            try
 215            {
 36216                while ((!cancellationToken.IsCancellationRequested) && (!_closed))
 217                {
 218                    try
 219                    {
 220                        // Creation of the link happens without explicit knowledge of the cancellation token
 221                        // used for this operation; validate the token state before attempting link creation and
 222                        // again after the operation completes to provide best efforts in respecting it.
 223
 34224                        EventHubsEventSource.Log.EventReceiveStart(EventHubName, ConsumerGroup, PartitionId, operationId
 34225                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 226
 34227                        link = await ReceiveLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)
 0228                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 229
 0230                        var messagesReceived = await Task.Factory.FromAsync
 0231                        (
 0232                            (callback, state) => link.BeginReceiveMessages(maximumMessageCount, waitTime, callback, stat
 0233                            (asyncResult) => link.EndReceiveMessages(asyncResult, out amqpMessages),
 0234                            TaskCreationOptions.RunContinuationsAsynchronously
 0235                        ).ConfigureAwait(false);
 236
 0237                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 238
 239                        // If event messages were received, then package them for consumption and
 240                        // return them.
 241
 0242                        if ((messagesReceived) && (amqpMessages != null))
 243                        {
 0244                            receivedEvents ??= new List<EventData>();
 245
 0246                            foreach (AmqpMessage message in amqpMessages)
 247                            {
 0248                                link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
 0249                                receivedEvents.Add(MessageConverter.CreateEventFromMessage(message));
 0250                                message.Dispose();
 251                            }
 252
 0253                            receivedEventCount = receivedEvents.Count;
 254
 0255                            if (receivedEventCount > 0)
 256                            {
 0257                                lastReceivedEvent = receivedEvents[receivedEventCount - 1];
 258
 0259                                if (lastReceivedEvent.Offset > long.MinValue)
 260                                {
 0261                                    CurrentEventPosition = EventPosition.FromOffset(lastReceivedEvent.Offset, false);
 262                                }
 263
 0264                                if (TrackLastEnqueuedEventProperties)
 265                                {
 0266                                    LastReceivedEvent = lastReceivedEvent;
 267                                }
 268                            }
 269
 0270                            return receivedEvents;
 271                        }
 272
 273                        // No events were available.
 274
 0275                        return EmptyEventSet;
 276                    }
 20277                    catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ServiceTimeout)
 278                    {
 279                        // Because the timeout specified with the request is intended to be the maximum
 280                        // amount of time to wait for events, a timeout isn't considered an error condition,
 281                        // rather a sign that no events were available in the requested period.
 282
 0283                        return EmptyEventSet;
 284                    }
 285                    catch (Exception ex)
 286                    {
 34287                        Exception activeEx = ex.TranslateServiceException(EventHubName);
 288
 289                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 290                        // Otherwise, bubble the exception.
 291
 34292                        ++failedAttemptCount;
 34293                        retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
 294
 34295                        if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation
 296                        {
 18297                            EventHubsEventSource.Log.EventReceiveError(EventHubName, ConsumerGroup, PartitionId, operati
 18298                            await Task.Delay(UseMinimum(retryDelay.Value, waitTime.CalculateRemaining(stopWatch.GetElaps
 299
 18300                            tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
 301                        }
 16302                        else if (ex is AmqpException)
 303                        {
 0304                            ExceptionDispatchInfo.Capture(activeEx).Throw();
 305                        }
 306                        else
 307                        {
 16308                            throw;
 309                        }
 310                    }
 311                }
 312
 313                // If no value has been returned nor exception thrown by this point,
 314                // then cancellation has been requested.
 315
 2316                throw new TaskCanceledException();
 317            }
 2318            catch (TaskCanceledException)
 319            {
 2320                throw;
 321            }
 16322            catch (Exception ex)
 323            {
 16324                EventHubsEventSource.Log.EventReceiveError(EventHubName, ConsumerGroup, PartitionId, operationId, ex.Mes
 16325                throw;
 326            }
 327            finally
 328            {
 18329                EventHubsEventSource.Log.EventReceiveComplete(EventHubName, ConsumerGroup, PartitionId, operationId, fai
 330            }
 0331        }
 332
 333        /// <summary>
 334        ///   Closes the connection to the transport consumer instance.
 335        /// </summary>
 336        ///
 337        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 338        ///
 339        public override async Task CloseAsync(CancellationToken cancellationToken)
 340        {
 18341            if (_closed)
 342            {
 0343                return;
 344            }
 345
 18346            _closed = true;
 347
 18348            var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture);
 18349            var clientType = GetType().Name;
 350
 351            try
 352            {
 18353                EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId);
 18354                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 355
 16356                if (ReceiveLink?.TryGetOpenedObject(out var _) == true)
 357                {
 0358                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0359                    await ReceiveLink.CloseAsync().ConfigureAwait(false);
 360                }
 361
 16362                ReceiveLink?.Dispose();
 16363            }
 2364            catch (Exception ex)
 365            {
 2366                _closed = false;
 2367                EventHubsEventSource.Log.ClientCloseError(clientType, EventHubName, clientId, ex.Message);
 368
 2369                throw;
 370            }
 371            finally
 372            {
 18373                EventHubsEventSource.Log.ClientCloseComplete(clientType, EventHubName, clientId);
 374            }
 16375        }
 376
 377        /// <summary>
 378        ///   Creates the AMQP link to be used for consumer-related operations.
 379        /// </summary>
 380        ///
 381        /// <param name="consumerGroup">The consumer group of the Event Hub to which the link is bound.</param>
 382        /// <param name="partitionId">The identifier of the Event Hub partition to which the link is bound.</param>
 383        /// <param name="eventStartingPosition">The place within the partition's event stream to begin consuming events.
 384        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 385        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 386        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 387        /// <param name="timeout">The timeout to apply when creating the link.</param>
 388        /// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
 389        ///
 390        /// <returns>The AMQP link to use for consumer-related operations.</returns>
 391        ///
 392        private async Task<ReceivingAmqpLink> CreateConsumerLinkAsync(string consumerGroup,
 393                                                                      string partitionId,
 394                                                                      EventPosition eventStartingPosition,
 395                                                                      uint prefetchCount,
 396                                                                      long? ownerLevel,
 397                                                                      bool trackLastEnqueuedEventProperties,
 398                                                                      TimeSpan timeout,
 399                                                                      CancellationToken cancellationToken)
 400        {
 34401            var link = default(ReceivingAmqpLink);
 402
 403            try
 404            {
 34405                link = await ConnectionScope.OpenConsumerLinkAsync(
 34406                    consumerGroup,
 34407                    partitionId,
 34408                    eventStartingPosition,
 34409                    timeout,
 34410                    prefetchCount,
 34411                    ownerLevel,
 34412                    trackLastEnqueuedEventProperties,
 34413                    cancellationToken).ConfigureAwait(false);
 0414            }
 415            catch (Exception ex)
 416            {
 34417                ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Thro
 0418            }
 419
 0420            return link;
 0421        }
 422
 423        /// <summary>
 424        ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 425        /// </summary>
 426        ///
 427        /// <param name="firstOption">The first option to consider.</param>
 428        /// <param name="secondOption">The second option to consider.</param>
 429        ///
 430        /// <returns>The smaller of the two specified intervals.</returns>
 431        ///
 432        private static TimeSpan UseMinimum(TimeSpan firstOption,
 52433                                           TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second
 434    }
 435}