< Summary

Class:Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpConnectionScope.cs
Covered lines:252
Uncovered lines:101
Coverable lines:353
Total lines:1012
Line coverage:71.3% (252 of 353)
Covered branches:23
Total branches:52
Branch coverage:44.2% (23 of 52)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_AmqpVersion()-100%100%
get_ConnectionIdleTimeout()-0%100%
get_AuthorizationRefreshBuffer()-100%100%
get_MinimumAuthorizationRefresh()-100%100%
get_AuthorizationRefreshTimeout()-100%100%
get_SessionTimeout()-100%100%
get_IsDisposed()-100%100%
set_IsDisposed(...)-100%100%
get_OperationCancellationSource()-100%100%
get_ActiveLinks()-100%100%
get_Id()-100%100%
get_ServiceEndpoint()-100%100%
get_EventHubName()-100%100%
get_TokenProvider()-100%100%
get_Transport()-100%100%
get_Proxy()-100%100%
get_ActiveConnection()-100%100%
.ctor(...)-100%100%
.ctor()-100%100%
OpenManagementLinkAsync()-100%100%
OpenConsumerLinkAsync()-100%100%
OpenProducerLinkAsync()-100%100%
Dispose()-85.71%50%
CreateAndOpenConnectionAsync()-0%0%
CreateManagementLinkAsync()-82.35%0%
CreateReceivingLinkAsync()-94.34%62.5%
CreateSendingLinkAsync()-93.02%0%
BeginTrackingLinkAsActive(...)-92.86%83.33%
CloseConnection(...)-100%100%
CalculateLinkAuthorizationRefreshInterval(...)-100%100%
CreateAuthorizationRefreshHandler(...)-79.59%100%
OpenAmqpObjectAsync()-0%0%
RequestAuthorizationUsingCbsAsync()-42.86%50%
CreateAmpqSettings(...)-0%100%
CreateTransportSettingsforTcp(...)-0%0%
CreateTransportSettingsForWebSockets(...)-0%0%
CreateAmqpConnectionSettings(...)-0%0%
ValidateTransport(...)-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpConnectionScope.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.Globalization;
 8using System.Net;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Core;
 12using Azure.Core.Diagnostics;
 13using Azure.Messaging.EventHubs.Authorization;
 14using Azure.Messaging.EventHubs.Consumer;
 15using Azure.Messaging.EventHubs.Core;
 16using Azure.Messaging.EventHubs.Diagnostics;
 17using Microsoft.Azure.Amqp;
 18using Microsoft.Azure.Amqp.Encoding;
 19using Microsoft.Azure.Amqp.Framing;
 20using Microsoft.Azure.Amqp.Sasl;
 21using Microsoft.Azure.Amqp.Transport;
 22
 23namespace Azure.Messaging.EventHubs.Amqp
 24{
 25    /// <summary>
 26    ///   Defines a context for AMQP operations which can be shared amongst the different
 27    ///   client types within a given scope.
 28    /// </summary>
 29    ///
 30    internal class AmqpConnectionScope : IDisposable
 31    {
 32        /// <summary>The name to assign to the SASL handler to specify that CBS tokens are in use.</summary>
 33        private const string CbsSaslHandlerName = "MSSBCBS";
 34
 35        /// <summary>The suffix to attach to the resource path when using web sockets for service communication.</summar
 36        private const string WebSocketsPathSuffix = "/$servicebus/websocket/";
 37
 38        /// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
 39        private const string WebSocketsUriScheme = "wss";
 40
 41        /// <summary>The string formatting mask to apply to the service endpoint to consume events for a given consumer 
 42        private const string ConsumerPathSuffixMask = "{0}/ConsumerGroups/{1}/Partitions/{2}";
 43
 44        /// <summary>The string formatting mask to apply to the service endpoint to publish events for a given partition
 45        private const string PartitionProducerPathSuffixMask = "{0}/Partitions/{1}";
 46
 47        /// <summary>Indicates whether or not this instance has been disposed.</summary>
 48        private volatile bool _disposed = false;
 49
 50        /// <summary>
 51        ///   The version of AMQP to use within the scope.
 52        /// </summary>
 53        ///
 4254        private static Version AmqpVersion { get; } = new Version(1, 0, 0, 0);
 55
 56        /// <summary>
 57        ///   The amount of time to allow an AMQP connection to be idle before considering
 58        ///   it to be timed out.
 59        /// </summary>
 60        ///
 061        private static TimeSpan ConnectionIdleTimeout { get; } = TimeSpan.FromMinutes(1);
 62
 63        /// <summary>
 64        ///   The amount of buffer to apply to account for clock skew when
 65        ///   refreshing authorization.  Authorization will be refreshed earlier
 66        ///   than the expected expiration by this amount.
 67        /// </summary>
 68        ///
 2869        private static TimeSpan AuthorizationRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
 70
 71        /// <summary>
 72        ///   The minimum amount of time for authorization to be refreshed; any calculations that
 73        ///   call for refreshing more frequently will be substituted with this value.
 74        /// </summary>
 75        ///
 3076        private static TimeSpan MinimumAuthorizationRefresh { get; } = TimeSpan.FromMinutes(4);
 77
 78        /// <summary>
 79        ///   The amount time to allow to refresh authorization of an AMQP link.
 80        /// </summary>
 81        ///
 2882        private static TimeSpan AuthorizationRefreshTimeout { get; } = TimeSpan.FromMinutes(3);
 83
 84        /// <summary>
 85        ///   The recommended timeout to associate with an AMQP session.  It is recommended that this
 86        ///   interval be used when creating or opening AMQP links and related constructs.
 87        /// </summary>
 88        ///
 66289        public TimeSpan SessionTimeout { get; } = TimeSpan.FromSeconds(30);
 90
 91        /// <summary>
 92        ///   Indicates whether this <see cref="AmqpConnectionScope"/> has been disposed.
 93        /// </summary>
 94        ///
 95        /// <value><c>true</c> if disposed; otherwise, <c>false</c>.</value>
 96        ///
 97        public bool IsDisposed
 98        {
 19899            get => _disposed;
 58100            private set => _disposed = value;
 101        }
 102
 103        /// <summary>
 104        ///   The cancellation token to use with operations initiated by the scope.
 105        /// </summary>
 106        ///
 784107        private CancellationTokenSource OperationCancellationSource { get; } = new CancellationTokenSource();
 108
 109        /// <summary>
 110        ///   The set of active AMQP links associated with the connection scope.  These are considered children
 111        ///   of the active connection and should be managed as such.
 112        /// </summary>
 113        ///
 496114        private ConcurrentDictionary<AmqpObject, Timer> ActiveLinks { get; } = new ConcurrentDictionary<AmqpObject, Time
 115
 116        /// <summary>
 117        ///   The unique identifier of the scope.
 118        /// </summary>
 119        ///
 66120        private string Id { get; }
 121
 122        /// <summary>
 123        ///   The endpoint for the Event Hubs service to which the scope is associated.
 124        /// </summary>
 125        ///
 74126        private Uri ServiceEndpoint { get; }
 127
 128        /// <summary>
 129        ///   The name of the Event Hub to which the scope is associated.
 130        /// </summary>
 131        ///
 44132        private string EventHubName { get; }
 133
 134        /// <summary>
 135        ///   The provider to use for obtaining a token for authorization with the Event Hubs service.
 136        /// </summary>
 137        ///
 52138        private CbsTokenProvider TokenProvider { get; }
 139
 140        /// <summary>
 141        ///   The type of transport to use for communication.
 142        /// </summary>
 143        ///
 40144        private EventHubsTransportType Transport { get; }
 145
 146        /// <summary>
 147        ///   The proxy, if any, which should be used for communication.
 148        /// </summary>
 149        ///
 40150        private IWebProxy Proxy { get; }
 151
 152        /// <summary>
 153        ///   The AMQP connection that is active for the current scope.
 154        /// </summary>
 155        ///
 104156        private FaultTolerantAmqpObject<AmqpConnection> ActiveConnection { get; }
 157
 158        /// <summary>
 159        ///   Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
 160        /// </summary>
 161        ///
 162        /// <param name="serviceEndpoint">Endpoint for the Event Hubs service to which the scope is associated.</param>
 163        /// <param name="eventHubName"> The name of the Event Hub to which the scope is associated.</param>
 164        /// <param name="credential">The credential to use for authorization with the Event Hubs service.</param>
 165        /// <param name="transport">The transport to use for communication.</param>
 166        /// <param name="proxy">The proxy, if any, to use for communication.</param>
 167        /// <param name="identifier">The identifier to assign this scope; if not provided, one will be generated.</param
 168        ///
 244169        public AmqpConnectionScope(Uri serviceEndpoint,
 244170                                   string eventHubName,
 244171                                   EventHubTokenCredential credential,
 244172                                   EventHubsTransportType transport,
 244173                                   IWebProxy proxy,
 244174                                   string identifier = default)
 175        {
 244176            Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
 242177            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 240178            Argument.AssertNotNull(credential, nameof(credential));
 238179            ValidateTransport(transport);
 180
 236181            ServiceEndpoint = serviceEndpoint;
 236182            EventHubName = eventHubName;
 236183            Transport = transport;
 236184            Proxy = proxy;
 236185            TokenProvider = new CbsTokenProvider(new EventHubTokenCredential(credential, serviceEndpoint.ToString()), Op
 236186            Id = identifier ?? $"{ eventHubName }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substring
 187
 188#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for t
 40189            Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, Servic
 190#pragma warning restore CA2214 // Do not call overridable methods in constructors
 191
 236192            ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(connectionFactory, CloseConnection);
 236193        }
 194
 195        /// <summary>
 196        ///   Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
 197        /// </summary>
 198        ///
 186199        protected AmqpConnectionScope()
 200        {
 186201        }
 202
 203        /// <summary>
 204        ///   Opens an AMQP link for use with management operations.
 205        /// </summary>
 206        ///
 207        /// <param name="timeout">The timeout to apply when creating the link.</param>
 208        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 209        ///
 210        /// <returns>A link for use with management operations.</returns>
 211        ///
 212        /// <remarks>
 213        ///   The authorization for this link does not require periodic
 214        ///   refreshing.
 215        /// </remarks>
 216        ///
 217        public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(TimeSpan timeout,
 218                                                                                   CancellationToken cancellationToken)
 219        {
 12220            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 221
 10222            var stopWatch = ValueStopwatch.StartNew();
 10223            var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 8224            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 225
 8226            var link = await CreateManagementLinkAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime()
 8227            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 228
 8229            await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 8230            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 231
 8232            return link;
 8233        }
 234
 235        /// <summary>
 236        ///   Opens an AMQP link for use with consumer operations.
 237        /// </summary>
 238        ///
 239        /// <param name="consumerGroup">The name of the consumer group in the context of which events should be received
 240        /// <param name="partitionId">The identifier of the Event Hub partition from which events should be received.</p
 241        /// <param name="eventPosition">The position of the event in the partition where the link should be filtered to.
 242        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 243        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 244        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 245        /// <param name="timeout">The timeout to apply when creating the link.</param>
 246        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 247        ///
 248        /// <returns>A link for use with consumer operations.</returns>
 249        ///
 250        public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consumerGroup,
 251                                                                           string partitionId,
 252                                                                           EventPosition eventPosition,
 253                                                                           TimeSpan timeout,
 254                                                                           uint prefetchCount,
 255                                                                           long? ownerLevel,
 256                                                                           bool trackLastEnqueuedEventProperties,
 257                                                                           CancellationToken cancellationToken)
 258        {
 30259            Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
 26260            Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
 261
 22262            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 263
 20264            var stopWatch = ValueStopwatch.StartNew();
 20265            var consumerEndpoint = new Uri(ServiceEndpoint, string.Format(CultureInfo.InvariantCulture, ConsumerPathSuff
 266
 20267            var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 18268            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 269
 18270            var link = await CreateReceivingLinkAsync(
 18271                connection,
 18272                consumerEndpoint,
 18273                eventPosition,
 18274                timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
 18275                prefetchCount,
 18276                ownerLevel,
 18277                trackLastEnqueuedEventProperties,
 18278                cancellationToken
 18279            ).ConfigureAwait(false);
 280
 18281            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 282
 18283            await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 18284            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 285
 18286            return link;
 18287        }
 288
 289        /// <summary>
 290        ///   Opens an AMQP link for use with producer operations.
 291        /// </summary>
 292        ///
 293        /// <param name="partitionId">The identifier of the Event Hub partition to which the link should be bound; if un
 294        /// <param name="timeout">The timeout to apply when creating the link.</param>
 295        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 296        ///
 297        /// <returns>A link for use with producer operations.</returns>
 298        ///
 299        public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitionId,
 300                                                                         TimeSpan timeout,
 301                                                                         CancellationToken cancellationToken)
 302        {
 16303            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 304
 14305            var stopWatch = ValueStopwatch.StartNew();
 14306            var path = (string.IsNullOrEmpty(partitionId)) ? EventHubName : string.Format(CultureInfo.InvariantCulture, 
 14307            var producerEndpoint = new Uri(ServiceEndpoint, path);
 308
 14309            var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 12310            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 311
 12312            var link = await CreateSendingLinkAsync(connection, producerEndpoint, timeout.CalculateRemaining(stopWatch.G
 12313            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 314
 12315            await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 12316            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 317
 12318            return link;
 12319        }
 320
 321        /// <summary>
 322        ///   Performs the task needed to clean up resources used by the <see cref="AmqpConnectionScope" />,
 323        ///   including ensuring that the client itself has been closed.
 324        /// </summary>
 325        ///
 326        public void Dispose()
 327        {
 58328            if (IsDisposed)
 329            {
 0330                return;
 331            }
 332
 58333            ActiveConnection?.Dispose();
 58334            OperationCancellationSource.Cancel();
 58335            OperationCancellationSource.Dispose();
 336
 58337            IsDisposed = true;
 58338        }
 339
 340        /// <summary>
 341        ///   Creates an AMQP connection for a given scope.
 342        /// </summary>
 343        ///
 344        /// <param name="amqpVersion">The version of AMQP to use for the connection.</param>
 345        /// <param name="serviceEndpoint">The endpoint for the Event Hubs service to which the scope is associated.</par
 346        /// <param name="transportType">The type of transport to use for communication.</param>
 347        /// <param name="proxy">The proxy, if any, to use for communication.</param>
 348        /// <param name="scopeIdentifier">The unique identifier for the associated scope.</param>
 349        /// <param name="timeout">The timeout to consider when creating the connection.</param>
 350        ///
 351        /// <returns>An AMQP connection that may be used for communicating with the Event Hubs service.</returns>
 352        ///
 353        protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(Version amqpVersion,
 354                                                                                  Uri serviceEndpoint,
 355                                                                                  EventHubsTransportType transportType,
 356                                                                                  IWebProxy proxy,
 357                                                                                  string scopeIdentifier,
 358                                                                                  TimeSpan timeout)
 359        {
 0360            var hostName = serviceEndpoint.Host;
 0361            AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
 0362            AmqpConnectionSettings connectionSetings = CreateAmqpConnectionSettings(hostName, scopeIdentifier);
 363
 0364            TransportSettings transportSettings = transportType.IsWebSocketTransport()
 0365                ? CreateTransportSettingsForWebSockets(hostName, proxy)
 0366                : CreateTransportSettingsforTcp(hostName, serviceEndpoint.Port);
 367
 368            // Create and open the connection, respecting the timeout constraint
 369            // that was received.
 370
 0371            var stopWatch = ValueStopwatch.StartNew();
 372
 0373            var initiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
 0374            TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
 375
 0376            var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
 0377            await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait
 378
 379#pragma warning disable CA1806 // Do not ignore method results
 380            // Create the CBS link that will be used for authorization.  The act of creating the link will associate
 381            // it with the connection.
 0382            new AmqpCbsLink(connection);
 383#pragma warning restore CA1806 // Do not ignore method results
 384
 385            // When the connection is closed, close each of the links associated with it.
 386
 0387            EventHandler closeHandler = null;
 388
 0389            closeHandler = (snd, args) =>
 0390            {
 0391                foreach (var link in ActiveLinks.Keys)
 0392                {
 0393                    link.SafeClose();
 0394                }
 0395
 0396                connection.Closed -= closeHandler;
 0397            };
 398
 0399            connection.Closed += closeHandler;
 0400            return connection;
 0401        }
 402
 403        /// <summary>
 404        ///   Creates an AMQP link for use with management operations.
 405        /// </summary>
 406        ///
 407        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 408        /// <param name="timeout">The timeout to apply when creating the link.</param>
 409        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 410        ///
 411        /// <returns>A link for use with management operations.</returns>
 412        ///
 413        protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(AmqpConnection connection,
 414                                                                                        TimeSpan timeout,
 415                                                                                        CancellationToken cancellationTo
 416        {
 4417            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 4418            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 419
 4420            var session = default(AmqpSession);
 4421            var stopWatch = ValueStopwatch.StartNew();
 422
 423            try
 424            {
 425                // Create and open the AMQP session associated with the link.
 426
 4427                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 4428                session = connection.CreateSession(sessionSettings);
 429
 4430                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 4431                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 432
 433                // Create and open the link.
 434
 4435                var linkSettings = new AmqpLinkSettings();
 4436                linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime
 437
 4438                var link = new RequestResponseAmqpLink(AmqpManagement.LinkType, session, AmqpManagement.Address, linkSet
 439
 440                // Track the link before returning it, so that it can be managed with the scope.
 441
 4442                BeginTrackingLinkAsActive(link);
 4443                return link;
 444            }
 0445            catch
 446            {
 447                // Aborting the session will perform any necessary cleanup of
 448                // the associated link as well.
 449
 0450                session?.Abort();
 0451                throw;
 452            }
 4453        }
 454
 455        /// <summary>
 456        ///   Creates an AMQP link for use with receiving operations.
 457        /// </summary>
 458        ///
 459        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 460        /// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
 461        /// <param name="eventPosition">The position of the event in the partition where the link should be filtered to.
 462        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 463        /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va
 464        /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t
 465        /// <param name="timeout">The timeout to apply when creating the link.</param>
 466        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 467        ///
 468        /// <returns>A link for use for operations related to receiving events.</returns>
 469        ///
 470        protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpConnection connection,
 471                                                                                 Uri endpoint,
 472                                                                                 EventPosition eventPosition,
 473                                                                                 TimeSpan timeout,
 474                                                                                 uint prefetchCount,
 475                                                                                 long? ownerLevel,
 476                                                                                 bool trackLastEnqueuedEventProperties,
 477                                                                                 CancellationToken cancellationToken)
 478        {
 16479            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 16480            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 481
 16482            var session = default(AmqpSession);
 16483            var stopWatch = ValueStopwatch.StartNew();
 484
 485            try
 486            {
 487                // Perform the initial authorization for the link.
 488
 16489                var authClaims = new[] { EventHubsClaim.Listen };
 16490                var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, TokenProvider, endpoint, end
 16491                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 492
 493                // Create and open the AMQP session associated with the link.
 494
 16495                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 16496                session = connection.CreateSession(sessionSettings);
 497
 16498                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 16499                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 500
 501                // Create and open the link.
 502
 16503                var filters = new FilterSet();
 16504                filters.Add(AmqpFilter.ConsumerFilterName, AmqpFilter.CreateConsumerFilter(AmqpFilter.BuildFilterExpress
 505
 16506                var linkSettings = new AmqpLinkSettings
 16507                {
 16508                    Role = true,
 16509                    TotalLinkCredit = prefetchCount,
 16510                    AutoSendFlow = prefetchCount > 0,
 16511                    SettleType = SettleMode.SettleOnSend,
 16512                    Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
 16513                    Target = new Target { Address = Guid.NewGuid().ToString() }
 16514                };
 515
 16516                linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.ConsumerGroup);
 517
 16518                if (ownerLevel.HasValue)
 519                {
 8520                    linkSettings.AddProperty(AmqpProperty.OwnerLevel, ownerLevel.Value);
 521                }
 522
 16523                if (trackLastEnqueuedEventProperties)
 524                {
 4525                    linkSettings.DesiredCapabilities = new Multiple<AmqpSymbol>(new List<AmqpSymbol>
 4526                    {
 4527                        AmqpProperty.TrackLastEnqueuedEventProperties
 4528                    });
 529                }
 530
 16531                var link = new ReceivingAmqpLink(linkSettings);
 16532                linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
 16533                link.AttachTo(session);
 534
 535                // Configure refresh for authorization of the link.
 536
 16537                var refreshTimer = default(Timer);
 538
 16539                var refreshHandler = CreateAuthorizationRefreshHandler
 16540                (
 16541                    connection,
 16542                    link,
 16543                    TokenProvider,
 16544                    endpoint,
 16545                    endpoint.AbsoluteUri,
 16546                    endpoint.AbsoluteUri,
 16547                    authClaims,
 16548                    AuthorizationRefreshTimeout,
 18549                    () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null)
 16550                );
 551
 16552                refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationU
 553
 554                // Track the link before returning it, so that it can be managed with the scope.
 555
 16556                BeginTrackingLinkAsActive(link, refreshTimer);
 16557                return link;
 558            }
 0559            catch
 560            {
 561                // Aborting the session will perform any necessary cleanup of
 562                // the associated link as well.
 563
 0564                session?.Abort();
 0565                throw;
 566            }
 16567        }
 568
 569        /// <summary>
 570        ///   Creates an AMQP link for use with publishing operations.
 571        /// </summary>
 572        ///
 573        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 574        /// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
 575        /// <param name="timeout">The timeout to apply when creating the link.</param>
 576        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 577        ///
 578        /// <returns>A link for use for operations related to receiving events.</returns>
 579        ///
 580        protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnection connection,
 581                                                                             Uri endpoint,
 582                                                                             TimeSpan timeout,
 583                                                                             CancellationToken cancellationToken)
 584        {
 10585            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 10586            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 587
 10588            var session = default(AmqpSession);
 10589            var stopWatch = ValueStopwatch.StartNew();
 590
 591            try
 592            {
 593                // Perform the initial authorization for the link.
 594
 10595                var authClaims = new[] { EventHubsClaim.Send };
 10596                var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, TokenProvider, endpoint, end
 10597                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 598
 599                // Create and open the AMQP session associated with the link.
 600
 10601                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 10602                session = connection.CreateSession(sessionSettings);
 603
 10604                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 10605                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 606
 607                // Create and open the link.
 608
 10609                var linkSettings = new AmqpLinkSettings
 10610                {
 10611                    Role = false,
 10612                    InitialDeliveryCount = 0,
 10613                    Source = new Source { Address = Guid.NewGuid().ToString() },
 10614                    Target = new Target { Address = endpoint.AbsolutePath }
 10615                };
 616
 10617                linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime
 10618                linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.EventHub);
 619
 10620                var link = new SendingAmqpLink(linkSettings);
 10621                linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
 10622                link.AttachTo(session);
 623
 624                // Configure refresh for authorization of the link.
 625
 10626                var refreshTimer = default(Timer);
 627
 10628                var refreshHandler = CreateAuthorizationRefreshHandler
 10629                (
 10630                    connection,
 10631                    link,
 10632                    TokenProvider,
 10633                    endpoint,
 10634                    endpoint.AbsoluteUri,
 10635                    endpoint.AbsoluteUri,
 10636                    authClaims,
 10637                    AuthorizationRefreshTimeout,
 12638                    () => refreshTimer
 10639                );
 640
 10641                refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationU
 642
 643                // Track the link before returning it, so that it can be managed with the scope.
 644
 10645                BeginTrackingLinkAsActive(link, refreshTimer);
 10646                return link;
 647            }
 0648            catch
 649            {
 650                // Aborting the session will perform any necessary cleanup of
 651                // the associated link as well.
 652
 0653                session?.Abort();
 0654                throw;
 655            }
 10656        }
 657
 658        /// <summary>
 659        ///   Performs the actions needed to configure and begin tracking the specified AMQP
 660        ///   link as an active link bound to this scope.
 661        /// </summary>
 662        ///
 663        /// <param name="link">The link to begin tracking.</param>
 664        /// <param name="authorizationRefreshTimer">The timer used to manage refreshing authorization, if the link requi
 665        ///
 666        /// <remarks>
 667        ///   This method does operate on the specified <paramref name="link"/> in order to configure it
 668        ///   for active tracking; no assumptions are made about the open/connected state of the link nor are
 669        ///   its communication properties modified.
 670        /// </remarks>
 671        ///
 672        protected virtual void BeginTrackingLinkAsActive(AmqpObject link,
 673                                                         Timer authorizationRefreshTimer = null)
 674        {
 675            // Register the link as active and having authorization automatically refreshed, so that it can be
 676            // managed with the scope.
 677
 30678            if (!ActiveLinks.TryAdd(link, authorizationRefreshTimer))
 679            {
 0680                throw new EventHubsException(true, EventHubName, Resources.CouldNotCreateLink);
 681            }
 682
 683            // When the link is closed, stop refreshing authorization and remove it from the
 684            // set of associated links.
 685
 30686            var closeHandler = default(EventHandler);
 687
 30688            closeHandler = (snd, args) =>
 30689            {
 44690                ActiveLinks.TryRemove(link, out var timer);
 30691
 44692                timer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 44693                timer?.Dispose();
 30694
 44695                link.Closed -= closeHandler;
 44696            };
 697
 30698            link.Closed += closeHandler;
 30699        }
 700
 701        /// <summary>
 702        ///   Performs the tasks needed to close a connection.
 703        /// </summary>
 704        ///
 705        /// <param name="connection">The connection to close.</param>
 706        ///
 22707        protected virtual void CloseConnection(AmqpConnection connection) => connection.SafeClose();
 708
 709        /// <summary>
 710        ///   Calculates the interval after which authorization for an AMQP link should be
 711        ///   refreshed.
 712        /// </summary>
 713        ///
 714        /// <param name="expirationTimeUtc">The date/time, in UTC, that the current authorization is expected to expire.
 715        ///
 716        /// <returns>The interval after which authorization should be refreshed.</returns>
 717        ///
 718        protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval(DateTime expirationTimeUtc)
 719        {
 26720            var refreshDueInterval = (expirationTimeUtc.Subtract(DateTime.UtcNow)).Add(AuthorizationRefreshBuffer);
 26721            return (refreshDueInterval < MinimumAuthorizationRefresh) ? MinimumAuthorizationRefresh : refreshDueInterval
 722        }
 723
 724        /// <summary>
 725        ///   Creates the timer event handler to support refreshing AMQP link authorization
 726        ///   on a recurring basis.
 727        /// </summary>
 728        ///
 729        /// <param name="connection">The AMQP connection to which the link being refreshed is bound to.</param>
 730        /// <param name="amqpLink">The AMQO link to refresh authorization for.</param>
 731        /// <param name="tokenProvider">The <see cref="CbsTokenProvider" /> to use for obtaining access tokens.</param>
 732        /// <param name="endpoint">The Event Hubs service endpoint that the AMQP link is communicating with.</param>
 733        /// <param name="audience">The audience associated with the authorization.  This is likely the <paramref name="e
 734        /// <param name="resource">The resource associated with the authorization.  This is likely the <paramref name="e
 735        /// <param name="requiredClaims">The set of claims required to support the operations of the AMQP link.</param>
 736        /// <param name="refreshTimeout">The timeout to apply when requesting authorization refresh.</param>
 737        /// <param name="refreshTimerFactory">A function to allow retrieving the <see cref="Timer" /> associated with th
 738        ///
 739        /// <returns>A <see cref="TimerCallback"/> delegate to perform the refresh when a timer is due.</returns>
 740        ///
 741        protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection,
 742                                                                          AmqpObject amqpLink,
 743                                                                          CbsTokenProvider tokenProvider,
 744                                                                          Uri endpoint,
 745                                                                          string audience,
 746                                                                          string resource,
 747                                                                          string[] requiredClaims,
 748                                                                          TimeSpan refreshTimeout,
 749                                                                          Func<Timer> refreshTimerFactory)
 750        {
 22751            return async _ =>
 22752            {
 26753                EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshStart(EventHubName, endpoint.AbsoluteUri);
 26754                var refreshTimer = refreshTimerFactory();
 22755
 22756                try
 22757                {
 26758                    if (refreshTimer == null)
 22759                    {
 0760                        return;
 22761                    }
 22762
 26763                    var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint,
 22764
 22765                    // Reset the timer for the next refresh.
 22766
 26767                    if (authExpirationUtc >= DateTimeOffset.UtcNow)
 22768                    {
 26769                        refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.Infini
 22770                    }
 26771                }
 0772                catch (ObjectDisposedException)
 22773                {
 22774                    // This can occur if the connection is closed or the scope disposed after the factory
 22775                    // is called but before the timer is updated.  The callback may also fire while the timer is
 22776                    // in the act of disposing.  Do not consider it an error.
 0777                }
 0778                catch (Exception ex)
 22779                {
 0780                    EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Me
 22781
 22782                    // Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
 22783                    // that the connection has been closed.  Ignore potential exceptions, as they won't impact operation
 22784                    // At worse, another timer tick will occur and the operation will be retried.
 22785
 22786                    try
 22787                    {
 0788                        refreshTimer.Change(Timeout.Infinite, Timeout.Infinite);
 0789                    }
 0790                    catch
 22791                    {
 22792                        // Intentionally ignored.
 0793                    }
 0794                }
 22795                finally
 22796                {
 26797                    EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshComplete(EventHubName, endpoint.AbsoluteUri);
 22798                }
 26799            };
 800        }
 801
 802        /// <summary>
 803        ///   Performs the actions needed to open an AMQP object, such
 804        ///   as a session or link for use.
 805        /// </summary>
 806        ///
 807        /// <param name="target">The target AMQP object to open.</param>
 808        /// <param name="timeout">The timeout to apply when opening the link.</param>
 809        ///
 810        protected virtual async Task OpenAmqpObjectAsync(AmqpObject target,
 811                                                         TimeSpan timeout)
 812        {
 813            try
 814            {
 0815                await target.OpenAsync(timeout).ConfigureAwait(false);
 0816            }
 0817            catch
 818            {
 819                switch (target)
 820                {
 821                    case AmqpLink linkTarget:
 0822                        linkTarget.Session?.SafeClose();
 0823                        break;
 824                    case RequestResponseAmqpLink linkTarget:
 0825                        linkTarget.Session?.SafeClose();
 826                        break;
 827
 828                    default:
 829                        break;
 830                }
 831
 0832                target.SafeClose();
 0833                throw;
 834            }
 0835        }
 836
 837        /// <summary>
 838        ///   Requests authorization for a connection or link using a connection via the CBS mechanism.
 839        /// </summary>
 840        ///
 841        /// <param name="connection">The AMQP connection for which the authorization is associated.</param>
 842        /// <param name="tokenProvider">The <see cref="CbsTokenProvider" /> to use for obtaining access tokens.</param>
 843        /// <param name="endpoint">The Event Hubs service endpoint that the authorization is requested for.</param>
 844        /// <param name="audience">The audience associated with the authorization.  This is likely the <paramref name="e
 845        /// <param name="resource">The resource associated with the authorization.  This is likely the <paramref name="e
 846        /// <param name="requiredClaims">The set of claims required to support the operations of the AMQP link.</param>
 847        /// <param name="timeout">The timeout to apply when requesting authorization.</param>
 848        ///
 849        /// <returns>The date/time, in UTC, when the authorization expires.</returns>
 850        ///
 851        /// <remarks>
 852        ///   It is assumed that there is a valid <see cref="AmqpCbsLink" /> already associated
 853        ///   with the connection; this will be used as the transport for the authorization
 854        ///   credentials.
 855        /// </remarks>
 856        ///
 857        protected virtual async Task<DateTime> RequestAuthorizationUsingCbsAsync(AmqpConnection connection,
 858                                                                                 CbsTokenProvider tokenProvider,
 859                                                                                 Uri endpoint,
 860                                                                                 string audience,
 861                                                                                 string resource,
 862                                                                                 string[] requiredClaims,
 863                                                                                 TimeSpan timeout)
 864        {
 865            try
 866            {
 867                // If the connection is in the process of closing, then do not attempt to authorize but consider it an
 868                // unexpected scenario that should be treated as transient.
 869
 2870                if (connection.IsClosing())
 871                {
 2872                     throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubs
 873                }
 874
 0875                var authLink = connection.Extensions.Find<AmqpCbsLink>();
 0876                return await authLink.SendTokenAsync(TokenProvider, endpoint, audience, resource, requiredClaims, timeou
 877            }
 2878            catch (Exception ex) when ((ex is ObjectDisposedException) || (ex is OperationCanceledException))
 879            {
 880                // In the case where the attempt times out, a task cancellation occurs, which in other code paths is
 881                // considered a terminal exception.  In this case, it should be viewed as transient.
 882                //
 883                // When there's a race condition between sending the authorization request and the connection/link closi
 884                // link can sometimes be disposed when this call is taking place; because retries are likely to succeed,
 885                // this case transient.
 886                //
 887                // Wrap the source exception in a custom exception to ensure that it is eligible to be retried.
 888
 0889                throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubsExcep
 890            }
 0891        }
 892
 893        /// <summary>
 894        ///   Creates the settings to use for AMQP communication.
 895        /// </summary>
 896        ///
 897        /// <param name="amqpVersion">The version of AMQP to be used.</param>
 898        ///
 899        /// <returns>The settings for AMQP to use for communication with the Event Hubs service.</returns>
 900        ///
 901        private static AmqpSettings CreateAmpqSettings(Version amqpVersion)
 902        {
 0903            var saslProvider = new SaslTransportProvider();
 0904            saslProvider.Versions.Add(new AmqpVersion(amqpVersion));
 0905            saslProvider.AddHandler(new SaslAnonymousHandler(CbsSaslHandlerName));
 906
 0907            var amqpProvider = new AmqpTransportProvider();
 0908            amqpProvider.Versions.Add(new AmqpVersion(amqpVersion));
 909
 0910            var settings = new AmqpSettings();
 0911            settings.TransportProviders.Add(saslProvider);
 0912            settings.TransportProviders.Add(amqpProvider);
 913
 0914            return settings;
 915        }
 916
 917        /// <summary>
 918        ///  Creates the transport settings for use with TCP.
 919        /// </summary>
 920        ///
 921        /// <param name="hostName">The host name of the Event Hubs service endpoint.</param>
 922        /// <param name="port">The port to use for connecting to the endpoint.</param>
 923        ///
 924        /// <returns>The settings to use for transport.</returns>
 925        ///
 926        private static TransportSettings CreateTransportSettingsforTcp(string hostName,
 927                                                                       int port)
 928        {
 0929            var tcpSettings = new TcpTransportSettings
 0930            {
 0931                Host = hostName,
 0932                Port = port < 0 ? AmqpConstants.DefaultSecurePort : port,
 0933                ReceiveBufferSize = AmqpConstants.TransportBufferSize,
 0934                SendBufferSize = AmqpConstants.TransportBufferSize
 0935            };
 936
 0937            return new TlsTransportSettings(tcpSettings)
 0938            {
 0939                TargetHost = hostName,
 0940            };
 941        }
 942
 943        /// <summary>
 944        ///  Creates the transport settings for use with web sockets.
 945        /// </summary>
 946        ///
 947        /// <param name="hostName">The host name of the Event Hubs service endpoint.</param>
 948        /// <param name="proxy">The proxy to use for connecting to the endpoint.</param>
 949        ///
 950        /// <returns>The settings to use for transport.</returns>
 951        ///
 952        private static TransportSettings CreateTransportSettingsForWebSockets(string hostName,
 953                                                                              IWebProxy proxy)
 954        {
 0955            var uriBuilder = new UriBuilder(hostName)
 0956            {
 0957                Path = WebSocketsPathSuffix,
 0958                Scheme = WebSocketsUriScheme,
 0959                Port = -1
 0960            };
 961
 0962            return new WebSocketTransportSettings
 0963            {
 0964                Uri = uriBuilder.Uri,
 0965                Proxy = proxy ?? (default)
 0966            };
 967        }
 968
 969        /// <summary>
 970        ///   Creates the AMQP connection settings to use when communicating with the Event Hubs service.
 971        /// </summary>
 972        ///
 973        /// <param name="hostName">The host name of the Event Hubs service endpoint.</param>
 974        /// <param name="identifier">unique identifier of the current Event Hubs scope.</param>
 975        ///
 976        /// <returns>The settings to apply to the connection.</returns>
 977        ///
 978        private static AmqpConnectionSettings CreateAmqpConnectionSettings(string hostName,
 979                                                                           string identifier)
 980        {
 0981            var connectionSettings = new AmqpConnectionSettings
 0982            {
 0983                IdleTimeOut = (uint)ConnectionIdleTimeout.TotalMilliseconds,
 0984                MaxFrameSize = AmqpConstants.DefaultMaxFrameSize,
 0985                ContainerId = identifier,
 0986                HostName = hostName
 0987            };
 988
 0989            foreach (KeyValuePair<string, string> property in ClientLibraryInformation.Current.EnumerateProperties())
 990            {
 0991                connectionSettings.AddProperty(property.Key, property.Value);
 992            }
 993
 0994            return connectionSettings;
 995        }
 996
 997        /// <summary>
 998        ///   Validates the transport associated with the scope, throwing an argument exception
 999        ///   if it is unknown in this context.
 1000        /// </summary>
 1001        ///
 1002        /// <param name="transport">The transport to validate.</param>
 1003        ///
 1004        private static void ValidateTransport(EventHubsTransportType transport)
 1005        {
 2381006            if ((transport != EventHubsTransportType.AmqpTcp) && (transport != EventHubsTransportType.AmqpWebSockets))
 1007            {
 21008                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, t
 1009            }
 2361010        }
 1011    }
 1012}