< Summary

Class:Azure.Messaging.ServiceBus.Amqp.AmqpClient
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpClient.cs
Covered lines:44
Uncovered lines:30
Coverable lines:74
Total lines:272
Line coverage:59.4% (44 of 74)
Covered branches:0
Total branches:12
Branch coverage:0% (0 of 12)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_CredentialRefreshBuffer()-0%100%
get_IsClosed()-100%100%
get_ServiceEndpoint()-100%100%
get_Credential()-0%100%
get_ConnectionScope()-100%100%
.ctor(...)-100%100%
CreateSender(...)-100%100%
CreateReceiver(...)-100%100%
CreateRuleManager(...)-0%100%
CloseAsync(...)-0%0%
AcquireAccessTokenAsync()-0%0%
UseMinimum(...)-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Security.Authentication;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using Azure.Core;
 9using Azure.Messaging.ServiceBus.Authorization;
 10using Azure.Messaging.ServiceBus.Core;
 11
 12namespace Azure.Messaging.ServiceBus.Amqp
 13{
 14    /// <summary>
 15    ///   A transport client abstraction responsible for brokering operations for AMQP-based connections.
 16    ///   It is intended that the public <see cref="ServiceBusConnection" /> make use of an instance via containment
 17    ///   and delegate operations to it.
 18    /// </summary>
 19    ///
 20    /// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportClient" />
 21    ///
 22    internal class AmqpClient : TransportClient
 23    {
 24        /// <summary>
 25        ///   The buffer to apply when considering refreshing; credentials that expire less than this duration will be r
 26        /// </summary>
 27        ///
 028        private static TimeSpan CredentialRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
 29
 30        /// <summary>Indicates whether or not this instance has been closed.</summary>
 31        private bool _closed = false;
 32
 33        /// <summary>The currently active token to use for authorization with the Service Bus service.</summary>
 34        private AccessToken _accessToken;
 35
 36        /// <summary>
 37        ///   Indicates whether or not this client has been closed.
 38        /// </summary>
 39        ///
 40        /// <value>
 41        ///   <c>true</c> if the client is closed; otherwise, <c>false</c>.
 42        /// </value>
 43        ///
 2044        public override bool IsClosed => _closed;
 45
 46        /// <summary>
 47        ///   The endpoint for the Service Bus service to which the client is associated.
 48        /// </summary>
 49        ///
 4250        public override Uri ServiceEndpoint { get; }
 51
 52        /// <summary>
 53        ///   Gets the credential to use for authorization with the Service Bus service.
 54        /// </summary>
 55        ///
 056        private ServiceBusTokenCredential Credential { get; }
 57
 58        /// <summary>
 59        ///   The AMQP connection scope responsible for managing transport constructs for this instance.
 60        /// </summary>
 61        ///
 1662        private AmqpConnectionScope ConnectionScope { get; }
 63
 64        /// <summary>
 65        ///   Initializes a new instance of the <see cref="AmqpClient"/> class.
 66        /// </summary>
 67        ///
 68        /// <param name="host">The fully qualified host name for the Service Bus namespace.  This is likely to be simila
 69        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 70        /// <param name="options">A set of options to apply when configuring the client.</param>
 71        ///
 72        /// <remarks>
 73        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 74        ///   is assumed that callers are trusted and have performed deep validation.
 75        ///
 76        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 77        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 78        ///   caller.
 79        /// </remarks>
 80        ///
 4281        internal AmqpClient(
 4282            string host,
 4283            ServiceBusTokenCredential credential,
 4284            ServiceBusClientOptions options)
 85        {
 4286            Argument.AssertNotNullOrEmpty(host, nameof(host));
 4287            Argument.AssertNotNull(credential, nameof(credential));
 4288            Argument.AssertNotNull(options, nameof(options));
 89
 4290            ServiceEndpoint = new UriBuilder
 4291            {
 4292                Scheme = options.TransportType.GetUriScheme(),
 4293                Host = host
 4294
 4295            }.Uri;
 96
 4297            Credential = credential;
 4298            ConnectionScope = new AmqpConnectionScope(
 4299                ServiceEndpoint,
 42100                credential,
 42101                options.TransportType,
 42102                options.Proxy);
 103
 42104        }
 105
 106        /// <summary>
 107        ///   Creates a producer strongly aligned with the active protocol and transport,
 108        ///   responsible for publishing <see cref="ServiceBusMessage" /> to the Service Bus entity.
 109        /// </summary>
 110        ///
 111        /// <param name="entityPath">The entity path to send the message to.</param>
 112        /// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</p
 113        /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
 114        /// <param name="identifier">The identifier for the sender.</param>
 115        ///
 116        /// <returns>A <see cref="TransportSender"/> configured in the requested manner.</returns>
 117        public override TransportSender CreateSender(
 118            string entityPath,
 119            string viaEntityPath,
 120            ServiceBusRetryPolicy retryPolicy,
 121            string identifier)
 122        {
 12123            Argument.AssertNotDisposed(_closed, nameof(AmqpClient));
 124
 12125            return new AmqpSender
 12126            (
 12127                entityPath,
 12128                viaEntityPath,
 12129                ConnectionScope,
 12130                retryPolicy,
 12131                identifier
 12132            );
 133        }
 134
 135        /// <summary>
 136        ///   Creates a consumer strongly aligned with the active protocol and transport, responsible
 137        ///   for reading <see cref="ServiceBusMessage" /> from a specific Service Bus entity.
 138        /// </summary>
 139        /// <param name="entityPath"></param>
 140        ///
 141        /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
 142        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 143        /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet
 144        /// <param name="identifier"></param>
 145        /// <param name="sessionId"></param>
 146        /// <param name="isSessionReceiver"></param>
 147        ///
 148        /// <returns>A <see cref="TransportReceiver" /> configured in the requested manner.</returns>
 149        ///
 150        public override TransportReceiver CreateReceiver(
 151            string entityPath,
 152            ServiceBusRetryPolicy retryPolicy,
 153            ReceiveMode receiveMode,
 154            uint prefetchCount,
 155            string identifier,
 156            string sessionId,
 157            bool isSessionReceiver)
 158        {
 4159            Argument.AssertNotDisposed(_closed, nameof(AmqpClient));
 160
 4161            return new AmqpReceiver
 4162            (
 4163                entityPath,
 4164                receiveMode,
 4165                prefetchCount,
 4166                ConnectionScope,
 4167                retryPolicy,
 4168                identifier,
 4169                sessionId,
 4170                isSessionReceiver
 4171            );
 172        }
 173
 174        /// <summary>
 175        ///   Creates a rule manager strongly aligned with the active protocol and transport,
 176        ///   responsible for adding, removing and getting rules from the Service Bus subscription.
 177        /// </summary>
 178        ///
 179        /// <param name="subscriptionPath">The path of the Service Bus subscription to which the rule manager is bound.<
 180        /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
 181        /// <param name="identifier">The identifier for the rule manager.</param>
 182        ///
 183        /// <returns>A <see cref="TransportRuleManager"/> configured in the requested manner.</returns>
 184        public override TransportRuleManager CreateRuleManager(
 185            string subscriptionPath,
 186            ServiceBusRetryPolicy retryPolicy,
 187            string identifier)
 188        {
 0189            Argument.AssertNotDisposed(_closed, nameof(AmqpClient));
 190
 0191            return new AmqpRuleManager
 0192            (
 0193                subscriptionPath,
 0194                ConnectionScope,
 0195                retryPolicy,
 0196                identifier
 0197            );
 198        }
 199
 200        /// <summary>
 201        ///   Closes the connection to the transport client instance.
 202        /// </summary>
 203        ///
 204        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 205        ///
 206        public override Task CloseAsync(CancellationToken cancellationToken)
 207        {
 0208            if (_closed)
 209            {
 0210                return Task.CompletedTask;
 211            }
 212
 0213            _closed = true;
 214
 215            try
 216            {
 0217                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0218                ConnectionScope?.Dispose();
 0219                return Task.CompletedTask;
 220            }
 0221            catch (Exception)
 222            {
 0223                _closed = false;
 0224                throw;
 225            }
 0226        }
 227
 228        /// <summary>
 229        ///   Acquires an access token for authorization with the Service Bus service.
 230        /// </summary>
 231        ///
 232        /// <returns>The token to use for service authorization.</returns>
 233        ///
 234        private async Task<string> AcquireAccessTokenAsync(CancellationToken cancellationToken)
 235        {
 0236            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 237
 0238            AccessToken activeToken = _accessToken;
 239
 240            // If there was no current token, or it is within the buffer for expiration, request a new token.
 241            // There is a benign race condition here, where there may be multiple requests in-flight for a new token.  S
 242            // overlapping requests should be within a small window, allow the acquired token to replace the current one
 243            // attempting to coordinate or ensure that the most recent is kept.
 244
 0245            if ((string.IsNullOrEmpty(activeToken.Token)) || (activeToken.ExpiresOn <= DateTimeOffset.UtcNow.Add(Credent
 246            {
 0247                activeToken = await Credential.GetTokenUsingDefaultScopeAsync(cancellationToken).ConfigureAwait(false);
 248
 0249                if ((string.IsNullOrEmpty(activeToken.Token)))
 250                {
 0251                    throw new AuthenticationException(Resources.CouldNotAcquireAccessToken);
 252                }
 253
 0254                _accessToken = activeToken;
 255            }
 256
 0257            return activeToken.Token;
 0258        }
 259
 260        /// <summary>
 261        ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 262        /// </summary>
 263        ///
 264        /// <param name="firstOption">The first option to consider.</param>
 265        /// <param name="secondOption">The second option to consider.</param>
 266        ///
 267        /// <returns></returns>
 268        ///
 269        private static TimeSpan UseMinimum(TimeSpan firstOption,
 0270                                           TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second
 271    }
 272}