< Summary

Class:Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpConnectionScope.cs
Covered lines:26
Uncovered lines:420
Coverable lines:446
Total lines:1130
Line coverage:5.8% (26 of 446)
Covered branches:5
Total branches:64
Branch coverage:7.8% (5 of 64)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_AmqpVersion()-0%100%
get_ConnectionIdleTimeout()-0%100%
get_AuthorizationRefreshBuffer()-0%100%
get_MinimumAuthorizationRefresh()-0%100%
get_AuthorizationRefreshTimeout()-0%100%
get_IsDisposed()-0%100%
get_OperationCancellationSource()-100%100%
get_ActiveLinks()-0%100%
get_Id()-0%100%
get_ServiceEndpoint()-100%100%
get_TokenProvider()-0%100%
get_Transport()-0%100%
get_Proxy()-0%100%
get_ActiveConnection()-0%100%
get_TransactionController()-0%100%
.ctor(...)-95.24%100%
CreateControllerAsync()-0%0%
CloseController(...)-0%100%
.ctor()-100%100%
OpenManagementLinkAsync()-0%100%
OpenReceiverLinkAsync()-0%100%
OpenSenderLinkAsync()-0%100%
Dispose()-0%0%
CreateAndOpenConnectionAsync()-0%0%
CreateManagementLinkAsync()-0%0%
CreateReceivingLinkAsync()-0%0%
CreateSendingLinkAsync()-0%0%
BeginTrackingLinkAsActive(...)-0%0%
CloseConnection(...)-0%100%
CalculateLinkAuthorizationRefreshInterval(...)-0%0%
CreateAuthorizationRefreshHandler(...)-0%100%
OpenAmqpObjectAsync()-0%0%
RequestAuthorizationUsingCbsAsync()-0%0%
CreateAmpqSettings(...)-0%100%
CreateTransportSettingsforTcp(...)-0%0%
CreateTransportSettingsForWebSockets(...)-0%0%
CreateAmqpConnectionSettings(...)-0%0%
ValidateTransport(...)-66.67%75%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpConnectionScope.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.Globalization;
 8using System.Net;
 9using System.Runtime.ExceptionServices;
 10using System.Threading;
 11using System.Threading.Tasks;
 12using Azure.Core;
 13using Azure.Core.Diagnostics;
 14using Azure.Messaging.ServiceBus.Authorization;
 15using Azure.Messaging.ServiceBus.Core;
 16using Azure.Messaging.ServiceBus.Diagnostics;
 17using Microsoft.Azure.Amqp;
 18using Microsoft.Azure.Amqp.Framing;
 19using Microsoft.Azure.Amqp.Sasl;
 20using Microsoft.Azure.Amqp.Transaction;
 21using Microsoft.Azure.Amqp.Transport;
 22
 23namespace Azure.Messaging.ServiceBus.Amqp
 24{
 25    /// <summary>
 26    ///   Defines a context for AMQP operations which can be shared amongst the different
 27    ///   client types within a given scope.
 28    /// </summary>
 29    ///
 30    internal class AmqpConnectionScope : TransportConnectionScope
 31    {
 32        /// <summary>The name to assign to the SASL handler to specify that CBS tokens are in use.</summary>
 33        private const string CbsSaslHandlerName = "MSSBCBS";
 34
 35        /// <summary>The suffix to attach to the resource path when using web sockets for service communication.</summar
 36        private const string WebSocketsPathSuffix = "/$servicebus/websocket/";
 37
 38        /// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
 39        private const string WebSocketsUriScheme = "wss";
 40
 41        /// <summary>
 42        ///   The version of AMQP to use within the scope.
 43        /// </summary>
 44        ///
 045        private static Version AmqpVersion { get; } = new Version(1, 0, 0, 0);
 46
 47        /// <summary>
 48        ///   The amount of time to allow an AMQP connection to be idle before considering
 49        ///   it to be timed out.
 50        /// </summary>
 51        ///
 052        private static TimeSpan ConnectionIdleTimeout { get; } = TimeSpan.FromMinutes(1);
 53
 54        /// <summary>
 55        ///   The amount of buffer to apply to account for clock skew when
 56        ///   refreshing authorization.  Authorization will be refreshed earlier
 57        ///   than the expected expiration by this amount.
 58        /// </summary>
 59        ///
 060        private static TimeSpan AuthorizationRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
 61
 62        /// <summary>
 63        ///   The minimum amount of time for authorization to be refreshed; any calculations that
 64        ///   call for refreshing more frequently will be substituted with this value.
 65        /// </summary>
 66        ///
 067        private static TimeSpan MinimumAuthorizationRefresh { get; } = TimeSpan.FromMinutes(4);
 68
 69        /// <summary>
 70        ///   The amount time to allow to refresh authorization of an AMQP link.
 71        /// </summary>
 72        ///
 073        private static TimeSpan AuthorizationRefreshTimeout { get; } = TimeSpan.FromMinutes(3);
 74
 75        /// <summary>
 76        ///   Indicates whether this <see cref="AmqpConnectionScope"/> has been disposed.
 77        /// </summary>
 78        ///
 79        /// <value><c>true</c> if disposed; otherwise, <c>false</c>.</value>
 80        ///
 081        public override bool IsDisposed { get; protected set; }
 82
 83        /// <summary>
 84        ///   The cancellation token to use with operations initiated by the scope.
 85        /// </summary>
 86        ///
 11287        private CancellationTokenSource OperationCancellationSource { get; } = new CancellationTokenSource();
 88
 89        /// <summary>
 90        ///   The set of active AMQP links associated with the connection scope.  These are considered children
 91        ///   of the active connection and should be managed as such.
 92        /// </summary>
 93        ///
 094        private ConcurrentDictionary<AmqpObject, Timer> ActiveLinks { get; } = new ConcurrentDictionary<AmqpObject, Time
 95
 96        /// <summary>
 97        ///   The unique identifier of the scope.
 98        /// </summary>
 99        ///
 0100        private string Id { get; }
 101
 102        /// <summary>
 103        ///   The endpoint for the Service Bus service to which the scope is associated.
 104        /// </summary>
 105        ///
 42106        private Uri ServiceEndpoint { get; }
 107
 108        /// <summary>
 109        ///   The provider to use for obtaining a token for authorization with the Service Bus service.
 110        /// </summary>
 111        ///
 0112        private CbsTokenProvider TokenProvider { get; }
 113
 114        /// <summary>
 115        ///   The type of transport to use for communication.
 116        /// </summary>
 117        ///
 0118        private ServiceBusTransportType Transport { get; }
 119
 120        /// <summary>
 121        ///   The proxy, if any, which should be used for communication.
 122        /// </summary>
 123        ///
 0124        private IWebProxy Proxy { get; }
 125
 126        /// <summary>
 127        ///   The AMQP connection that is active for the current scope.
 128        /// </summary>
 129        ///
 0130        private FaultTolerantAmqpObject<AmqpConnection> ActiveConnection { get; }
 0131        public FaultTolerantAmqpObject<Controller> TransactionController { get; }
 132
 133        /// <summary>
 134        ///   Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
 135        /// </summary>
 136        ///
 137        /// <param name="serviceEndpoint">Endpoint for the Service Bus service to which the scope is associated.</param>
 138        /// <param name="credential">The credential to use for authorization with the Service Bus service.</param>
 139        /// <param name="transport">The transport to use for communication.</param>
 140        /// <param name="proxy">The proxy, if any, to use for communication.</param>
 141        /// <param name="identifier">The identifier to assign this scope; if not provided, one will be generated.</param
 142        ///
 42143        public AmqpConnectionScope(Uri serviceEndpoint,
 42144                                   ServiceBusTokenCredential credential,
 42145                                   ServiceBusTransportType transport,
 42146                                   IWebProxy proxy,
 42147                                   string identifier = default)
 148        {
 42149            Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
 42150            Argument.AssertNotNull(credential, nameof(credential));
 42151            ValidateTransport(transport);
 152
 42153            ServiceEndpoint = serviceEndpoint;
 42154            Transport = transport;
 42155            Proxy = proxy;
 42156            TokenProvider = new CbsTokenProvider(new ServiceBusTokenCredential(credential, serviceEndpoint.ToString()), 
 42157            Id = identifier ?? $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substr
 158
 159#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for t
 0160            Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, Servic
 161#pragma warning restore CA2214 // Do not call overridable methods in constructors
 42162            ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(
 42163                connectionFactory,
 42164                CloseConnection);
 42165            TransactionController = new FaultTolerantAmqpObject<Controller>(
 42166                CreateControllerAsync,
 42167                CloseController);
 42168        }
 169
 170        private async Task<Controller> CreateControllerAsync(TimeSpan timeout)
 171        {
 0172            var stopWatch = ValueStopwatch.StartNew();
 0173            AmqpConnection connection = await ActiveConnection.GetOrCreateAsync(timeout.CalculateRemaining(stopWatch.Get
 174
 0175            var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 0176            AmqpSession amqpSession = null;
 177            Controller controller;
 178
 179            try
 180            {
 0181                amqpSession = connection.CreateSession(sessionSettings);
 0182                await amqpSession.OpenAsync(timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 183
 0184                controller = new Controller(amqpSession, timeout.CalculateRemaining(stopWatch.GetElapsedTime()));
 0185                await controller.OpenAsync(timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false)
 0186            }
 0187            catch (Exception exception)
 188            {
 0189                if (amqpSession != null)
 190                {
 0191                    await amqpSession.CloseAsync(timeout).ConfigureAwait(false);
 192                }
 193
 0194                ServiceBusEventSource.Log.CreateControllerException(ActiveConnection.ToString(), exception.ToString());
 0195                throw;
 0196            }
 197
 0198            return controller;
 0199        }
 200
 201        private void CloseController(Controller controller) =>
 0202            controller.Close();
 203
 204        /// <summary>
 205        ///   Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
 206        /// </summary>
 207        ///
 28208        protected AmqpConnectionScope()
 209        {
 28210        }
 211
 212        /// <summary>
 213        ///   Opens an AMQP link for use with management operations.
 214        /// </summary>
 215        /// <param name="entityPath">The path for the entity.</param>
 216        /// <param name="identifier">The identifier for the sender or receiver that is opening a management link.</param
 217        /// <param name="timeout">The timeout to apply when creating the link.</param>
 218        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 219        ///
 220        /// <returns>A link for use with management operations.</returns>
 221        ///
 222        /// <remarks>
 223        ///   The authorization for this link does not require periodic
 224        ///   refreshing.
 225        /// </remarks>
 226        ///
 227        public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
 228            string entityPath,
 229            string identifier,
 230            TimeSpan timeout,
 231            CancellationToken cancellationToken)
 232        {
 0233            ServiceBusEventSource.Log.CreateManagementLinkStart(identifier);
 234            try
 235            {
 0236                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 237
 0238                var stopWatch = ValueStopwatch.StartNew();
 0239                var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 0240                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 241
 0242                var link = await CreateManagementLinkAsync(
 0243                    entityPath,
 0244                    connection,
 0245                    timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
 0246                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 247
 0248                await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(f
 0249                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0250                ServiceBusEventSource.Log.CreateManagementLinkComplete(identifier);
 0251                return link;
 252            }
 0253            catch (Exception ex)
 254            {
 0255                ServiceBusEventSource.Log.CreateManagementLinkException(identifier, ex.ToString());
 0256                throw;
 257            }
 0258        }
 259
 260        /// <summary>
 261        ///   Opens an AMQP link for use with consumer operations.
 262        /// </summary>
 263        /// <param name="entityPath"></param>
 264        ///
 265        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 266        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 267        /// <param name="sessionId"></param>
 268        /// <param name="isSessionReceiver"></param>
 269        /// <param name="timeout">The timeout to apply when creating the link.</param>
 270        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 271        ///
 272        /// <returns>A link for use with consumer operations.</returns>
 273        ///
 274        public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
 275            string entityPath,
 276            TimeSpan timeout,
 277            uint prefetchCount,
 278            ReceiveMode receiveMode,
 279            string sessionId,
 280            bool isSessionReceiver,
 281            CancellationToken cancellationToken)
 282        {
 0283            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 284
 0285            var stopWatch = ValueStopwatch.StartNew();
 0286            var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);
 287
 0288            var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 0289            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 290
 0291            ReceivingAmqpLink link = await CreateReceivingLinkAsync(
 0292                entityPath,
 0293                connection,
 0294                receiverEndpoint,
 0295                timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
 0296                prefetchCount,
 0297                receiveMode,
 0298                sessionId,
 0299                isSessionReceiver,
 0300                cancellationToken
 0301            ).ConfigureAwait(false);
 302
 0303            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 304
 0305            await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 0306            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0307            return link;
 308
 0309        }
 310
 311        /// <summary>
 312        ///   Opens an AMQP link for use with producer operations.
 313        /// </summary>
 314        /// <param name="entityPath"></param>
 315        /// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</p
 316        /// <param name="timeout">The timeout to apply when creating the link.</param>
 317        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 318        ///
 319        /// <returns>A link for use with producer operations.</returns>
 320        ///
 321        public virtual async Task<SendingAmqpLink> OpenSenderLinkAsync(
 322            string entityPath,
 323            string viaEntityPath,
 324            TimeSpan timeout,
 325            CancellationToken cancellationToken)
 326        {
 0327            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 328
 0329            var stopWatch = ValueStopwatch.StartNew();
 330
 0331            AmqpConnection connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
 0332            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 333
 0334            SendingAmqpLink link = await CreateSendingLinkAsync(
 0335                entityPath,
 0336                viaEntityPath,
 0337                connection,
 0338                timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
 339
 0340            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 341
 0342            await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false
 0343            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 344
 0345            return link;
 0346        }
 347
 348        /// <summary>
 349        ///   Performs the task needed to clean up resources used by the <see cref="AmqpConnectionScope" />,
 350        ///   including ensuring that the client itself has been closed.
 351        /// </summary>
 352        ///
 353        public override void Dispose()
 354        {
 0355            if (IsDisposed)
 356            {
 0357                return;
 358            }
 359
 0360            ActiveConnection?.Dispose();
 0361            OperationCancellationSource.Cancel();
 0362            OperationCancellationSource.Dispose();
 363
 0364            IsDisposed = true;
 0365        }
 366
 367        /// <summary>
 368        ///   Creates an AMQP connection for a given scope.
 369        /// </summary>
 370        ///
 371        /// <param name="amqpVersion">The version of AMQP to use for the connection.</param>
 372        /// <param name="serviceEndpoint">The endpoint for the Service Bus service to which the scope is associated.</pa
 373        /// <param name="transportType">The type of transport to use for communication.</param>
 374        /// <param name="proxy">The proxy, if any, to use for communication.</param>
 375        /// <param name="scopeIdentifier">The unique identifier for the associated scope.</param>
 376        /// <param name="timeout">The timeout to consider when creating the connection.</param>
 377        ///
 378        /// <returns>An AMQP connection that may be used for communicating with the Service Bus service.</returns>
 379        ///
 380        protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
 381            Version amqpVersion,
 382            Uri serviceEndpoint,
 383            ServiceBusTransportType transportType,
 384            IWebProxy proxy,
 385            string scopeIdentifier,
 386            TimeSpan timeout)
 387        {
 0388            var hostName = serviceEndpoint.Host;
 0389            AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
 0390            AmqpConnectionSettings connectionSetings = CreateAmqpConnectionSettings(hostName, scopeIdentifier);
 391
 0392            TransportSettings transportSettings = transportType.IsWebSocketTransport()
 0393                ? CreateTransportSettingsForWebSockets(hostName, proxy)
 0394                : CreateTransportSettingsforTcp(hostName, serviceEndpoint.Port);
 395
 396            // Create and open the connection, respecting the timeout constraint
 397            // that was received.
 398
 0399            var stopWatch = ValueStopwatch.StartNew();
 400
 0401            var initiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
 0402            TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
 403
 0404            var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
 0405            await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait
 406
 407            // Create the CBS link that will be used for authorization.  The act of creating the link will associate
 408            // it with the connection.
 409
 410#pragma warning disable CA1806 // Do not ignore method results
 0411            new AmqpCbsLink(connection);
 412#pragma warning restore CA1806 // Do not ignore method results
 413
 414            // When the connection is closed, close each of the links associated with it.
 415
 0416            EventHandler closeHandler = null;
 417
 0418            closeHandler = (snd, args) =>
 0419            {
 0420                foreach (var link in ActiveLinks.Keys)
 0421                {
 0422                    link.SafeClose();
 0423                }
 0424
 0425                connection.Closed -= closeHandler;
 0426            };
 427
 0428            connection.Closed += closeHandler;
 0429            return connection;
 0430        }
 431
 432        /// <summary>
 433        ///   Creates an AMQP link for use with management operations.
 434        /// </summary>
 435        /// <param name="entityPath"></param>
 436        ///
 437        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 438        /// <param name="timeout">The timeout to apply when creating the link.</param>
 439        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 440        ///
 441        /// <returns>A link for use with management operations.</returns>
 442        ///
 443        protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
 444            string entityPath,
 445            AmqpConnection connection,
 446            TimeSpan timeout,
 447            CancellationToken cancellationToken)
 448        {
 0449            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 0450            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 451
 0452            var session = default(AmqpSession);
 0453            var stopWatch = ValueStopwatch.StartNew();
 454
 455            try
 456            {
 457                // Create and open the AMQP session associated with the link.
 458
 0459                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 0460                session = connection.CreateSession(sessionSettings);
 461
 0462                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 0463                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 464
 465                // Create and open the link.
 466
 0467                var linkSettings = new AmqpLinkSettings();
 0468                linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.Get
 0469                linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
 0470                entityPath += '/' + AmqpClientConstants.ManagementAddress;
 471
 472                // Perform the initial authorization for the link.
 473
 0474                string[] claims = { ServiceBusClaim.Manage, ServiceBusClaim.Listen, ServiceBusClaim.Send };
 0475                var endpoint = new Uri(ServiceEndpoint, entityPath);
 0476                var audience = new[] { endpoint.AbsoluteUri };
 0477                DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync(
 0478                    connection,
 0479                    TokenProvider,
 0480                    ServiceEndpoint,
 0481                    audience,
 0482                    claims,
 0483                    timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
 0484                    .ConfigureAwait(false);
 485
 0486                var link = new RequestResponseAmqpLink(
 0487                    AmqpClientConstants.EntityTypeManagement,
 0488                    session,
 0489                    entityPath,
 0490                    linkSettings.Properties);
 0491                linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}
 492
 493                // Track the link before returning it, so that it can be managed with the scope.
 0494                var refreshTimer = default(Timer);
 495
 0496                TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
 0497                (
 0498                    entityPath,
 0499                    connection,
 0500                    link,
 0501                    TokenProvider,
 0502                    ServiceEndpoint,
 0503                    audience,
 0504                    claims,
 0505                    AuthorizationRefreshTimeout,
 0506                    () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null)
 0507                );
 508
 0509                refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationU
 510
 511                // Track the link before returning it, so that it can be managed with the scope.
 512
 0513                BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
 0514                return link;
 515            }
 516            catch (Exception exception)
 517            {
 518                // Aborting the session will perform any necessary cleanup of
 519                // the associated link as well.
 520
 0521                session?.Abort();
 0522                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0523                    exception,
 0524                    null,
 0525                    session.GetInnerException(),
 0526                    connection.IsClosing()))
 0527                .Throw();
 528
 0529                throw; // will never be reached
 530            }
 0531        }
 532
 533        /// <summary>
 534        ///   Creates an AMQP link for use with receiving operations.
 535        /// </summary>
 536        /// <param name="entityPath"></param>
 537        ///
 538        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 539        /// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
 540        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 541        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 542        /// <param name="sessionId"></param>
 543        /// <param name="isSessionReceiver"></param>
 544        /// <param name="timeout">The timeout to apply when creating the link.</param>
 545        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 546        ///
 547        /// <returns>A link for use for operations related to receiving events.</returns>
 548        ///
 549        protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
 550            string entityPath,
 551            AmqpConnection connection,
 552            Uri endpoint,
 553            TimeSpan timeout,
 554            uint prefetchCount,
 555             ReceiveMode receiveMode,
 556            string sessionId,
 557            bool isSessionReceiver,
 558            CancellationToken cancellationToken)
 559        {
 0560            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 0561            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 562
 0563            var session = default(AmqpSession);
 0564            var stopWatch = ValueStopwatch.StartNew();
 565
 566            try
 567            {
 568                // Perform the initial authorization for the link.
 569
 0570                string[] authClaims = new string[] { ServiceBusClaim.Send };
 0571                var audience = new[] { endpoint.AbsoluteUri };
 0572                DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync(
 0573                    connection,
 0574                    TokenProvider,
 0575                    endpoint,
 0576                    audience,
 0577                    authClaims,
 0578                    timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
 0579                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 580
 581                // Create and open the AMQP session associated with the link.
 582
 0583                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 0584                session = connection.CreateSession(sessionSettings);
 585
 0586                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 0587                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 588
 0589                var filters = new FilterSet();
 590
 591                // even if supplied sessionId is null, we need to add the Session filter if it is a session receiver
 0592                if (isSessionReceiver)
 593                {
 0594                    filters.Add(AmqpClientConstants.SessionFilterName, sessionId);
 595                }
 596
 0597                var linkSettings = new AmqpLinkSettings
 0598                {
 0599                    Role = true,
 0600                    TotalLinkCredit = prefetchCount,
 0601                    AutoSendFlow = prefetchCount > 0,
 0602                    SettleType = (receiveMode == ReceiveMode.PeekLock) ? SettleMode.SettleOnDispose : SettleMode.SettleO
 0603                    Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
 0604                    Target = new Target { Address = Guid.NewGuid().ToString() }
 0605                };
 606
 0607                var link = new ReceivingAmqpLink(linkSettings);
 0608                linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}
 609
 0610                link.AttachTo(session);
 611
 612                // Configure refresh for authorization of the link.
 613
 0614                var refreshTimer = default(Timer);
 615
 0616                TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
 0617                (
 0618                    entityPath,
 0619                    connection,
 0620                    link,
 0621                    TokenProvider,
 0622                    endpoint,
 0623                    audience,
 0624                    authClaims,
 0625                    AuthorizationRefreshTimeout,
 0626                    () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null)
 0627                );
 628
 0629                refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationU
 630
 631                // Track the link before returning it, so that it can be managed with the scope.
 632
 0633                BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
 0634                return link;
 635            }
 636            catch (Exception exception)
 637            {
 638                // Aborting the session will perform any necessary cleanup of
 639                // the associated link as well.
 640
 0641                session?.Abort();
 0642                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0643                    exception,
 0644                    null,
 0645                    session.GetInnerException(),
 0646                    connection.IsClosing()))
 0647                .Throw();
 648
 0649                throw; // will never be reached
 650            }
 0651        }
 652
 653        /// <summary>
 654        ///   Creates an AMQP link for use with publishing operations.
 655        /// </summary>
 656        /// <param name="entityPath"></param>
 657        /// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</p
 658        /// <param name="connection">The active and opened AMQP connection to use for this link.</param>
 659        /// <param name="timeout">The timeout to apply when creating the link.</param>
 660        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 661        ///
 662        /// <returns>A link for use for operations related to receiving events.</returns>
 663        ///
 664        protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
 665            string entityPath,
 666            string viaEntityPath,
 667            AmqpConnection connection,
 668            TimeSpan timeout,
 669            CancellationToken cancellationToken)
 670        {
 0671            Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
 0672            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 673
 0674            var session = default(AmqpSession);
 0675            var stopWatch = ValueStopwatch.StartNew();
 676
 677            try
 678            {
 679                string[] audience;
 0680                Uri destinationEndpoint = null;
 681
 682                // if there is a via entityPath, include that in the audience
 683
 0684                if (!string.IsNullOrEmpty(viaEntityPath))
 685                {
 0686                    destinationEndpoint = new Uri(ServiceEndpoint, viaEntityPath);
 0687                    var finalDestinationEndpoint = new Uri(ServiceEndpoint, entityPath);
 0688                    audience = new string[] { finalDestinationEndpoint.AbsoluteUri, destinationEndpoint.AbsoluteUri };
 689                }
 690                else
 691                {
 0692                    destinationEndpoint = new Uri(ServiceEndpoint, entityPath);
 0693                    audience = new string[] { destinationEndpoint.AbsoluteUri };
 694                }
 695
 696                // Perform the initial authorization for the link.
 697
 0698                var authClaims = new[] { ServiceBusClaim.Send };
 699
 0700                DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync(
 0701                    connection,
 0702                    TokenProvider,
 0703                    destinationEndpoint,
 0704                    audience,
 0705                    authClaims,
 0706                    timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
 0707                    .ConfigureAwait(false);
 708
 0709                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 710
 711                // Create and open the AMQP session associated with the link.
 712
 0713                var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 0714                session = connection.CreateSession(sessionSettings);
 715
 0716                await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
 0717                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 718
 719                // Create and open the link.
 720
 0721                var linkSettings = new AmqpLinkSettings
 0722                {
 0723                    Role = false,
 0724                    InitialDeliveryCount = 0,
 0725                    Source = new Source { Address = Guid.NewGuid().ToString() },
 0726                    Target = new Target { Address = destinationEndpoint.AbsolutePath }
 0727                };
 728
 0729                if (!string.IsNullOrEmpty(viaEntityPath))
 730                {
 0731                    linkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, entityPath);
 732                }
 733
 0734                linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.Get
 735
 0736                var link = new SendingAmqpLink(linkSettings);
 0737                linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
 0738                link.AttachTo(session);
 739
 740                // Configure refresh for authorization of the link.
 741
 0742                var refreshTimer = default(Timer);
 743
 0744                TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
 0745                (
 0746                    entityPath,
 0747                    connection,
 0748                    link,
 0749                    TokenProvider,
 0750                    destinationEndpoint,
 0751                    audience,
 0752                    authClaims,
 0753                    AuthorizationRefreshTimeout,
 0754                    () => refreshTimer
 0755                );
 756
 0757                refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationU
 758
 759                // Track the link before returning it, so that it can be managed with the scope.
 760
 0761                BeginTrackingLinkAsActive(entityPath, link, refreshTimer);
 0762                return link;
 763            }
 764            catch (Exception exception)
 765            {
 766                // Aborting the session will perform any necessary cleanup of
 767                // the associated link as well.
 768
 0769                session?.Abort();
 0770                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0771                    exception,
 0772                    null,
 0773                    session.GetInnerException(),
 0774                    connection.IsClosing()))
 0775                .Throw();
 776
 0777                throw; // will never be reached
 778            }
 0779        }
 780
 781        /// <summary>
 782        ///   Performs the actions needed to configure and begin tracking the specified AMQP
 783        ///   link as an active link bound to this scope.
 784        /// </summary>
 785        /// <param name="entityPath"></param>
 786        ///
 787        /// <param name="link">The link to begin tracking.</param>
 788        /// <param name="authorizationRefreshTimer">The timer used to manage refreshing authorization, if the link requi
 789        ///
 790        /// <remarks>
 791        ///   This method does operate on the specified <paramref name="link"/> in order to configure it
 792        ///   for active tracking; no assumptions are made about the open/connected state of the link nor are
 793        ///   its communication properties modified.
 794        /// </remarks>
 795        ///
 796        protected virtual void BeginTrackingLinkAsActive(
 797            string entityPath,
 798            AmqpObject link,
 799            Timer authorizationRefreshTimer = null)
 800        {
 801            // Register the link as active and having authorization automatically refreshed, so that it can be
 802            // managed with the scope.
 803
 0804            if (!ActiveLinks.TryAdd(link, authorizationRefreshTimer))
 805            {
 0806                throw new ServiceBusException(true, entityPath, Resources.CouldNotCreateLink);
 807            }
 808
 809            // When the link is closed, stop refreshing authorization and remove it from the
 810            // set of associated links.
 811
 0812            var closeHandler = default(EventHandler);
 813
 0814            closeHandler = (snd, args) =>
 0815            {
 0816                ActiveLinks.TryRemove(link, out var timer);
 0817
 0818                timer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 0819                timer?.Dispose();
 0820
 0821                link.Closed -= closeHandler;
 0822            };
 823
 0824            link.Closed += closeHandler;
 0825        }
 826
 827        /// <summary>
 828        ///   Performs the tasks needed to close a connection.
 829        /// </summary>
 830        ///
 831        /// <param name="connection">The connection to close.</param>
 832        ///
 0833        protected virtual void CloseConnection(AmqpConnection connection) => connection.SafeClose();
 834
 835        /// <summary>
 836        ///   Calculates the interval after which authorization for an AMQP link should be
 837        ///   refreshed.
 838        /// </summary>
 839        ///
 840        /// <param name="expirationTimeUtc">The date/time, in UTC, that the current authorization is expected to expire.
 841        ///
 842        /// <returns>The interval after which authorization should be refreshed.</returns>
 843        ///
 844        protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval(DateTime expirationTimeUtc)
 845        {
 0846            var refreshDueInterval = (expirationTimeUtc.Subtract(DateTime.UtcNow)).Add(AuthorizationRefreshBuffer);
 0847            return (refreshDueInterval < MinimumAuthorizationRefresh) ? MinimumAuthorizationRefresh : refreshDueInterval
 848        }
 849
 850        /// <summary>
 851        ///   Creates the timer event handler to support refreshing AMQP link authorization
 852        ///   on a recurring basis.
 853        /// </summary>
 854        /// <param name="entityPath"></param>
 855        ///
 856        /// <param name="connection">The AMQP connection to which the link being refreshed is bound to.</param>
 857        /// <param name="amqpLink">The AMQO link to refresh authorization for.</param>
 858        /// <param name="tokenProvider">The <see cref="CbsTokenProvider" /> to use for obtaining access tokens.</param>
 859        /// <param name="endpoint">The Service Bus service endpoint that the AMQP link is communicating with.</param>
 860        /// <param name="audience">The audience associated with the authorization.  This is likely the <paramref name="e
 861        /// <param name="requiredClaims">The set of claims required to support the operations of the AMQP link.</param>
 862        /// <param name="refreshTimeout">The timeout to apply when requesting authorization refresh.</param>
 863        /// <param name="refreshTimerFactory">A function to allow retrieving the <see cref="Timer" /> associated with th
 864        ///
 865        /// <returns>A <see cref="TimerCallback"/> delegate to perform the refresh when a timer is due.</returns>
 866        ///
 867        protected virtual TimerCallback CreateAuthorizationRefreshHandler(
 868            string entityPath,
 869            AmqpConnection connection,
 870            AmqpObject amqpLink,
 871            CbsTokenProvider tokenProvider,
 872            Uri endpoint,
 873            string[] audience,
 874            string[] requiredClaims,
 875            TimeSpan refreshTimeout,
 876            Func<Timer> refreshTimerFactory)
 877        {
 0878            return async _ =>
 0879            {
 0880                ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshStart(entityPath, endpoint.AbsoluteUri);
 0881                Timer refreshTimer = refreshTimerFactory();
 0882
 0883                try
 0884                {
 0885                    if (refreshTimer == null)
 0886                    {
 0887                        return;
 0888                    }
 0889
 0890                    DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync(
 0891                        connection,
 0892                        tokenProvider,
 0893                        endpoint,
 0894                        audience,
 0895                        requiredClaims,
 0896                        refreshTimeout)
 0897                    .ConfigureAwait(false);
 0898
 0899                // Reset the timer for the next refresh.
 0900
 0901                if (authExpirationUtc >= DateTimeOffset.UtcNow)
 0902                    {
 0903                        refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.Infini
 0904                    }
 0905                }
 0906                catch (ObjectDisposedException)
 0907                {
 0908                // This can occur if the connection is closed or the scope disposed after the factory
 0909                // is called but before the timer is updated.  The callback may also fire while the timer is
 0910                // in the act of disposing.  Do not consider it an error.
 0911            }
 0912                catch (Exception ex)
 0913                {
 0914                    ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshError(entityPath, endpoint.AbsoluteUri, ex.Mes
 0915
 0916                // Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
 0917                // that the connection has been closed.  Ignore potential exceptions, as they won't impact operation.
 0918                // At worse, another timer tick will occur and the operation will be retried.
 0919
 0920                try
 0921                    { refreshTimer.Change(Timeout.Infinite, Timeout.Infinite); }
 0922                    catch { }
 0923                }
 0924                finally
 0925                {
 0926                    ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshComplete(entityPath, endpoint.AbsoluteUri);
 0927                }
 0928            };
 929        }
 930
 931        /// <summary>
 932        ///   Performs the actions needed to open an AMQP object, such
 933        ///   as a session or link for use.
 934        /// </summary>
 935        ///
 936        /// <param name="target">The target AMQP object to open.</param>
 937        /// <param name="timeout">The timeout to apply when opening the link.</param>
 938        ///
 939        protected virtual async Task OpenAmqpObjectAsync(
 940            AmqpObject target,
 941            TimeSpan timeout)
 942        {
 943            try
 944            {
 0945                await target.OpenAsync(timeout).ConfigureAwait(false);
 0946            }
 0947            catch
 948            {
 949                switch (target)
 950                {
 951                    case AmqpLink linkTarget:
 0952                        linkTarget.Session?.SafeClose();
 0953                        break;
 954                    case RequestResponseAmqpLink linkTarget:
 0955                        linkTarget.Session?.SafeClose();
 956                        break;
 957
 958                    default:
 959                        break;
 960                }
 961
 0962                target.SafeClose();
 0963                throw;
 964            }
 0965        }
 966
 967        /// <summary>
 968        ///   Requests authorization for a connection or link using a connection via the CBS mechanism.
 969        /// </summary>
 970        ///
 971        /// <param name="connection">The AMQP connection for which the authorization is associated.</param>
 972        /// <param name="tokenProvider">The <see cref="CbsTokenProvider" /> to use for obtaining access tokens.</param>
 973        /// <param name="endpoint">The Service Bus service endpoint that the authorization is requested for.</param>
 974        /// <param name="audience">The audience associated with the authorization.  This is likely the <paramref name="e
 975        /// <param name="requiredClaims">The set of claims required to support the operations of the AMQP link.</param>
 976        /// <param name="timeout">The timeout to apply when requesting authorization.</param>
 977        ///
 978        /// <returns>The date/time, in UTC, when the authorization expires.</returns>
 979        ///
 980        /// <remarks>
 981        ///   It is assumed that there is a valid <see cref="AmqpCbsLink" /> already associated
 982        ///   with the connection; this will be used as the transport for the authorization
 983        ///   credentials.
 984        /// </remarks>
 985        ///
 986        protected virtual async Task<DateTime> RequestAuthorizationUsingCbsAsync(
 987            AmqpConnection connection,
 988            CbsTokenProvider tokenProvider,
 989            Uri endpoint,
 990            string[] audience,
 991            string[] requiredClaims,
 992            TimeSpan timeout)
 993        {
 0994            AmqpCbsLink authLink = connection.Extensions.Find<AmqpCbsLink>();
 0995            DateTime cbsTokenExpiresAtUtc = DateTime.MaxValue;
 0996            foreach (string resource in audience)
 997            {
 0998                DateTime expiresAt =
 0999                    await authLink.SendTokenAsync(TokenProvider, endpoint, resource, resource, requiredClaims, timeout).
 01000                if (expiresAt < cbsTokenExpiresAtUtc)
 1001                {
 01002                    cbsTokenExpiresAtUtc = expiresAt;
 1003                }
 1004            }
 01005            return cbsTokenExpiresAtUtc;
 01006        }
 1007
 1008        /// <summary>
 1009        ///   Creates the settings to use for AMQP communication.
 1010        /// </summary>
 1011        ///
 1012        /// <param name="amqpVersion">The version of AMQP to be used.</param>
 1013        ///
 1014        /// <returns>The settings for AMQP to use for communication with the Service Bus service.</returns>
 1015        ///
 1016        private static AmqpSettings CreateAmpqSettings(Version amqpVersion)
 1017        {
 01018            var saslProvider = new SaslTransportProvider();
 01019            saslProvider.Versions.Add(new AmqpVersion(amqpVersion));
 01020            saslProvider.AddHandler(new SaslAnonymousHandler(CbsSaslHandlerName));
 1021
 01022            var amqpProvider = new AmqpTransportProvider();
 01023            amqpProvider.Versions.Add(new AmqpVersion(amqpVersion));
 1024
 01025            var settings = new AmqpSettings();
 01026            settings.TransportProviders.Add(saslProvider);
 01027            settings.TransportProviders.Add(amqpProvider);
 1028
 01029            return settings;
 1030        }
 1031
 1032        /// <summary>
 1033        ///  Creates the transport settings for use with TCP.
 1034        /// </summary>
 1035        ///
 1036        /// <param name="hostName">The host name of the Service Bus service endpoint.</param>
 1037        /// <param name="port">The port to use for connecting to the endpoint.</param>
 1038        ///
 1039        /// <returns>The settings to use for transport.</returns>
 1040        ///
 1041        private static TransportSettings CreateTransportSettingsforTcp(
 1042            string hostName,
 1043            int port)
 1044        {
 01045            var tcpSettings = new TcpTransportSettings
 01046            {
 01047                Host = hostName,
 01048                Port = port < 0 ? AmqpConstants.DefaultSecurePort : port,
 01049                ReceiveBufferSize = AmqpConstants.TransportBufferSize,
 01050                SendBufferSize = AmqpConstants.TransportBufferSize
 01051            };
 1052
 01053            return new TlsTransportSettings(tcpSettings)
 01054            {
 01055                TargetHost = hostName,
 01056            };
 1057        }
 1058
 1059        /// <summary>
 1060        ///  Creates the transport settings for use with web sockets.
 1061        /// </summary>
 1062        ///
 1063        /// <param name="hostName">The host name of the Service Bus service endpoint.</param>
 1064        /// <param name="proxy">The proxy to use for connecting to the endpoint.</param>
 1065        ///
 1066        /// <returns>The settings to use for transport.</returns>
 1067        ///
 1068        private static TransportSettings CreateTransportSettingsForWebSockets(
 1069            string hostName,
 1070            IWebProxy proxy)
 1071        {
 01072            var uriBuilder = new UriBuilder(hostName)
 01073            {
 01074                Path = WebSocketsPathSuffix,
 01075                Scheme = WebSocketsUriScheme,
 01076                Port = -1
 01077            };
 1078
 01079            return new WebSocketTransportSettings
 01080            {
 01081                Uri = uriBuilder.Uri,
 01082                Proxy = proxy ?? (default)
 01083            };
 1084        }
 1085
 1086        /// <summary>
 1087        ///   Creates the AMQP connection settings to use when communicating with the Service Bus service.
 1088        /// </summary>
 1089        ///
 1090        /// <param name="hostName">The host name of the Service Bus service endpoint.</param>
 1091        /// <param name="identifier">unique identifier of the current Service Bus scope.</param>
 1092        ///
 1093        /// <returns>The settings to apply to the connection.</returns>
 1094        ///
 1095        private static AmqpConnectionSettings CreateAmqpConnectionSettings(
 1096            string hostName,
 1097            string identifier)
 1098        {
 01099            var connectionSettings = new AmqpConnectionSettings
 01100            {
 01101                IdleTimeOut = (uint)ConnectionIdleTimeout.TotalMilliseconds,
 01102                MaxFrameSize = AmqpConstants.DefaultMaxFrameSize,
 01103                ContainerId = identifier,
 01104                HostName = hostName
 01105            };
 1106
 01107            foreach (KeyValuePair<string, string> property in ClientLibraryInformation.Current.EnumerateProperties())
 1108            {
 01109                connectionSettings.AddProperty(property.Key, property.Value);
 1110            }
 1111
 01112            return connectionSettings;
 1113        }
 1114
 1115        /// <summary>
 1116        ///   Validates the transport associated with the scope, throwing an argument exception
 1117        ///   if it is unknown in this context.
 1118        /// </summary>
 1119        ///
 1120        /// <param name="transport">The transport to validate.</param>
 1121        ///
 1122        private static void ValidateTransport(ServiceBusTransportType transport)
 1123        {
 421124            if ((transport != ServiceBusTransportType.AmqpTcp) && (transport != ServiceBusTransportType.AmqpWebSockets))
 1125            {
 01126                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, t
 1127            }
 421128        }
 1129    }
 1130}