|  |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|  |  | 2 |  | // Licensed under the MIT License. | 
|  |  | 3 |  |  | 
|  |  | 4 |  | using System; | 
|  |  | 5 |  | using System.ComponentModel; | 
|  |  | 6 |  | using System.Globalization; | 
|  |  | 7 |  | using System.Threading; | 
|  |  | 8 |  | using System.Threading.Tasks; | 
|  |  | 9 |  | using Azure.Core; | 
|  |  | 10 |  | using Azure.Messaging.EventHubs.Amqp; | 
|  |  | 11 |  | using Azure.Messaging.EventHubs.Authorization; | 
|  |  | 12 |  | using Azure.Messaging.EventHubs.Consumer; | 
|  |  | 13 |  | using Azure.Messaging.EventHubs.Core; | 
|  |  | 14 |  | using Azure.Messaging.EventHubs.Diagnostics; | 
|  |  | 15 |  |  | 
|  |  | 16 |  | namespace Azure.Messaging.EventHubs | 
|  |  | 17 |  | { | 
|  |  | 18 |  |     /// <summary> | 
|  |  | 19 |  |     ///   A connection to the Azure Event Hubs service, enabling client communications with a specific | 
|  |  | 20 |  |     ///   Event Hub instance within an Event Hubs namespace.  A single connection may be shared among multiple | 
|  |  | 21 |  |     ///   Event Hub producers and/or consumers, or may be used as a dedicated connection for a single | 
|  |  | 22 |  |     ///   producer or consumer client. | 
|  |  | 23 |  |     /// </summary> | 
|  |  | 24 |  |     /// | 
|  |  | 25 |  |     /// <seealso href="https://docs.microsoft.com/en-us/Azure/event-hubs/event-hubs-about" /> | 
|  |  | 26 |  |     /// | 
|  |  | 27 |  |     public class EventHubConnection : IAsyncDisposable | 
|  |  | 28 |  |     { | 
|  |  | 29 |  |         /// <summary> | 
|  |  | 30 |  |         ///   The fully qualified Event Hubs namespace that the connection is associated with.  This is likely | 
|  |  | 31 |  |         ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>. | 
|  |  | 32 |  |         /// </summary> | 
|  |  | 33 |  |         /// | 
|  | 132 | 34 |  |         public string FullyQualifiedNamespace { get; } | 
|  |  | 35 |  |  | 
|  |  | 36 |  |         /// <summary> | 
|  |  | 37 |  |         ///   The name of the Event Hub that the connection is associated with, specific to the | 
|  |  | 38 |  |         ///   Event Hubs namespace that contains it. | 
|  |  | 39 |  |         /// </summary> | 
|  |  | 40 |  |         /// | 
|  | 722 | 41 |  |         public string EventHubName { get; } | 
|  |  | 42 |  |  | 
|  |  | 43 |  |         /// <summary> | 
|  |  | 44 |  |         ///   Indicates whether or not this <see cref="EventHubConnection"/> has been closed. | 
|  |  | 45 |  |         /// </summary> | 
|  |  | 46 |  |         /// | 
|  |  | 47 |  |         /// <value> | 
|  |  | 48 |  |         ///   <c>true</c> if the connection is closed; otherwise, <c>false</c>. | 
|  |  | 49 |  |         /// </value> | 
|  |  | 50 |  |         /// | 
|  | 28 | 51 |  |         public bool IsClosed => InnerClient.IsClosed; | 
|  |  | 52 |  |  | 
|  |  | 53 |  |         /// <summary> | 
|  |  | 54 |  |         ///   The endpoint for the Event Hubs service to which the connection is associated. | 
|  |  | 55 |  |         /// </summary> | 
|  |  | 56 |  |         /// | 
|  | 0 | 57 |  |         internal Uri ServiceEndpoint => InnerClient.ServiceEndpoint; | 
|  |  | 58 |  |  | 
|  |  | 59 |  |         /// <summary> | 
|  |  | 60 |  |         ///   The set of client options used for creation of this client. | 
|  |  | 61 |  |         /// </summary> | 
|  |  | 62 |  |         /// | 
|  | 446 | 63 |  |         private EventHubConnectionOptions Options { get; set; } | 
|  |  | 64 |  |  | 
|  |  | 65 |  |         /// <summary> | 
|  |  | 66 |  |         ///   An abstracted Event Hub Client specific to the active protocol and transport intended to perform delegated | 
|  |  | 67 |  |         /// </summary> | 
|  |  | 68 |  |         /// | 
|  | 572 | 69 |  |         private TransportClient InnerClient { get; set; } | 
|  |  | 70 |  |  | 
|  |  | 71 |  |         /// <summary> | 
|  |  | 72 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 73 |  |         /// </summary> | 
|  |  | 74 |  |         /// | 
|  |  | 75 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 76 |  |         /// | 
|  |  | 77 |  |         /// <remarks> | 
|  |  | 78 |  |         ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t | 
|  |  | 79 |  |         ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]] | 
|  |  | 80 |  |         ///   connection string.  For example, ";EntityPath=telemetry-hub". | 
|  |  | 81 |  |         /// | 
|  |  | 82 |  |         ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s | 
|  |  | 83 |  |         ///   Event Hub will result in a connection string that contains the name. | 
|  |  | 84 |  |         /// </remarks> | 
|  |  | 85 |  |         /// | 
|  | 26 | 86 |  |         public EventHubConnection(string connectionString) : this(connectionString, null, connectionOptions: null) | 
|  |  | 87 |  |         { | 
|  | 10 | 88 |  |         } | 
|  |  | 89 |  |  | 
|  |  | 90 |  |         /// <summary> | 
|  |  | 91 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 92 |  |         /// </summary> | 
|  |  | 93 |  |         /// | 
|  |  | 94 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 95 |  |         /// <param name="connectionOptions">A set of options to apply when configuring the connection.</param> | 
|  |  | 96 |  |         /// | 
|  |  | 97 |  |         /// <remarks> | 
|  |  | 98 |  |         ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t | 
|  |  | 99 |  |         ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]] | 
|  |  | 100 |  |         ///   connection string.  For example, ";EntityPath=telemetry-hub". | 
|  |  | 101 |  |         /// | 
|  |  | 102 |  |         ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s | 
|  |  | 103 |  |         ///   Event Hub will result in a connection string that contains the name. | 
|  |  | 104 |  |         /// </remarks> | 
|  |  | 105 |  |         /// | 
|  |  | 106 |  |         public EventHubConnection(string connectionString, | 
|  | 36 | 107 |  |                                   EventHubConnectionOptions connectionOptions) : this(connectionString, null, connection | 
|  |  | 108 |  |         { | 
|  | 30 | 109 |  |         } | 
|  |  | 110 |  |  | 
|  |  | 111 |  |         /// <summary> | 
|  |  | 112 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 113 |  |         /// </summary> | 
|  |  | 114 |  |         /// | 
|  |  | 115 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 116 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the connection with.</param> | 
|  |  | 117 |  |         /// | 
|  |  | 118 |  |         /// <remarks> | 
|  |  | 119 |  |         ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even | 
|  |  | 120 |  |         ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub  | 
|  |  | 121 |  |         ///   passed only once, either as part of the connection string or separately. | 
|  |  | 122 |  |         /// </remarks> | 
|  |  | 123 |  |         /// | 
|  |  | 124 |  |         public EventHubConnection(string connectionString, | 
|  | 28 | 125 |  |                                   string eventHubName) : this(connectionString, eventHubName, connectionOptions: null) | 
|  |  | 126 |  |         { | 
|  | 22 | 127 |  |         } | 
|  |  | 128 |  |  | 
|  |  | 129 |  |         /// <summary> | 
|  |  | 130 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 131 |  |         /// </summary> | 
|  |  | 132 |  |         /// | 
|  |  | 133 |  |         /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i | 
|  |  | 134 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the connection with.</param> | 
|  |  | 135 |  |         /// <param name="connectionOptions">A set of options to apply when configuring the connection.</param> | 
|  |  | 136 |  |         /// | 
|  |  | 137 |  |         /// <remarks> | 
|  |  | 138 |  |         ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even | 
|  |  | 139 |  |         ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub  | 
|  |  | 140 |  |         ///   passed only once, either as part of the connection string or separately. | 
|  |  | 141 |  |         /// </remarks> | 
|  |  | 142 |  |         /// | 
|  | 208 | 143 |  |         public EventHubConnection(string connectionString, | 
|  | 208 | 144 |  |                                   string eventHubName, | 
|  | 208 | 145 |  |                                   EventHubConnectionOptions connectionOptions) | 
|  |  | 146 |  |         { | 
|  | 208 | 147 |  |             Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString)); | 
|  |  | 148 |  |  | 
|  | 192 | 149 |  |             connectionOptions = connectionOptions?.Clone() ?? new EventHubConnectionOptions(); | 
|  | 192 | 150 |  |             ValidateConnectionOptions(connectionOptions); | 
|  |  | 151 |  |  | 
|  | 190 | 152 |  |             var connectionStringProperties = ConnectionStringParser.Parse(connectionString); | 
|  | 190 | 153 |  |             connectionStringProperties.Validate(eventHubName, nameof(connectionString)); | 
|  |  | 154 |  |  | 
|  | 146 | 155 |  |             var fullyQualifiedNamespace = connectionStringProperties.Endpoint.Host; | 
|  |  | 156 |  |  | 
|  | 146 | 157 |  |             if (string.IsNullOrEmpty(eventHubName)) | 
|  |  | 158 |  |             { | 
|  | 102 | 159 |  |                 eventHubName = connectionStringProperties.EventHubName; | 
|  |  | 160 |  |             } | 
|  |  | 161 |  |  | 
|  | 146 | 162 |  |             var sharedAccessSignature = new SharedAccessSignature | 
|  | 146 | 163 |  |             ( | 
|  | 146 | 164 |  |                  BuildAudienceResource(connectionOptions.TransportType, fullyQualifiedNamespace, eventHubName), | 
|  | 146 | 165 |  |                  connectionStringProperties.SharedAccessKeyName, | 
|  | 146 | 166 |  |                  connectionStringProperties.SharedAccessKey | 
|  | 146 | 167 |  |             ); | 
|  |  | 168 |  |  | 
|  | 146 | 169 |  |             var sharedCredentials = new SharedAccessSignatureCredential(sharedAccessSignature); | 
|  | 146 | 170 |  |             var tokenCredentials = new EventHubTokenCredential(sharedCredentials, BuildAudienceResource(connectionOption | 
|  |  | 171 |  |  | 
|  | 146 | 172 |  |             FullyQualifiedNamespace = fullyQualifiedNamespace; | 
|  | 146 | 173 |  |             EventHubName = eventHubName; | 
|  | 146 | 174 |  |             Options = connectionOptions; | 
|  |  | 175 |  |  | 
|  |  | 176 |  | #pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for t | 
|  | 146 | 177 |  |             InnerClient = CreateTransportClient(fullyQualifiedNamespace, eventHubName, tokenCredentials, connectionOptio | 
|  |  | 178 |  | #pragma warning restore CA2214 // Do not call overridable methods in constructors. | 
|  | 146 | 179 |  |         } | 
|  |  | 180 |  |  | 
|  |  | 181 |  |         /// <summary> | 
|  |  | 182 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 183 |  |         /// </summary> | 
|  |  | 184 |  |         /// | 
|  |  | 185 |  |         /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel | 
|  |  | 186 |  |         /// <param name="eventHubName">The name of the specific Event Hub to associate the connection with.</param> | 
|  |  | 187 |  |         /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma | 
|  |  | 188 |  |         /// <param name="connectionOptions">A set of options to apply when configuring the connection.</param> | 
|  |  | 189 |  |         /// | 
|  | 294 | 190 |  |         public EventHubConnection(string fullyQualifiedNamespace, | 
|  | 294 | 191 |  |                                   string eventHubName, | 
|  | 294 | 192 |  |                                   TokenCredential credential, | 
|  | 294 | 193 |  |                                   EventHubConnectionOptions connectionOptions = default) | 
|  |  | 194 |  |         { | 
|  | 294 | 195 |  |             Argument.AssertWellFormedEventHubsNamespace(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace)); | 
|  | 288 | 196 |  |             Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); | 
|  | 284 | 197 |  |             Argument.AssertNotNull(credential, nameof(credential)); | 
|  |  | 198 |  |  | 
|  | 282 | 199 |  |             connectionOptions = connectionOptions?.Clone() ?? new EventHubConnectionOptions(); | 
|  | 282 | 200 |  |             ValidateConnectionOptions(connectionOptions); | 
|  |  | 201 |  |  | 
|  |  | 202 |  |             switch (credential) | 
|  |  | 203 |  |             { | 
|  |  | 204 |  |                 case SharedAccessSignatureCredential _: | 
|  |  | 205 |  |                     break; | 
|  |  | 206 |  |  | 
|  |  | 207 |  |                 case EventHubSharedKeyCredential sharedKeyCredential: | 
|  | 0 | 208 |  |                     credential = sharedKeyCredential.AsSharedAccessSignatureCredential(BuildAudienceResource(connectionO | 
|  |  | 209 |  |                     break; | 
|  |  | 210 |  |             } | 
|  |  | 211 |  |  | 
|  | 280 | 212 |  |             var tokenCredential = new EventHubTokenCredential(credential, BuildAudienceResource(connectionOptions.Transp | 
|  |  | 213 |  |  | 
|  | 280 | 214 |  |             FullyQualifiedNamespace = fullyQualifiedNamespace; | 
|  | 280 | 215 |  |             EventHubName = eventHubName; | 
|  | 280 | 216 |  |             Options = connectionOptions; | 
|  |  | 217 |  |  | 
|  |  | 218 |  | #pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for t | 
|  | 280 | 219 |  |             InnerClient = CreateTransportClient(fullyQualifiedNamespace, eventHubName, tokenCredential, connectionOption | 
|  |  | 220 |  | #pragma warning restore CA2214 // Do not call overridable methods in constructors. | 
|  | 280 | 221 |  |         } | 
|  |  | 222 |  |  | 
|  |  | 223 |  |         /// <summary> | 
|  |  | 224 |  |         ///   Initializes a new instance of the <see cref="EventHubConnection"/> class. | 
|  |  | 225 |  |         /// </summary> | 
|  |  | 226 |  |         /// | 
|  | 180 | 227 |  |         protected EventHubConnection() | 
|  |  | 228 |  |         { | 
|  | 180 | 229 |  |         } | 
|  |  | 230 |  |  | 
|  |  | 231 |  |         /// <summary> | 
|  |  | 232 |  |         ///   Closes the connection to the Event Hubs namespace and associated Event Hub. | 
|  |  | 233 |  |         /// </summary> | 
|  |  | 234 |  |         /// | 
|  |  | 235 |  |         /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t | 
|  |  | 236 |  |         /// | 
|  |  | 237 |  |         /// <returns>A task that is resolved when the operation has completed.</returns> | 
|  |  | 238 |  |         /// | 
|  |  | 239 |  |         public virtual async Task CloseAsync(CancellationToken cancellationToken = default) | 
|  |  | 240 |  |         { | 
|  | 18 | 241 |  |             cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); | 
|  | 18 | 242 |  |             EventHubsEventSource.Log.ClientCloseStart(nameof(EventHubConnection), EventHubName, FullyQualifiedNamespace) | 
|  |  | 243 |  |  | 
|  |  | 244 |  |             try | 
|  |  | 245 |  |             { | 
|  | 18 | 246 |  |                 await InnerClient.CloseAsync(cancellationToken).ConfigureAwait(false); | 
|  | 18 | 247 |  |             } | 
|  | 0 | 248 |  |             catch (Exception ex) | 
|  |  | 249 |  |             { | 
|  | 0 | 250 |  |                 EventHubsEventSource.Log.ClientCloseError(nameof(EventHubConnection), EventHubName, FullyQualifiedNamesp | 
|  | 0 | 251 |  |                 throw; | 
|  |  | 252 |  |             } | 
|  |  | 253 |  |             finally | 
|  |  | 254 |  |             { | 
|  | 18 | 255 |  |                 EventHubsEventSource.Log.ClientCloseComplete(nameof(EventHubConnection), EventHubName, FullyQualifiedNam | 
|  |  | 256 |  |             } | 
|  | 18 | 257 |  |         } | 
|  |  | 258 |  |  | 
|  |  | 259 |  |         /// <summary> | 
|  |  | 260 |  |         ///   Performs the task needed to clean up resources used by the <see cref="EventHubConnection" />, | 
|  |  | 261 |  |         ///   including ensuring that the connection itself has been closed. | 
|  |  | 262 |  |         /// </summary> | 
|  |  | 263 |  |         /// | 
|  |  | 264 |  |         /// <returns>A task that is resolved when the operation has completed.</returns> | 
|  |  | 265 |  |         /// | 
|  | 2 | 266 |  |         public virtual async ValueTask DisposeAsync() => await CloseAsync().ConfigureAwait(false); | 
|  |  | 267 |  |  | 
|  |  | 268 |  |         /// <summary> | 
|  |  | 269 |  |         ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance. | 
|  |  | 270 |  |         /// </summary> | 
|  |  | 271 |  |         /// | 
|  |  | 272 |  |         /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param> | 
|  |  | 273 |  |         /// | 
|  |  | 274 |  |         /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c> | 
|  |  | 275 |  |         /// | 
|  |  | 276 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 0 | 277 |  |         public override bool Equals(object obj) => base.Equals(obj); | 
|  |  | 278 |  |  | 
|  |  | 279 |  |         /// <summary> | 
|  |  | 280 |  |         ///   Returns a hash code for this instance. | 
|  |  | 281 |  |         /// </summary> | 
|  |  | 282 |  |         /// | 
|  |  | 283 |  |         /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha | 
|  |  | 284 |  |         /// | 
|  |  | 285 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 0 | 286 |  |         public override int GetHashCode() => base.GetHashCode(); | 
|  |  | 287 |  |  | 
|  |  | 288 |  |         /// <summary> | 
|  |  | 289 |  |         ///   Converts the instance to its string representation. | 
|  |  | 290 |  |         /// </summary> | 
|  |  | 291 |  |         /// | 
|  |  | 292 |  |         /// <returns>A <see cref="System.String" /> that represents this instance.</returns> | 
|  |  | 293 |  |         /// | 
|  |  | 294 |  |         [EditorBrowsable(EditorBrowsableState.Never)] | 
|  | 20 | 295 |  |         public override string ToString() => base.ToString(); | 
|  |  | 296 |  |  | 
|  |  | 297 |  |         /// <summary> | 
|  |  | 298 |  |         ///   Retrieves information about the Event Hub that the connection is associated with, including | 
|  |  | 299 |  |         ///   the number of partitions present and their identifiers. | 
|  |  | 300 |  |         /// </summary> | 
|  |  | 301 |  |         /// | 
|  |  | 302 |  |         /// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param> | 
|  |  | 303 |  |         /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t | 
|  |  | 304 |  |         /// | 
|  |  | 305 |  |         /// <returns>The set of information for the Event Hub that this connection is associated with.</returns> | 
|  |  | 306 |  |         /// | 
|  |  | 307 |  |         internal virtual async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, | 
|  | 2 | 308 |  |                                                                            CancellationToken cancellationToken = default | 
|  |  | 309 |  |  | 
|  |  | 310 |  |         /// <summary> | 
|  |  | 311 |  |         ///   Retrieves the set of identifiers for the partitions of an Event Hub. | 
|  |  | 312 |  |         /// </summary> | 
|  |  | 313 |  |         /// | 
|  |  | 314 |  |         /// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param> | 
|  |  | 315 |  |         /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t | 
|  |  | 316 |  |         /// | 
|  |  | 317 |  |         /// <returns>The set of identifiers for the partitions within the Event Hub that this connection is associated w | 
|  |  | 318 |  |         /// | 
|  |  | 319 |  |         /// <remarks> | 
|  |  | 320 |  |         ///   This method is synonymous with invoking <see cref="GetPropertiesAsync(EventHubsRetryPolicy, CancellationTo | 
|  |  | 321 |  |         ///   property that is returned. It is offered as a convenience for quick access to the set of partition identif | 
|  |  | 322 |  |         ///   No new or extended information is presented. | 
|  |  | 323 |  |         /// </remarks> | 
|  |  | 324 |  |         /// | 
|  |  | 325 |  |         internal virtual async Task<string[]> GetPartitionIdsAsync(EventHubsRetryPolicy retryPolicy, | 
|  |  | 326 |  |                                                                    CancellationToken cancellationToken = default) => | 
|  | 76 | 327 |  |             (await GetPropertiesAsync(retryPolicy, cancellationToken).ConfigureAwait(false)).PartitionIds; | 
|  |  | 328 |  |  | 
|  |  | 329 |  |         /// <summary> | 
|  |  | 330 |  |         ///   Retrieves information about a specific partition for an Event Hub, including elements that describe the av | 
|  |  | 331 |  |         ///   events in the partition event stream. | 
|  |  | 332 |  |         /// </summary> | 
|  |  | 333 |  |         /// | 
|  |  | 334 |  |         /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param> | 
|  |  | 335 |  |         /// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param> | 
|  |  | 336 |  |         /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t | 
|  |  | 337 |  |         /// | 
|  |  | 338 |  |         /// <returns>The set of information for the requested partition under the Event Hub this connection is associate | 
|  |  | 339 |  |         /// | 
|  |  | 340 |  |         internal virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId, | 
|  |  | 341 |  |                                                                                      EventHubsRetryPolicy retryPolicy, | 
|  | 2 | 342 |  |                                                                                      CancellationToken cancellationToken | 
|  |  | 343 |  |  | 
|  |  | 344 |  |         /// <summary> | 
|  |  | 345 |  |         ///   Creates a producer strongly aligned with the active protocol and transport, | 
|  |  | 346 |  |         ///   responsible for publishing <see cref="EventData" /> to the Event Hub. | 
|  |  | 347 |  |         /// </summary> | 
|  |  | 348 |  |         /// | 
|  |  | 349 |  |         /// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; i | 
|  |  | 350 |  |         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param> | 
|  |  | 351 |  |         /// | 
|  |  | 352 |  |         /// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns> | 
|  |  | 353 |  |         /// | 
|  |  | 354 |  |         internal virtual TransportProducer CreateTransportProducer(string partitionId, | 
|  |  | 355 |  |                                                                    EventHubsRetryPolicy retryPolicy) | 
|  |  | 356 |  |         { | 
|  | 14 | 357 |  |             Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); | 
|  | 14 | 358 |  |             return InnerClient.CreateProducer(partitionId, retryPolicy); | 
|  |  | 359 |  |         } | 
|  |  | 360 |  |  | 
|  |  | 361 |  |         /// <summary> | 
|  |  | 362 |  |         ///   Creates a consumer strongly aligned with the active protocol and transport, responsible | 
|  |  | 363 |  |         ///   for reading <see cref="EventData" /> from a specific Event Hub partition, in the context | 
|  |  | 364 |  |         ///   of a specific consumer group. | 
|  |  | 365 |  |         /// | 
|  |  | 366 |  |         ///   A consumer may be exclusive, which asserts ownership over the partition for the consumer | 
|  |  | 367 |  |         ///   group to ensure that only one consumer from that group is reading from the partition. | 
|  |  | 368 |  |         ///   These exclusive consumers are sometimes referred to as "Epoch Consumers." | 
|  |  | 369 |  |         /// | 
|  |  | 370 |  |         ///   A consumer may also be non-exclusive, allowing multiple consumers from the same consumer | 
|  |  | 371 |  |         ///   group to be actively reading events from the partition.  These non-exclusive consumers are | 
|  |  | 372 |  |         ///   sometimes referred to as "Non-epoch Consumers." | 
|  |  | 373 |  |         /// | 
|  |  | 374 |  |         ///   Designating a consumer as exclusive may be specified by setting the <paramref name="ownerLevel" />. | 
|  |  | 375 |  |         ///   When <c>null</c>, consumers are created as non-exclusive. | 
|  |  | 376 |  |         /// </summary> | 
|  |  | 377 |  |         /// | 
|  |  | 378 |  |         /// <param name="consumerGroup">The name of the consumer group this consumer is associated with.  Events are rea | 
|  |  | 379 |  |         /// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</par | 
|  |  | 380 |  |         /// <param name="eventPosition">The position within the partition where the consumer should begin reading events | 
|  |  | 381 |  |         /// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param> | 
|  |  | 382 |  |         /// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on t | 
|  |  | 383 |  |         /// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this va | 
|  |  | 384 |  |         /// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whet | 
|  |  | 385 |  |         /// | 
|  |  | 386 |  |         /// <returns>A <see cref="TransportConsumer" /> configured in the requested manner.</returns> | 
|  |  | 387 |  |         /// | 
|  |  | 388 |  |         internal virtual TransportConsumer CreateTransportConsumer(string consumerGroup, | 
|  |  | 389 |  |                                                                    string partitionId, | 
|  |  | 390 |  |                                                                    EventPosition eventPosition, | 
|  |  | 391 |  |                                                                    EventHubsRetryPolicy retryPolicy, | 
|  |  | 392 |  |                                                                    bool trackLastEnqueuedEventProperties = true, | 
|  |  | 393 |  |                                                                    long? ownerLevel = default, | 
|  |  | 394 |  |                                                                    uint? prefetchCount = default) | 
|  |  | 395 |  |         { | 
|  | 74 | 396 |  |             Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup)); | 
|  | 70 | 397 |  |             Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); | 
|  | 66 | 398 |  |             Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); | 
|  |  | 399 |  |  | 
|  | 64 | 400 |  |             return InnerClient.CreateConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, trackLastEnqueuedE | 
|  |  | 401 |  |         } | 
|  |  | 402 |  |  | 
|  |  | 403 |  |         /// <summary> | 
|  |  | 404 |  |         ///   Builds an Event Hub client specific to the protocol and transport specified by the | 
|  |  | 405 |  |         ///   requested connection type of the <paramref name="options" />. | 
|  |  | 406 |  |         /// </summary> | 
|  |  | 407 |  |         /// | 
|  |  | 408 |  |         /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace.  This is likely to be simila | 
|  |  | 409 |  |         /// <param name="eventHubName">The name of a specific Event Hub.</param> | 
|  |  | 410 |  |         /// <param name="credential">The Azure managed identity credential to use for authorization.</param> | 
|  |  | 411 |  |         /// <param name="options">The set of options to use for the client.</param> | 
|  |  | 412 |  |         /// | 
|  |  | 413 |  |         /// <returns>A client generalization specific to the specified protocol/transport to which operations may be del | 
|  |  | 414 |  |         /// | 
|  |  | 415 |  |         /// <remarks> | 
|  |  | 416 |  |         ///   As an internal method, only basic sanity checks are performed against arguments.  It is | 
|  |  | 417 |  |         ///   assumed that callers are trusted and have performed deep validation. | 
|  |  | 418 |  |         /// | 
|  |  | 419 |  |         ///   Parameters passed are also assumed to be owned by the transport client and safe to mutate or dispose; | 
|  |  | 420 |  |         ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the caller. | 
|  |  | 421 |  |         /// </remarks> | 
|  |  | 422 |  |         /// | 
|  |  | 423 |  |         internal virtual TransportClient CreateTransportClient(string fullyQualifiedNamespace, | 
|  |  | 424 |  |                                                                string eventHubName, | 
|  |  | 425 |  |                                                                EventHubTokenCredential credential, | 
|  |  | 426 |  |                                                                EventHubConnectionOptions options) | 
|  |  | 427 |  |         { | 
|  | 146 | 428 |  |             switch (options.TransportType) | 
|  |  | 429 |  |             { | 
|  |  | 430 |  |                 case EventHubsTransportType.AmqpTcp: | 
|  |  | 431 |  |                 case EventHubsTransportType.AmqpWebSockets: | 
|  | 144 | 432 |  |                     return new AmqpClient(fullyQualifiedNamespace, eventHubName, credential, options); | 
|  |  | 433 |  |  | 
|  |  | 434 |  |                 default: | 
|  |  | 435 |  | #pragma warning disable CA2208 // Instantiate argument exceptions correctly.  "TransportType" is a reasonable name.  It' | 
|  | 2 | 436 |  |                     throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidTransportType | 
|  |  | 437 |  | #pragma warning restore CA2208 // Instantiate argument exceptions correctly | 
|  |  | 438 |  |             } | 
|  |  | 439 |  |         } | 
|  |  | 440 |  |  | 
|  |  | 441 |  |         /// <summary> | 
|  |  | 442 |  |         ///   Builds the audience for use in the signature: a URI formed from the namespace, the transport's URI scheme, and the Event Hub name as its path, with the port reset and the fragment and user credentials cleared. | 
|  |  | 443 |  |         /// </summary> | 
|  |  | 444 |  |         /// | 
|  |  | 445 |  |         /// <param name="transportType">The type of protocol and transport that will be used for communicating with the  | 
|  |  | 446 |  |         /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace.  This is likely to be simila | 
|  |  | 447 |  |         /// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param> | 
|  |  | 448 |  |         /// | 
|  |  | 449 |  |         /// <returns>The value to use as the audience of the signature, expressed as a lower-cased absolute URI.</returns> | 
|  |  | 450 |  |         /// | 
|  |  | 451 |  |         private static string BuildAudienceResource(EventHubsTransportType transportType, | 
|  |  | 452 |  |                                                     string fullyQualifiedNamespace, | 
|  |  | 453 |  |                                                     string eventHubName) | 
|  |  | 454 |  |         { | 
|  | 576 | 455 |  |             var builder = new UriBuilder(fullyQualifiedNamespace) | 
|  | 576 | 456 |  |             { | 
|  | 576 | 457 |  |                 Scheme = transportType.GetUriScheme(), | 
|  | 576 | 458 |  |                 Path = eventHubName, | 
|  | 576 | 459 |  |                 Port = -1, | 
|  | 576 | 460 |  |                 Fragment = string.Empty, | 
|  | 576 | 461 |  |                 Password = string.Empty, | 
|  | 576 | 462 |  |                 UserName = string.Empty, | 
|  | 576 | 463 |  |             }; | 
|  |  | 464 |  |  | 
|  | 576 | 465 |  |             if (builder.Path.EndsWith("/", StringComparison.Ordinal)) | 
|  |  | 466 |  |             { | 
|  | 2 | 467 |  |                 builder.Path = builder.Path.TrimEnd('/'); | 
|  |  | 468 |  |             } | 
|  |  | 469 |  |  | 
|  | 576 | 470 |  |             return builder.Uri.AbsoluteUri.ToLowerInvariant(); | 
|  |  | 471 |  |         } | 
|  |  | 472 |  |  | 
|  |  | 473 |  |         /// <summary> | 
|  |  | 474 |  |         ///   Performs the actions needed to validate the <see cref="EventHubConnectionOptions" /> associated | 
|  |  | 475 |  |         ///   with this client. | 
|  |  | 476 |  |         /// </summary> | 
|  |  | 477 |  |         /// | 
|  |  | 478 |  |         /// <param name="connectionOptions">The set of options to validate; a <c>null</c> value is accepted and treated as valid.</param> | 
|  |  | 479 |  |         /// | 
|  |  | 480 |  |         /// <remarks> | 
|  |  | 481 |  |         ///   In the case that the options violate an invariant or otherwise represent a combination that | 
|  |  | 482 |  |         ///   is not permissible, an <see cref="ArgumentException" /> will be thrown. | 
|  |  | 483 |  |         /// </remarks> | 
|  |  | 484 |  |         /// | 
|  |  | 485 |  |         private static void ValidateConnectionOptions(EventHubConnectionOptions connectionOptions) | 
|  |  | 486 |  |         { | 
|  |  | 487 |  |             // If no options were passed, there is nothing that can be in an invalid state. | 
|  |  | 488 |  |  | 
|  | 474 | 489 |  |             if (connectionOptions == null) | 
|  |  | 490 |  |             { | 
|  | 0 | 491 |  |                 return; | 
|  |  | 492 |  |             } | 
|  |  | 493 |  |  | 
|  |  | 494 |  |             // A proxy is only valid when web sockets are used as the transport; reject a proxy paired with any other transport. | 
|  |  | 495 |  |  | 
|  | 474 | 496 |  |             if ((!connectionOptions.TransportType.IsWebSocketTransport()) && (connectionOptions.Proxy != null)) | 
|  |  | 497 |  |             { | 
|  | 4 | 498 |  |                 throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.ProxyMustUseWebSockets), | 
|  |  | 499 |  |             } | 
|  | 470 | 500 |  |         } | 
|  |  | 501 |  |     } | 
|  |  | 502 |  | } |