< Summary

Class:Microsoft.Azure.ServiceBus.ServiceBusConnection
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\ServiceBusConnection.cs
Covered lines:34
Uncovered lines:96
Coverable lines:130
Total lines:329
Line coverage:26.1% (34 of 130)
Covered branches:12
Total branches:38
Branch coverage:31.5% (12 of 38)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-0%100%
.ctor(...)-100%50%
.ctor(...)-100%100%
.ctor(...)-75%50%
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-100%50%
get_Endpoint()-100%100%
get_OperationTimeout()-100%100%
get_RetryPolicy()-0%100%
get_TransportType()-100%100%
get_TokenProvider()-100%100%
get_IsClosedOrClosing()-100%100%
set_IsClosedOrClosing(...)-0%100%
get_ConnectionManager()-0%100%
get_TransactionController()-0%100%
ThrowIfClosed()-66.67%50%
CloseAsync()-0%0%
InitializeConnection(...)-91.67%87.5%
CloseConnection(...)-0%100%
CloseController(...)-0%100%
CreateConnectionAsync()-0%0%
CreateControllerAsync()-0%0%
CreateTransportSettings()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\ServiceBusConnection.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Net;
 8    using System.Threading.Tasks;
 9    using Microsoft.Azure.Amqp;
 10    using Microsoft.Azure.Amqp.Framing;
 11    using Microsoft.Azure.Amqp.Transaction;
 12    using Microsoft.Azure.Amqp.Transport;
 13    using Microsoft.Azure.ServiceBus.Amqp;
 14    using Microsoft.Azure.ServiceBus.Primitives;
 15
 16    /// <summary>
 17    /// Connection object to service bus namespace
 18    /// </summary>
 19    public class ServiceBusConnection
 20    {
 021        static readonly Version AmqpVersion = new Version(1, 0, 0, 0);
 22        readonly object syncLock;
 23        bool isClosedOrClosing;
 24
 25        /// <summary>
 26        /// Creates a new connection to service bus.
 27        /// </summary>
 28        /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace infor
 29        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsyn
 30        public ServiceBusConnection(ServiceBusConnectionStringBuilder connectionStringBuilder)
 1831            : this(connectionStringBuilder?.GetNamespaceConnectionString())
 32        {
 1833        }
 34
 35        /// <summary>
 36        /// Creates a new connection to service bus.
 37        /// </summary>
 38        /// <param name="namespaceConnectionString">Namespace connection string</param>
 39        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsyn
 40        public ServiceBusConnection(string namespaceConnectionString)
 2641            : this(namespaceConnectionString, RetryPolicy.Default)
 42        {
 2643        }
 44
 45        /// <summary>
 46        /// Creates a new connection to service bus.
 47        /// </summary>
 48        /// <param name="namespaceConnectionString">Namespace connection string.</param>
 49        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
 50        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsyn
 51        public ServiceBusConnection(string namespaceConnectionString, RetryPolicy retryPolicy = null)
 2652            : this(retryPolicy)
 53        {
 2654            if (string.IsNullOrWhiteSpace(namespaceConnectionString))
 55            {
 056                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(namespaceConnectionString));
 57            }
 58
 2659            var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder(namespaceConnectionString);
 2660            if (!string.IsNullOrWhiteSpace(serviceBusConnectionStringBuilder.EntityPath))
 61            {
 062                throw Fx.Exception.Argument(nameof(namespaceConnectionString), "NamespaceConnectionString should not con
 63            }
 64
 2665            this.InitializeConnection(serviceBusConnectionStringBuilder);
 2666        }
 67
 68        /// <summary>
 69        /// Creates a new connection to service bus.
 70        /// </summary>
 71        /// <param name="namespaceConnectionString">Namespace connection string.</param>
 72        /// <param name="operationTimeout">Duration after which individual operations will timeout.</param>
 73        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
 74        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsyn
 75        [Obsolete("This constructor is obsolete. Use ServiceBusConnection(string namespaceConnectionString, RetryPolicy 
 76        public ServiceBusConnection(string namespaceConnectionString, TimeSpan operationTimeout, RetryPolicy retryPolicy
 077            : this(retryPolicy)
 78        {
 079            if (string.IsNullOrWhiteSpace(namespaceConnectionString))
 80            {
 081                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(namespaceConnectionString));
 82            }
 83
 084            var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder(namespaceConnectionString);
 085            if (!string.IsNullOrWhiteSpace(serviceBusConnectionStringBuilder.EntityPath))
 86            {
 087                throw Fx.Exception.Argument(nameof(namespaceConnectionString), "NamespaceConnectionString should not con
 88            }
 89
 090            this.InitializeConnection(serviceBusConnectionStringBuilder);
 91            // operationTimeout argument explicitly provided by caller should take precedence over OperationTimeout foun
 092            this.OperationTimeout = operationTimeout;
 093        }
 94
 95        /// <summary>
 96        /// Creates a new connection to service bus.
 97        /// </summary>
 98        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 99        /// <param name="transportType">Transport type.</param>
 100        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
 101        public ServiceBusConnection(string endpoint, TransportType transportType, RetryPolicy retryPolicy = null)
 0102            : this(retryPolicy)
 103        {
 0104            if (string.IsNullOrWhiteSpace(endpoint))
 105            {
 0106                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(endpoint));
 107            }
 108
 0109            var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder()
 0110            {
 0111                Endpoint = endpoint,
 0112                TransportType = transportType
 0113            };
 114
 0115            this.InitializeConnection(serviceBusConnectionStringBuilder);
 0116        }
 117
 26118        internal ServiceBusConnection(RetryPolicy retryPolicy = null)
 119        {
 26120            this.RetryPolicy = retryPolicy ?? RetryPolicy.Default;
 26121            this.syncLock = new object();
 26122        }
 123
 124        /// <summary>
 125        /// Fully qualified domain name for Service Bus.
 126        /// </summary>
 64127        public Uri Endpoint { get; set; }
 128
 129        /// <summary>
 130        /// OperationTimeout is applied in erroneous situations to notify the caller about the relevant <see cref="Servi
 131        /// </summary>
 132        /// <remarks>Defaults to 1 minute.</remarks>
 38133        public TimeSpan OperationTimeout { get; set; }
 134
 135        /// <summary>
 136        /// Retry policy for operations performed on the connection.
 137        /// </summary>
 138        /// <remarks>Defaults to <see cref="RetryPolicy.Default"/></remarks>
 0139        public RetryPolicy RetryPolicy { get; set; }
 140
 141        /// <summary>
 142        /// Get the transport type from the connection string.
 143        /// <remarks>Available options: Amqp and AmqpWebSockets.</remarks>
 144        /// </summary>
 30145        public TransportType TransportType { get; set; }
 146
 147        /// <summary>
 148        /// Token provider for authentication. <see cref="TokenProvider"/>
 149        /// </summary>
 58150        public ITokenProvider TokenProvider { get; set; }
 151
 152        /// <summary>
 153        /// Returns true if the Service Bus Connection is closed or closing.
 154        /// </summary>
 155        public bool IsClosedOrClosing
 156        {
 157            get
 158            {
 12159                lock (syncLock)
 160                {
 12161                    return isClosedOrClosing;
 162                }
 12163            }
 164            internal set
 165            {
 0166                lock (syncLock)
 167                {
 0168                    isClosedOrClosing = value;
 0169                }
 0170            }
 171        }
 172
 0173        internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; }
 174
 0175        internal FaultTolerantAmqpObject<Controller> TransactionController { get; set; }
 176
 177        /// <summary>
 178        /// Throw an OperationCanceledException if the object is Closing.
 179        /// </summary>
 180        internal virtual void ThrowIfClosed()
 181        {
 12182            if (this.IsClosedOrClosing)
 183            {
 0184                throw new ObjectDisposedException($"{nameof(ServiceBusConnection)} has already been closed. Please creat
 185            }
 12186        }
 187
 188        /// <summary>
 189        /// Closes the connection.
 190        /// </summary>
 191        public async Task CloseAsync()
 192        {
 0193            var callClose = false;
 0194            lock (this.syncLock)
 195            {
 0196                if (!this.IsClosedOrClosing)
 197                {
 0198                    this.IsClosedOrClosing = true;
 0199                    callClose = true;
 200                }
 0201            }
 202
 0203            if (callClose)
 204            {
 0205                await this.ConnectionManager.CloseAsync().ConfigureAwait(false);
 206            }
 0207        }
 208
 209        void InitializeConnection(ServiceBusConnectionStringBuilder builder)
 210        {
 26211            this.Endpoint = new Uri(builder.Endpoint);
 212
 26213            if (builder.SasToken != null)
 214            {
 0215                this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasToken);
 216            }
 26217            else if (builder.SasKeyName != null || builder.SasKey != null)
 218            {
 14219                this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasKeyName, builder.SasKey);
 220            }
 12221            else if (builder.Authentication.Equals(ServiceBusConnectionStringBuilder.AuthenticationType.ManagedIdentity)
 222            {
 6223                this.TokenProvider = new ManagedIdentityTokenProvider();
 224            }
 225
 26226            this.OperationTimeout = builder.OperationTimeout;
 26227            this.TransportType = builder.TransportType;
 26228            this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnec
 26229            this.TransactionController = new FaultTolerantAmqpObject<Controller>(this.CreateControllerAsync, CloseContro
 26230        }
 231
 232        static void CloseConnection(AmqpConnection connection)
 233        {
 0234            MessagingEventSource.Log.AmqpConnectionClosed(connection);
 0235            connection.SafeClose();
 0236        }
 237
 238        static void CloseController(Controller controller)
 239        {
 0240            controller.Close();
 0241        }
 242
 243        async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
 244        {
 0245            var hostName = this.Endpoint.Host;
 246
 0247            var timeoutHelper = new TimeoutHelper(timeout, true);
 0248            var amqpSettings = AmqpConnectionHelper.CreateAmqpSettings(
 0249                amqpVersion: AmqpVersion,
 0250                useSslStreamSecurity: true,
 0251                hasTokenProvider: true,
 0252                useWebSockets: TransportType == TransportType.AmqpWebSockets);
 253
 0254            var transportSettings = CreateTransportSettings();
 0255            var amqpTransportInitiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
 0256            var transport = await amqpTransportInitiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(
 257
 0258            var containerId = Guid.NewGuid().ToString();
 0259            var amqpConnectionSettings = AmqpConnectionHelper.CreateAmqpConnectionSettings(AmqpConstants.DefaultMaxFrame
 0260            var connection = new AmqpConnection(transport, amqpSettings, amqpConnectionSettings);
 0261            await connection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
 262
 263            // Always create the CBS Link + Session
 0264            var cbsLink = new AmqpCbsLink(connection);
 0265            if (connection.Extensions.Find<AmqpCbsLink>() == null)
 266            {
 0267                connection.Extensions.Add(cbsLink);
 268            }
 269
 0270            MessagingEventSource.Log.AmqpConnectionCreated(hostName, connection);
 271
 0272            return connection;
 0273        }
 274
 275        async Task<Controller> CreateControllerAsync(TimeSpan timeout)
 276        {
 0277            var timeoutHelper = new TimeoutHelper(timeout, true);
 0278            var connection = await this.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait
 279
 0280            var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
 0281            AmqpSession amqpSession = null;
 282            Controller controller;
 283
 284            try
 285            {
 0286                amqpSession = connection.CreateSession(sessionSettings);
 0287                await amqpSession.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
 288
 0289                controller = new Controller(amqpSession, timeoutHelper.RemainingTime());
 0290                await controller.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
 0291            }
 0292            catch (Exception exception)
 293            {
 0294                if (amqpSession != null)
 295                {
 0296                    await amqpSession.CloseAsync(timeout).ConfigureAwait(false);
 297                }
 298
 0299                MessagingEventSource.Log.AmqpCreateControllerException(this.ConnectionManager.ToString(), exception);
 0300                throw;
 0301            }
 302
 0303            return controller;
 0304        }
 305
 306        TransportSettings CreateTransportSettings()
 307        {
 0308            var hostName = this.Endpoint.Host;
 0309            var networkHost = this.Endpoint.Host;
 0310            var port = this.Endpoint.Port;
 311
 0312            if (TransportType == TransportType.AmqpWebSockets)
 313            {
 0314                return AmqpConnectionHelper.CreateWebSocketTransportSettings(
 0315                    networkHost: networkHost,
 0316                    hostName: hostName,
 0317                    port: port,
 0318                    proxy: WebRequest.DefaultWebProxy);
 319            }
 320
 0321            return AmqpConnectionHelper.CreateTcpTransportSettings(
 0322                networkHost: networkHost,
 0323                hostName: hostName,
 0324                port: port,
 0325                useSslStreamSecurity: true);
 326        }
 327
 328    }
 329}