|   |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|   |  | 2 |  | // Licensed under the MIT License. | 
|   |  | 3 |  |  | 
|   |  | 4 |  | using System; | 
|   |  | 5 |  | using System.Security.Authentication; | 
|   |  | 6 |  | using System.Threading; | 
|   |  | 7 |  | using System.Threading.Tasks; | 
|   |  | 8 |  | using Azure.Core; | 
|   |  | 9 |  | using Azure.Messaging.ServiceBus.Authorization; | 
|   |  | 10 |  | using Azure.Messaging.ServiceBus.Core; | 
|   |  | 11 |  |  | 
|   |  | 12 |  | namespace Azure.Messaging.ServiceBus.Amqp | 
|   |  | 13 |  | { | 
|   |  | 14 |  |     /// <summary> | 
|   |  | 15 |  |     ///   A transport client abstraction responsible for brokering operations for AMQP-based connections. | 
|   |  | 16 |  |     ///   It is intended that the public <see cref="ServiceBusConnection" /> make use of an instance via containment | 
|   |  | 17 |  |     ///   and delegate operations to it. | 
|   |  | 18 |  |     /// </summary> | 
|   |  | 19 |  |     /// | 
|   |  | 20 |  |     /// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportClient" /> | 
|   |  | 21 |  |     /// | 
|   |  | 22 |  |     internal class AmqpClient : TransportClient | 
|   |  | 23 |  |     { | 
|   |  | 24 |  |         /// <summary> | 
|   |  | 25 |  |         ///   The buffer to apply when considering refreshing; credentials that expire less than this duration will be r | 
|   |  | 26 |  |         /// </summary> | 
|   |  | 27 |  |         /// | 
|   | 0 | 28 |  |         private static TimeSpan CredentialRefreshBuffer { get; } = TimeSpan.FromMinutes(5); | 
|   |  | 29 |  |  | 
|   |  | 30 |  |         /// <summary>Indicates whether or not this instance has been closed.</summary> | 
|   |  | 31 |  |         private bool _closed = false; | 
|   |  | 32 |  |  | 
|   |  | 33 |  |         /// <summary>The currently active token to use for authorization with the Service Bus service.</summary> | 
|   |  | 34 |  |         private AccessToken _accessToken; | 
|   |  | 35 |  |  | 
|   |  | 36 |  |         /// <summary> | 
|   |  | 37 |  |         ///   Indicates whether or not this client has been closed. | 
|   |  | 38 |  |         /// </summary> | 
|   |  | 39 |  |         /// | 
|   |  | 40 |  |         /// <value> | 
|   |  | 41 |  |         ///   <c>true</c> if the client is closed; otherwise, <c>false</c>. | 
|   |  | 42 |  |         /// </value> | 
|   |  | 43 |  |         /// | 
|   | 20 | 44 |  |         public override bool IsClosed => _closed; | 
|   |  | 45 |  |  | 
|   |  | 46 |  |         /// <summary> | 
|   |  | 47 |  |         ///   The endpoint for the Service Bus service to which the client is associated. | 
|   |  | 48 |  |         /// </summary> | 
|   |  | 49 |  |         /// | 
|   | 42 | 50 |  |         public override Uri ServiceEndpoint { get; } | 
|   |  | 51 |  |  | 
|   |  | 52 |  |         /// <summary> | 
|   |  | 53 |  |         ///   Gets the credential to use for authorization with the Service Bus service. | 
|   |  | 54 |  |         /// </summary> | 
|   |  | 55 |  |         /// | 
|   | 0 | 56 |  |         private ServiceBusTokenCredential Credential { get; } | 
|   |  | 57 |  |  | 
|   |  | 58 |  |         /// <summary> | 
|   |  | 59 |  |         ///   The AMQP connection scope responsible for managing transport constructs for this instance. | 
|   |  | 60 |  |         /// </summary> | 
|   |  | 61 |  |         /// | 
|   | 16 | 62 |  |         private AmqpConnectionScope ConnectionScope { get; } | 
|   |  | 63 |  |  | 
|   |  | 64 |  |         /// <summary> | 
|   |  | 65 |  |         ///   Initializes a new instance of the <see cref="AmqpClient"/> class. | 
|   |  | 66 |  |         /// </summary> | 
|   |  | 67 |  |         /// | 
|   |  | 68 |  |         /// <param name="host">The fully qualified host name for the Service Bus namespace.  This is likely to be simila | 
|   |  | 69 |  |         /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma | 
|   |  | 70 |  |         /// <param name="options">A set of options to apply when configuring the client.</param> | 
|   |  | 71 |  |         /// | 
|   |  | 72 |  |         /// <remarks> | 
|   |  | 73 |  |         ///   As an internal type, this class performs only basic sanity checks against its arguments.  It | 
|   |  | 74 |  |         ///   is assumed that callers are trusted and have performed deep validation. | 
|   |  | 75 |  |         /// | 
|   |  | 76 |  |         ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose; | 
|   |  | 77 |  |         ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the | 
|   |  | 78 |  |         ///   caller. | 
|   |  | 79 |  |         /// </remarks> | 
|   |  | 80 |  |         /// | 
|   | 42 | 81 |  |         internal AmqpClient( | 
|   | 42 | 82 |  |             string host, | 
|   | 42 | 83 |  |             ServiceBusTokenCredential credential, | 
|   | 42 | 84 |  |             ServiceBusClientOptions options) | 
|   |  | 85 |  |         { | 
|   | 42 | 86 |  |             Argument.AssertNotNullOrEmpty(host, nameof(host)); | 
|   | 42 | 87 |  |             Argument.AssertNotNull(credential, nameof(credential)); | 
|   | 42 | 88 |  |             Argument.AssertNotNull(options, nameof(options)); | 
|   |  | 89 |  |  | 
|   | 42 | 90 |  |             ServiceEndpoint = new UriBuilder | 
|   | 42 | 91 |  |             { | 
|   | 42 | 92 |  |                 Scheme = options.TransportType.GetUriScheme(), | 
|   | 42 | 93 |  |                 Host = host | 
|   | 42 | 94 |  |  | 
|   | 42 | 95 |  |             }.Uri; | 
|   |  | 96 |  |  | 
|   | 42 | 97 |  |             Credential = credential; | 
|   | 42 | 98 |  |             ConnectionScope = new AmqpConnectionScope( | 
|   | 42 | 99 |  |                 ServiceEndpoint, | 
|   | 42 | 100 |  |                 credential, | 
|   | 42 | 101 |  |                 options.TransportType, | 
|   | 42 | 102 |  |                 options.Proxy); | 
|   |  | 103 |  |  | 
|   | 42 | 104 |  |         } | 
|   |  | 105 |  |  | 
|   |  | 106 |  |         /// <summary> | 
|   |  | 107 |  |         ///   Creates a producer strongly aligned with the active protocol and transport, | 
|   |  | 108 |  |         ///   responsible for publishing <see cref="ServiceBusMessage" /> to the Service Bus entity. | 
|   |  | 109 |  |         /// </summary> | 
|   |  | 110 |  |         /// | 
|   |  | 111 |  |         /// <param name="entityPath">The entity path to send the message to.</param> | 
|   |  | 112 |  |         /// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</p | 
|   |  | 113 |  |         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param> | 
|   |  | 114 |  |         /// <param name="identifier">The identifier for the sender.</param> | 
|   |  | 115 |  |         /// | 
|   |  | 116 |  |         /// <returns>A <see cref="TransportSender"/> configured in the requested manner.</returns> | 
|   |  | 117 |  |         public override TransportSender CreateSender( | 
|   |  | 118 |  |             string entityPath, | 
|   |  | 119 |  |             string viaEntityPath, | 
|   |  | 120 |  |             ServiceBusRetryPolicy retryPolicy, | 
|   |  | 121 |  |             string identifier) | 
|   |  | 122 |  |         { | 
|   | 12 | 123 |  |             Argument.AssertNotDisposed(_closed, nameof(AmqpClient)); | 
|   |  | 124 |  |  | 
|   | 12 | 125 |  |             return new AmqpSender | 
|   | 12 | 126 |  |             ( | 
|   | 12 | 127 |  |                 entityPath, | 
|   | 12 | 128 |  |                 viaEntityPath, | 
|   | 12 | 129 |  |                 ConnectionScope, | 
|   | 12 | 130 |  |                 retryPolicy, | 
|   | 12 | 131 |  |                 identifier | 
|   | 12 | 132 |  |             ); | 
|   |  | 133 |  |         } | 
|   |  | 134 |  |  | 
|   |  | 135 |  |         /// <summary> | 
|   |  | 136 |  |         ///   Creates a consumer strongly aligned with the active protocol and transport, responsible | 
|   |  | 137 |  |         ///   for reading <see cref="ServiceBusMessage" /> from a specific Service Bus entity. | 
|   |  | 138 |  |         /// </summary> | 
|   |  | 139 |  |         /// <param name="entityPath"></param> | 
|   |  | 140 |  |         /// | 
|   |  | 141 |  |         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param> | 
|   |  | 142 |  |         /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults  | 
|   |  | 143 |  |         /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet | 
|   |  | 144 |  |         /// <param name="identifier"></param> | 
|   |  | 145 |  |         /// <param name="sessionId"></param> | 
|   |  | 146 |  |         /// <param name="isSessionReceiver"></param> | 
|   |  | 147 |  |         /// | 
|   |  | 148 |  |         /// <returns>A <see cref="TransportReceiver" /> configured in the requested manner.</returns> | 
|   |  | 149 |  |         /// | 
|   |  | 150 |  |         public override TransportReceiver CreateReceiver( | 
|   |  | 151 |  |             string entityPath, | 
|   |  | 152 |  |             ServiceBusRetryPolicy retryPolicy, | 
|   |  | 153 |  |             ReceiveMode receiveMode, | 
|   |  | 154 |  |             uint prefetchCount, | 
|   |  | 155 |  |             string identifier, | 
|   |  | 156 |  |             string sessionId, | 
|   |  | 157 |  |             bool isSessionReceiver) | 
|   |  | 158 |  |         { | 
|   | 4 | 159 |  |             Argument.AssertNotDisposed(_closed, nameof(AmqpClient)); | 
|   |  | 160 |  |  | 
|   | 4 | 161 |  |             return new AmqpReceiver | 
|   | 4 | 162 |  |             ( | 
|   | 4 | 163 |  |                 entityPath, | 
|   | 4 | 164 |  |                 receiveMode, | 
|   | 4 | 165 |  |                 prefetchCount, | 
|   | 4 | 166 |  |                 ConnectionScope, | 
|   | 4 | 167 |  |                 retryPolicy, | 
|   | 4 | 168 |  |                 identifier, | 
|   | 4 | 169 |  |                 sessionId, | 
|   | 4 | 170 |  |                 isSessionReceiver | 
|   | 4 | 171 |  |             ); | 
|   |  | 172 |  |         } | 
|   |  | 173 |  |  | 
|   |  | 174 |  |         /// <summary> | 
|   |  | 175 |  |         ///   Creates a rule manager strongly aligned with the active protocol and transport, | 
|   |  | 176 |  |         ///   responsible for adding, removing and getting rules from the Service Bus subscription. | 
|   |  | 177 |  |         /// </summary> | 
|   |  | 178 |  |         /// | 
|   |  | 179 |  |         /// <param name="subscriptionPath">The path of the Service Bus subscription to which the rule manager is bound.< | 
|   |  | 180 |  |         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param> | 
|   |  | 181 |  |         /// <param name="identifier">The identifier for the rule manager.</param> | 
|   |  | 182 |  |         /// | 
|   |  | 183 |  |         /// <returns>A <see cref="TransportRuleManager"/> configured in the requested manner.</returns> | 
|   |  | 184 |  |         public override TransportRuleManager CreateRuleManager( | 
|   |  | 185 |  |             string subscriptionPath, | 
|   |  | 186 |  |             ServiceBusRetryPolicy retryPolicy, | 
|   |  | 187 |  |             string identifier) | 
|   |  | 188 |  |         { | 
|   | 0 | 189 |  |             Argument.AssertNotDisposed(_closed, nameof(AmqpClient)); | 
|   |  | 190 |  |  | 
|   | 0 | 191 |  |             return new AmqpRuleManager | 
|   | 0 | 192 |  |             ( | 
|   | 0 | 193 |  |                 subscriptionPath, | 
|   | 0 | 194 |  |                 ConnectionScope, | 
|   | 0 | 195 |  |                 retryPolicy, | 
|   | 0 | 196 |  |                 identifier | 
|   | 0 | 197 |  |             ); | 
|   |  | 198 |  |         } | 
|   |  | 199 |  |  | 
|   |  | 200 |  |         /// <summary> | 
|   |  | 201 |  |         ///   Closes the connection to the transport client instance. | 
|   |  | 202 |  |         /// </summary> | 
|   |  | 203 |  |         /// | 
|   |  | 204 |  |         /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t | 
|   |  | 205 |  |         /// | 
|   |  | 206 |  |         public override Task CloseAsync(CancellationToken cancellationToken) | 
|   |  | 207 |  |         { | 
|   | 0 | 208 |  |             if (_closed) | 
|   |  | 209 |  |             { | 
|   | 0 | 210 |  |                 return Task.CompletedTask; | 
|   |  | 211 |  |             } | 
|   |  | 212 |  |  | 
|   | 0 | 213 |  |             _closed = true; | 
|   |  | 214 |  |  | 
|   |  | 215 |  |             try | 
|   |  | 216 |  |             { | 
|   | 0 | 217 |  |                 cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); | 
|   | 0 | 218 |  |                 ConnectionScope?.Dispose(); | 
|   | 0 | 219 |  |                 return Task.CompletedTask; | 
|   |  | 220 |  |             } | 
|   | 0 | 221 |  |             catch (Exception) | 
|   |  | 222 |  |             { | 
|   | 0 | 223 |  |                 _closed = false; | 
|   | 0 | 224 |  |                 throw; | 
|   |  | 225 |  |             } | 
|   | 0 | 226 |  |         } | 
|   |  | 227 |  |  | 
|   |  | 228 |  |         /// <summary> | 
|   |  | 229 |  |         ///   Acquires an access token for authorization with the Service Bus service. | 
|   |  | 230 |  |         /// </summary> | 
|   |  | 231 |  |         /// | 
|   |  | 232 |  |         /// <returns>The token to use for service authorization.</returns> | 
|   |  | 233 |  |         /// | 
|   |  | 234 |  |         private async Task<string> AcquireAccessTokenAsync(CancellationToken cancellationToken) | 
|   |  | 235 |  |         { | 
|   | 0 | 236 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); | 
|   |  | 237 |  |  | 
|   | 0 | 238 |  |             AccessToken activeToken = _accessToken; | 
|   |  | 239 |  |  | 
|   |  | 240 |  |             // If there was no current token, or it is within the buffer for expiration, request a new token. | 
|   |  | 241 |  |             // There is a benign race condition here, where there may be multiple requests in-flight for a new token.  S | 
|   |  | 242 |  |             // overlapping requests should be within a small window, allow the acquired token to replace the current one | 
|   |  | 243 |  |             // attempting to coordinate or ensure that the most recent is kept. | 
|   |  | 244 |  |  | 
|   | 0 | 245 |  |             if ((string.IsNullOrEmpty(activeToken.Token)) || (activeToken.ExpiresOn <= DateTimeOffset.UtcNow.Add(Credent | 
|   |  | 246 |  |             { | 
|   | 0 | 247 |  |                 activeToken = await Credential.GetTokenUsingDefaultScopeAsync(cancellationToken).ConfigureAwait(false); | 
|   |  | 248 |  |  | 
|   | 0 | 249 |  |                 if ((string.IsNullOrEmpty(activeToken.Token))) | 
|   |  | 250 |  |                 { | 
|   | 0 | 251 |  |                     throw new AuthenticationException(Resources.CouldNotAcquireAccessToken); | 
|   |  | 252 |  |                 } | 
|   |  | 253 |  |  | 
|   | 0 | 254 |  |                 _accessToken = activeToken; | 
|   |  | 255 |  |             } | 
|   |  | 256 |  |  | 
|   | 0 | 257 |  |             return activeToken.Token; | 
|   | 0 | 258 |  |         } | 
|   |  | 259 |  |  | 
|   |  | 260 |  |         /// <summary> | 
|   |  | 261 |  |         ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances. | 
|   |  | 262 |  |         /// </summary> | 
|   |  | 263 |  |         /// | 
|   |  | 264 |  |         /// <param name="firstOption">The first option to consider.</param> | 
|   |  | 265 |  |         /// <param name="secondOption">The second option to consider.</param> | 
|   |  | 266 |  |         /// | 
|   |  | 267 |  |         /// <returns></returns> | 
|   |  | 268 |  |         /// | 
|   |  | 269 |  |         private static TimeSpan UseMinimum(TimeSpan firstOption, | 
|   | 0 | 270 |  |                                            TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second | 
|   |  | 271 |  |     } | 
|   |  | 272 |  | } |