< Summary

Class: Azure.Messaging.EventHubs.Amqp.AmqpClient
Assembly: Azure.Messaging.EventHubs
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpClient.cs
Covered lines: 143
Uncovered lines: 21
Coverable lines: 164
Total lines: 537
Line coverage: 87.1% (143 of 164)
Covered branches: 43
Total branches: 56
Branch coverage: 76.7% (43 of 56)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_CredentialRefreshBuffer() | - | 100% | 100%
get_IsClosed() | - | 100% | 100%
get_ServiceEndpoint() | - | 100% | 100%
get_EventHubName() | - | 100% | 100%
get_Credential() | - | 100% | 100%
get_MessageConverter() | - | 100% | 100%
get_ConnectionScope() | - | 100% | 100%
get_ManagementLink() | - | 100% | 100%
.ctor(...) | - | 100% | 100%
.ctor(...) | - | 90.63% | 66.67%
GetPropertiesAsync() | - | 79.41% | 87.5%
GetPartitionPropertiesAsync() | - | 80.56% | 87.5%
CreateProducer(...) | - | 100% | 100%
CreateConsumer(...) | - | 100% | 100%
CloseAsync() | - | 84.21% | 50%
AcquireAccessTokenAsync() | - | 88.89% | 83.33%
UseMinimum(...) | - | 100% | 50%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpClient.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Globalization;
 6using System.Runtime.ExceptionServices;
 7using System.Security.Authentication;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Core;
 11using Azure.Core.Diagnostics;
 12using Azure.Messaging.EventHubs.Authorization;
 13using Azure.Messaging.EventHubs.Consumer;
 14using Azure.Messaging.EventHubs.Core;
 15using Azure.Messaging.EventHubs.Diagnostics;
 16using Microsoft.Azure.Amqp;
 17
 18namespace Azure.Messaging.EventHubs.Amqp
 19{
 20    /// <summary>
 21    ///   A transport client abstraction responsible for brokering operations for AMQP-based connections.
 22    ///   It is intended that the public <see cref="EventHubConnection" /> make use of an instance via containment
 23    ///   and delegate operations to it.
 24    /// </summary>
 25    ///
 26    /// <seealso cref="Azure.Messaging.EventHubs.Core.TransportClient" />
 27    ///
 28    internal class AmqpClient : TransportClient
 29    {
 30        /// <summary>
 31        ///   The buffer to apply when considering refreshing; credentials that expire less than this duration will be r
 32        /// </summary>
 33        ///
 3834        private static TimeSpan CredentialRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
 35
 36        /// <summary>Indicates whether or not this instance has been closed.</summary>
 37        private volatile bool _closed = false;
 38
 39        /// <summary>The currently active token to use for authorization with the Event Hubs service.</summary>
 40        private AccessToken _accessToken;
 41
 42        /// <summary>
 43        ///   Indicates whether or not this client has been closed.
 44        /// </summary>
 45        ///
 46        /// <value>
 47        ///   <c>true</c> if the client is closed; otherwise, <c>false</c>.
 48        /// </value>
 49        ///
 1650        public override bool IsClosed => _closed;
 51
 52        /// <summary>
 53        ///   The endpoint for the Event Hubs service to which the client is associated.
 54        /// </summary>
 55        ///
 17456        public override Uri ServiceEndpoint { get; }
 57
 58        /// <summary>
 59        ///   The name of the Event Hub to which the client is bound.
 60        /// </summary>
 61        ///
 46862        private string EventHubName { get; }
 63
 64        /// <summary>
 65        ///   Gets the credential to use for authorization with the Event Hubs service.
 66        /// </summary>
 67        ///
 3668        private EventHubTokenCredential Credential { get; }
 69
 70        /// <summary>
 71        ///   The converter to use for translating between AMQP messages and client library
 72        ///   types.
 73        /// </summary>
 74        ///
 14675        private AmqpMessageConverter MessageConverter { get; }
 76
 77        /// <summary>
 78        ///   The AMQP connection scope responsible for managing transport constructs for this instance.
 79        /// </summary>
 80        ///
 27481        private AmqpConnectionScope ConnectionScope { get; }
 82
 83        /// <summary>
 84        ///   The AMQP link intended for use with management operations.
 85        /// </summary>
 86        ///
 12487        private FaultTolerantAmqpObject<RequestResponseAmqpLink> ManagementLink { get; }
 88
 89        /// <summary>
 90        ///   Initializes a new instance of the <see cref="AmqpClient"/> class.
 91        /// </summary>
 92        ///
 93        /// <param name="host">The fully qualified host name for the Event Hubs namespace.  This is likely to be similar
 94        /// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
 95        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 96        /// <param name="clientOptions">A set of options to apply when configuring the client.</param>
 97        ///
 98        /// <remarks>
 99        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 100        ///   is assumed that callers are trusted and have performed deep validation.
 101        ///
 102        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 103        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 104        ///   caller.
 105        /// </remarks>
 106        ///
 107        public AmqpClient(string host,
 108                          string eventHubName,
 109                          EventHubTokenCredential credential,
 174110                          EventHubConnectionOptions clientOptions) : this(host, eventHubName, credential, clientOptions,
 111        {
 170112        }
 113
 114        /// <summary>
 115        ///   Initializes a new instance of the <see cref="AmqpClient"/> class.
 116        /// </summary>
 117        ///
 118        /// <param name="host">The fully qualified host name for the Event Hubs namespace.  This is likely to be similar
 119        /// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
 120        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 121        /// <param name="clientOptions">A set of options to apply when configuring the client.</param>
 122        /// <param name="connectionScope">The optional scope to use for AMQP connection management.  If <c>null</c>, a n
 123        /// <param name="messageConverter">The optional converter to use for transforming AMQP message-related types.  I
 124        ///
 125        /// <remarks>
 126        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 127        ///   is assumed that callers are trusted and have performed deep validation.
 128        ///
 129        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 130        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 131        ///   caller.
 132        /// </remarks>
 133        ///
 210134        protected AmqpClient(string host,
 210135                             string eventHubName,
 210136                             EventHubTokenCredential credential,
 210137                             EventHubConnectionOptions clientOptions,
 210138                             AmqpConnectionScope connectionScope,
 210139                             AmqpMessageConverter messageConverter)
 140        {
 210141            Argument.AssertNotNullOrEmpty(host, nameof(host));
 210142            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 210143            Argument.AssertNotNull(credential, nameof(credential));
 208144            Argument.AssertNotNull(clientOptions, nameof(clientOptions));
 145
 146            try
 147            {
 206148                EventHubsEventSource.Log.EventHubClientCreateStart(host, eventHubName);
 149
 206150                ServiceEndpoint = new UriBuilder
 206151                {
 206152                    Scheme = clientOptions.TransportType.GetUriScheme(),
 206153                    Host = host
 206154
 206155                }.Uri;
 156
 206157                EventHubName = eventHubName;
 206158                Credential = credential;
 206159                MessageConverter = messageConverter ?? new AmqpMessageConverter();
 206160                ConnectionScope = connectionScope ?? new AmqpConnectionScope(ServiceEndpoint, eventHubName, credential, 
 161
 206162                ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
 274163                    timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None),
 206164                    link =>
 206165                    {
 0166                        link.Session?.SafeClose();
 0167                        link.SafeClose();
 0168                    });
 206169            }
 170            finally
 171            {
 206172                EventHubsEventSource.Log.EventHubClientCreateComplete(host, eventHubName);
 206173            }
 206174        }
 175
 176        /// <summary>
 177        ///   Retrieves information about an Event Hub, including the number of partitions present
 178        ///   and their identifiers.
 179        /// </summary>
 180        ///
 181        /// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param>
 182        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 183        ///
 184        /// <returns>The set of information for the Event Hub that this client is associated with.</returns>
 185        ///
 186        public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetryPolicy retryPolicy,
 187                                                                          CancellationToken cancellationToken)
 188        {
 24189            Argument.AssertNotClosed(_closed, nameof(AmqpClient));
 22190            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 191
 20192            var failedAttemptCount = 0;
 20193            var retryDelay = default(TimeSpan?);
 194
 20195            var stopWatch = ValueStopwatch.StartNew();
 196
 197            try
 198            {
 20199                var tryTimeout = retryPolicy.CalculateTryTimeout(0);
 200
 38201                while (!cancellationToken.IsCancellationRequested)
 202                {
 203                    try
 204                    {
 36205                        EventHubsEventSource.Log.GetPropertiesStart(EventHubName);
 206
 207                        // Create the request message and the management link.
 208
 36209                        var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
 36210                        using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token
 36211                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 212
 34213                        RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.
 0214                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 215
 216                        // Send the request and wait for the response.
 217
 0218                        using AmqpMessage response = await link.RequestAsync(request, tryTimeout.CalculateRemaining(stop
 0219                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 220
 221                        // Process the response.
 222
 0223                        AmqpError.ThrowIfErrorResponse(response, EventHubName);
 0224                        return MessageConverter.CreateEventHubPropertiesFromResponse(response);
 225                    }
 226                    catch (Exception ex)
 227                    {
 36228                        Exception activeEx = ex.TranslateServiceException(EventHubName);
 229
 230                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 231                        // Otherwise, mark the exception as active and break out of the loop.
 232
 36233                        ++failedAttemptCount;
 36234                        retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
 235
 36236                        if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation
 237                        {
 18238                            EventHubsEventSource.Log.GetPropertiesError(EventHubName, activeEx.Message);
 18239                            await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 240
 18241                            tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
 18242                            stopWatch = ValueStopwatch.StartNew();
 243                        }
 18244                        else if (ex is AmqpException)
 245                        {
 0246                            ExceptionDispatchInfo.Capture(activeEx).Throw();
 247                        }
 248                        else
 249                        {
 18250                            throw;
 251                        }
 252                    }
 253                }
 254
 255                // If no value has been returned nor exception thrown by this point,
 256                // then cancellation has been requested.
 257
 2258                throw new TaskCanceledException();
 259            }
 20260            catch (Exception ex)
 261            {
 20262                EventHubsEventSource.Log.GetPropertiesError(EventHubName, ex.Message);
 20263                throw;
 264            }
 265            finally
 266            {
 20267                EventHubsEventSource.Log.GetPropertiesComplete(EventHubName);
 268            }
 0269        }
 270
 271        /// <summary>
 272        ///   Retrieves information about a specific partition for an Event Hub, including elements that describe the av
 273        ///   events in the partition event stream.
 274        /// </summary>
 275        ///
 276        /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 277        /// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param>
 278        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 279        ///
 280        /// <returns>The set of information for the requested partition under the Event Hub this client is associated wi
 281        ///
 282        public override async Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId,
 283                                                                                    EventHubsRetryPolicy retryPolicy,
 284                                                                                    CancellationToken cancellationToken)
 285        {
 28286            Argument.AssertNotClosed(_closed, nameof(AmqpClient));
 26287            Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
 22288            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 289
 20290            var failedAttemptCount = 0;
 20291            var retryDelay = default(TimeSpan?);
 20292            var token = default(string);
 293            var link = default(RequestResponseAmqpLink);
 294
 20295            var stopWatch = ValueStopwatch.StartNew();
 296
 297            try
 298            {
 20299                var tryTimeout = retryPolicy.CalculateTryTimeout(0);
 300
 38301                while (!cancellationToken.IsCancellationRequested)
 302                {
 303                    try
 304                    {
 36305                        EventHubsEventSource.Log.GetPartitionPropertiesStart(EventHubName, partitionId);
 306
 307                        // Create the request message and the management link.
 308
 36309                        token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
 36310                        using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, part
 36311                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 312
 34313                        link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeo
 0314                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 315
 316                        // Send the request and wait for the response.
 317
 0318                        using AmqpMessage response = await link.RequestAsync(request, tryTimeout.CalculateRemaining(stop
 0319                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 320
 321                        // Process the response.
 322
 0323                        AmqpError.ThrowIfErrorResponse(response, EventHubName);
 0324                        return MessageConverter.CreatePartitionPropertiesFromResponse(response);
 325                    }
 326                    catch (Exception ex)
 327                    {
 36328                        Exception activeEx = ex.TranslateServiceException(EventHubName);
 329
 330                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 331                        // Otherwise, mark the exception as active and break out of the loop.
 332
 36333                        ++failedAttemptCount;
 36334                        retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
 335
 36336                        if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation
 337                        {
 18338                            EventHubsEventSource.Log.GetPartitionPropertiesError(EventHubName, partitionId, activeEx.Mes
 18339                            await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 340
 18341                            tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
 18342                            stopWatch = ValueStopwatch.StartNew();
 343                        }
 18344                        else if (ex is AmqpException)
 345                        {
 0346                            ExceptionDispatchInfo.Capture(activeEx).Throw();
 347                        }
 348                        else
 349                        {
 18350                            throw;
 351                        }
 352                    }
 353                }
 354
 355                // If no value has been returned nor exception thrown by this point,
 356                // then cancellation has been requested.
 357
 2358                throw new TaskCanceledException();
 359            }
 20360            catch (Exception ex)
 361            {
 20362                EventHubsEventSource.Log.GetPartitionPropertiesError(EventHubName, partitionId, ex.Message);
 20363                throw;
 364            }
 365            finally
 366            {
 20367                EventHubsEventSource.Log.GetPartitionPropertiesComplete(EventHubName, partitionId);
 368            }
 0369        }
 370
 371        /// <summary>
 372        ///   Creates a producer strongly aligned with the active protocol and transport,
 373        ///   responsible for publishing <see cref="EventData" /> to the Event Hub.
 374        /// </summary>
 375        ///
 376        /// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; i
 377        /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
 378        ///
 379        /// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
 380        ///
 381        public override TransportProducer CreateProducer(string partitionId,
 382                                                         EventHubsRetryPolicy retryPolicy)
 383        {
 14384            Argument.AssertNotClosed(_closed, nameof(AmqpClient));
 385
 12386            return new AmqpProducer
 12387            (
 12388                EventHubName,
 12389                partitionId,
 12390                ConnectionScope,
 12391                MessageConverter,
 12392                retryPolicy
 12393            );
 394        }
 395
 396        /// <summary>
 397        ///   Creates a consumer strongly aligned with the active protocol and transport, responsible
 398        ///   for reading <see cref="EventData" /> from a specific Event Hub partition, in the context
 399        ///   of a specific consumer group.
 400        ///
 401        ///   A consumer may be exclusive, which asserts ownership over the partition for the consumer
 402        ///   group to ensure that only one consumer from that group is reading from the partition.
 403        ///   These exclusive consumers are sometimes referred to as "Epoch Consumers."
 404        ///
 405        ///   A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
 406        ///   group to be actively reading events from the partition.  These non-exclusive consumers are
 407        ///   sometimes referred to as "Non-epoch Consumers."
 408        ///
 409        ///   Designating a consumer as exclusive may be specified by setting the <paramref name="ownerLevel" />.
 410        ///   When <c>null</c>, consumers are created as non-exclusive.
 411        /// </summary>
 412        ///
 413        /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea
 414        /// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</par
 415        /// <param name="eventPosition">The position within the partition where the consumer should begin reading events
 416        /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
 417        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 418        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 419        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 420        ///
 421        /// <returns>A <see cref="TransportConsumer" /> configured in the requested manner.</returns>
 422        ///
 423        public override TransportConsumer CreateConsumer(string consumerGroup,
 424                                                         string partitionId,
 425                                                         EventPosition eventPosition,
 426                                                         EventHubsRetryPolicy retryPolicy,
 427                                                         bool trackLastEnqueuedEventProperties,
 428                                                         long? ownerLevel,
 429                                                         uint? prefetchCount)
 430        {
 64431            Argument.AssertNotClosed(_closed, nameof(AmqpClient));
 432
 62433            return new AmqpConsumer
 62434            (
 62435                EventHubName,
 62436                consumerGroup,
 62437                partitionId,
 62438                eventPosition,
 62439                trackLastEnqueuedEventProperties,
 62440                ownerLevel,
 62441                prefetchCount,
 62442                ConnectionScope,
 62443                MessageConverter,
 62444                retryPolicy
 62445            );
 446        }
 447
 448        /// <summary>
 449        ///   Closes the connection to the transport client instance.
 450        /// </summary>
 451        ///
 452        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 453        ///
 454        public override async Task CloseAsync(CancellationToken cancellationToken)
 455        {
 30456            if (_closed)
 457            {
 0458                return;
 459            }
 460
 30461            _closed = true;
 462
 30463            var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture);
 30464            var clientType = GetType().Name;
 465
 466            try
 467            {
 30468                EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId);
 30469                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 470
 28471                if (ManagementLink?.TryGetOpenedObject(out var _) == true)
 472                {
 0473                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0474                    await ManagementLink.CloseAsync().ConfigureAwait(false);
 475                }
 476
 28477                ManagementLink?.Dispose();
 28478                ConnectionScope?.Dispose();
 28479            }
 2480            catch (Exception ex)
 481            {
 2482                _closed = false;
 2483                EventHubsEventSource.Log.ClientCloseError(clientType, EventHubName, clientId, ex.Message);
 484
 2485                throw;
 486            }
 487            finally
 488            {
 30489                EventHubsEventSource.Log.ClientCloseComplete(clientType, EventHubName, clientId);
 490            }
 28491        }
 492
 493        /// <summary>
 494        ///   Acquires an access token for authorization with the Event Hubs service.
 495        /// </summary>
 496        ///
 497        /// <returns>The token to use for service authorization.</returns>
 498        ///
 499        private async Task<string> AcquireAccessTokenAsync(CancellationToken cancellationToken)
 500        {
 72501            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 502
 72503            AccessToken activeToken = _accessToken;
 504
 505            // If there was no current token, or it is within the buffer for expiration, request a new token.
 506            // There is a benign race condition here, where there may be multiple requests in-flight for a new token.  S
 507            // overlapping requests should be within a small window, allow the acquired token to replace the current one
 508            // attempting to coordinate or ensure that the most recent is kept.
 509
 72510            if ((string.IsNullOrEmpty(activeToken.Token)) || (activeToken.ExpiresOn <= DateTimeOffset.UtcNow.Add(Credent
 511            {
 36512                activeToken = await Credential.GetTokenUsingDefaultScopeAsync(cancellationToken).ConfigureAwait(false);
 513
 36514                if ((string.IsNullOrEmpty(activeToken.Token)))
 515                {
 0516                    throw new AuthenticationException(Resources.CouldNotAcquireAccessToken);
 517                }
 518
 36519                _accessToken = activeToken;
 520            }
 521
 72522            return activeToken.Token;
 72523        }
 524
 525        /// <summary>
 526        ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 527        /// </summary>
 528        ///
 529        /// <param name="firstOption">The first option to consider.</param>
 530        /// <param name="secondOption">The second option to consider.</param>
 531        ///
 532        /// <returns>The smaller of the two specified <see cref="TimeSpan" /> values.</returns>
 533        ///
 534        private static TimeSpan UseMinimum(TimeSpan firstOption,
 68535                                           TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second
 536    }
 537}