< Summary

Class:Azure.Messaging.EventHubs.Producer.EventHubProducerClient
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Producer\EventHubProducerClient.cs
Covered lines:172
Uncovered lines:9
Coverable lines:181
Total lines:762
Line coverage:95% (172 of 181)
Covered branches:62
Total branches:68
Branch coverage:91.1% (62 of 68)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-100%100%
get_FullyQualifiedNamespace()-100%100%
get_EventHubName()-100%100%
get_IsClosed()-100%100%
set_IsClosed(...)-100%100%
get_OwnsConnection()-100%100%
get_RetryPolicy()-100%100%
get_Connection()-100%100%
get_PartitionProducerPool()-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor(...)-100%100%
.ctor()-100%100%
GetEventHubPropertiesAsync()-100%100%
GetPartitionIdsAsync()-100%100%
GetPartitionPropertiesAsync()-100%100%
SendAsync()-100%100%
SendAsync()-100%100%
SendAsync()-100%100%
SendAsync()-96.43%83.33%
SendAsync()-95.65%80%
CreateBatchAsync()-100%100%
CreateBatchAsync()-100%100%
CloseAsync()-82.61%83.33%
DisposeAsync()-0%100%
Equals(...)-0%100%
GetHashCode()-100%100%
ToString()-0%100%
CreateDiagnosticScope(...)-100%100%
InstrumentMessages(...)-100%100%
AssertSinglePartitionReference(...)-100%100%
ShouldRecreateProducer(...)-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Producer\EventHubProducerClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.ComponentModel;
 7using System.Diagnostics.CodeAnalysis;
 8using System.Globalization;
 9using System.Linq;
 10using System.Runtime.ExceptionServices;
 11using System.Threading;
 12using System.Threading.Tasks;
 13using Azure.Core;
 14using Azure.Core.Pipeline;
 15using Azure.Messaging.EventHubs.Core;
 16using Azure.Messaging.EventHubs.Diagnostics;
 17
 18namespace Azure.Messaging.EventHubs.Producer
 19{
 20    /// <summary>
 21    ///   A client responsible for publishing <see cref="EventData" /> to a specific Event Hub,
 22    ///   grouped together in batches.  Depending on the options specified when sending, events data
 23    ///   may be automatically routed to an available partition or sent to a specifically requested partition.
 24    /// </summary>
 25    ///
 26    /// <remarks>
 27    ///   Allowing automatic routing of partitions is recommended when:
 28    ///   <para>- The sending of events needs to be highly available.</para>
 29    ///   <para>- The event data should be evenly distributed among all available partitions.</para>
 30    ///
 31    ///   If no partition is specified, the following rules are used for automatically selecting one:
 32    ///   <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
 33    ///   <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward t
 34    /// </remarks>
 35    ///
 36    public class EventHubProducerClient : IAsyncDisposable
 37    {
 38        /// <summary>The maximum number of attempts that may be made to get a <see cref="TransportProducer" /> from the 
 39        internal const int MaximumCreateProducerAttempts = 3;
 40
 41        /// <summary>The minimum allowable size, in bytes, for a batch to be sent.</summary>
 42        internal const int MinimumBatchSizeLimit = 24;
 43
 44        /// <summary>The set of default publishing options to use when no specific options are requested.</summary>
 245        private static readonly SendEventOptions DefaultSendOptions = new SendEventOptions();
 46
 47        /// <summary>Sets how long a dedicated <see cref="TransportProducer" /> would sit in memory before its <see cref
 248        private static readonly TimeSpan PartitionProducerLifespan = TimeSpan.FromMinutes(5);
 49
 50        /// <summary>Indicates whether or not this instance has been closed.</summary>
 51        private volatile bool _closed = false;
 52
 53        /// <summary>
 54        ///   The fully qualified Event Hubs namespace that the producer is associated with.  This is likely
 55        ///   to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
 56        /// </summary>
 57        ///
 9258        public string FullyQualifiedNamespace => Connection.FullyQualifiedNamespace;
 59
 60        /// <summary>
 61        ///   The name of the Event Hub that the producer is connected to, specific to the
 62        ///   Event Hubs namespace that contains it.
 63        /// </summary>
 64        ///
 11065        public string EventHubName => Connection.EventHubName;
 66
 67        /// <summary>
 68        ///   Indicates whether or not this <see cref="EventHubProducerClient" /> has been closed.
 69        /// </summary>
 70        ///
 71        /// <value>
 72        ///   <c>true</c> if the client is closed; otherwise, <c>false</c>.
 73        /// </value>
 74        ///
 75        public bool IsClosed
 76        {
 3477            get => _closed;
 1278            protected set => _closed = value;
 79        }
 80
 81        /// <summary>
 82        ///   Indicates whether the client has ownership of the associated <see cref="EventHubConnection" />
 83        ///   and should take responsibility for managing its lifespan.
 84        /// </summary>
 85        ///
 16286        private bool OwnsConnection { get; } = true;
 87
 88        /// <summary>
 89        ///   The policy to use for determining retry behavior for when an operation fails.
 90        /// </summary>
 91        ///
 9492        private EventHubsRetryPolicy RetryPolicy { get; }
 93
 94        /// <summary>
 95        ///   The active connection to the Azure Event Hubs service, enabling client communications for metadata
 96        ///   about the associated Event Hub and access to a transport-aware producer.
 97        /// </summary>
 98        ///
 30499        private EventHubConnection Connection { get; }
 100
 101        /// <summary>
 102        ///   A <see cref="TransportProducerPool" /> used to manage a set of partition specific <see cref="TransportProd
 103        /// </summary>
 104        ///
 94105        private TransportProducerPool PartitionProducerPool { get; }
 106
 107        /// <summary>
 108        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 109        /// </summary>
 110        ///
 111        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 112        ///
 113        /// <remarks>
 114        ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t
 115        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 116        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 117        ///
 118        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 119        ///   Event Hub will result in a connection string that contains the name.
 120        /// </remarks>
 121        ///
 122        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 123        ///
 16124        public EventHubProducerClient(string connectionString) : this(connectionString, null, null)
 125        {
 4126        }
 127
 128        /// <summary>
 129        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 130        /// </summary>
 131        ///
 132        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 133        /// <param name="clientOptions">The set of options to use for this consumer.</param>
 134        ///
 135        /// <remarks>
 136        ///   If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of t
 137        ///   which is needed.  In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]
 138        ///   connection string.  For example, ";EntityPath=telemetry-hub".
 139        ///
 140        ///   If you have defined a shared access policy directly on the Event Hub itself, then copying the connection s
 141        ///   Event Hub will result in a connection string that contains the name.
 142        /// </remarks>
 143        ///
 144        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 145        ///
 146        public EventHubProducerClient(string connectionString,
 2147                                      EventHubProducerClientOptions clientOptions) : this(connectionString, null, client
 148        {
 2149        }
 150
 151        /// <summary>
 152        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 153        /// </summary>
 154        ///
 155        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 156        /// <param name="eventHubName">The name of the specific Event Hub to associate the producer with.</param>
 157        ///
 158        /// <remarks>
 159        ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even
 160        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 161        ///   passed only once, either as part of the connection string or separately.
 162        /// </remarks>
 163        ///
 164        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 165        ///
 166        public EventHubProducerClient(string connectionString,
 8167                                      string eventHubName) : this(connectionString, eventHubName, null)
 168        {
 2169        }
 170
 171        /// <summary>
 172        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 173        /// </summary>
 174        ///
 175        /// <param name="connectionString">The connection string to use for connecting to the Event Hubs namespace; it i
 176        /// <param name="eventHubName">The name of the specific Event Hub to associate the producer with.</param>
 177        /// <param name="clientOptions">A set of options to apply when configuring the producer.</param>
 178        ///
 179        /// <remarks>
 180        ///   If the connection string is copied from the Event Hub itself, it will contain the name of the desired Even
 181        ///   and can be used directly without passing the <paramref name="eventHubName" />.  The name of the Event Hub 
 182        ///   passed only once, either as part of the connection string or separately.
 183        /// </remarks>
 184        ///
 185        /// <seealso href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string"/>
 186        ///
 30187        public EventHubProducerClient(string connectionString,
 30188                                      string eventHubName,
 30189                                      EventHubProducerClientOptions clientOptions)
 190        {
 30191            Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
 22192            clientOptions = clientOptions?.Clone() ?? new EventHubProducerClientOptions();
 193
 22194            OwnsConnection = true;
 22195            Connection = new EventHubConnection(connectionString, eventHubName, clientOptions.ConnectionOptions);
 8196            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 8197            PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy);
 8198        }
 199
 200        /// <summary>
 201        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 202        /// </summary>
 203        ///
 204        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to connect to.  This is likel
 205        /// <param name="eventHubName">The name of the specific Event Hub to associate the producer with.</param>
 206        /// <param name="credential">The Azure managed identity credential to use for authorization.  Access controls ma
 207        /// <param name="clientOptions">A set of options to apply when configuring the producer.</param>
 208        ///
 16209        public EventHubProducerClient(string fullyQualifiedNamespace,
 16210                                      string eventHubName,
 16211                                      TokenCredential credential,
 16212                                      EventHubProducerClientOptions clientOptions = default)
 213        {
 16214            Argument.AssertWellFormedEventHubsNamespace(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
 10215            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 6216            Argument.AssertNotNull(credential, nameof(credential));
 217
 4218            clientOptions = clientOptions?.Clone() ?? new EventHubProducerClientOptions();
 219
 4220            OwnsConnection = true;
 4221            Connection = new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, clientOptions.Connect
 4222            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 4223            PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy);
 4224        }
 225
 226        /// <summary>
 227        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 228        /// </summary>
 229        ///
 230        /// <param name="connection">The <see cref="EventHubConnection" /> connection to use for communication with the 
 231        /// <param name="clientOptions">A set of options to apply when configuring the producer.</param>
 232        ///
 54233        public EventHubProducerClient(EventHubConnection connection,
 54234                                      EventHubProducerClientOptions clientOptions = default)
 235        {
 54236            Argument.AssertNotNull(connection, nameof(connection));
 52237            clientOptions = clientOptions?.Clone() ?? new EventHubProducerClientOptions();
 238
 52239            OwnsConnection = false;
 52240            Connection = connection;
 52241            RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
 52242            PartitionProducerPool = new TransportProducerPool(Connection, RetryPolicy);
 52243        }
 244
 245        /// <summary>
 246        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 247        /// </summary>
 248        ///
 249        /// <param name="connection">The connection to use as the basis for delegation of client-type operations.</param
 250        /// <param name="transportProducer">The transport producer instance to use as the basis for service communicatio
 251        /// <param name="partitionProducerPool">A <see cref="TransportProducerPool" /> used to manage a set of partition
 252        ///
 253        /// <remarks>
 254        ///   This constructor is intended to be used internally for functional
 255        ///   testing only.
 256        /// </remarks>
 257        ///
 50258        internal EventHubProducerClient(EventHubConnection connection,
 50259                                        TransportProducer transportProducer,
 50260                                        TransportProducerPool partitionProducerPool = default)
 261        {
 50262            Argument.AssertNotNull(connection, nameof(connection));
 50263            Argument.AssertNotNull(transportProducer, nameof(transportProducer));
 264
 50265            OwnsConnection = false;
 50266            Connection = connection;
 50267            RetryPolicy = new EventHubsRetryOptions().ToRetryPolicy();
 50268            PartitionProducerPool = partitionProducerPool ?? new TransportProducerPool(Connection, RetryPolicy, eventHub
 50269        }
 270
 271        /// <summary>
 272        ///   Initializes a new instance of the <see cref="EventHubProducerClient" /> class.
 273        /// </summary>
 274        ///
 4275        protected EventHubProducerClient()
 276        {
 4277            OwnsConnection = false;
 4278        }
 279
 280        /// <summary>
 281        ///   Retrieves information about the Event Hub that the connection is associated with, including
 282        ///   the number of partitions present and their identifiers.
 283        /// </summary>
 284        ///
 285        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 286        ///
 287        /// <returns>The set of information for the Event Hub that this client is associated with.</returns>
 288        ///
 289        public virtual async Task<EventHubProperties> GetEventHubPropertiesAsync(CancellationToken cancellationToken = d
 290        {
 2291            Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
 2292            return await Connection.GetPropertiesAsync(RetryPolicy, cancellationToken).ConfigureAwait(false);
 2293        }
 294
 295        /// <summary>
 296        ///   Retrieves the set of identifiers for the partitions of an Event Hub.
 297        /// </summary>
 298        ///
 299        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 300        ///
 301        /// <returns>The set of identifiers for the partitions within the Event Hub that this client is associated with.
 302        ///
 303        /// <remarks>
 304        ///   This method is synonymous with invoking <see cref="GetEventHubPropertiesAsync(CancellationToken)" /> and r
 305        ///   property that is returned. It is offered as a convenience for quick access to the set of partition identif
 306        ///   No new or extended information is presented.
 307        /// </remarks>
 308        ///
 309        public virtual async Task<string[]> GetPartitionIdsAsync(CancellationToken cancellationToken = default)
 310        {
 311
 2312            Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
 2313            return await Connection.GetPartitionIdsAsync(RetryPolicy, cancellationToken).ConfigureAwait(false);
 2314        }
 315
 316        /// <summary>
 317        ///   Retrieves information about a specific partition for an Event Hub, including elements that describe the av
 318        ///   events in the partition event stream.
 319        /// </summary>
 320        ///
 321        /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 322        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 323        ///
 324        /// <returns>The set of information for the requested partition under the Event Hub this client is associated wi
 325        ///
 326        public virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId,
 327                                                                                   CancellationToken cancellationToken =
 328        {
 2329            Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
 2330            return await Connection.GetPartitionPropertiesAsync(partitionId, RetryPolicy, cancellationToken).ConfigureAw
 2331        }
 332
 333        /// <summary>
 334        ///   Sends an event to the associated Event Hub using a batched approach.  If the size of the event exceeds the
 335        ///   maximum size of a single batch, an exception will be triggered and the send will fail.
 336        /// </summary>
 337        ///
 338        /// <param name="eventData">The event data to send.</param>
 339        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 340        ///
 341        /// <returns>A task to be resolved on when the operation has completed.</returns>
 342        ///
 343        /// <seealso cref="SendAsync(EventData, SendEventOptions, CancellationToken)" />
 344        /// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
 345        /// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
 346        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 347        ///
 348        internal virtual async Task SendAsync(EventData eventData,
 349                                              CancellationToken cancellationToken = default)
 350        {
 6351            Argument.AssertNotNull(eventData, nameof(eventData));
 4352            await SendAsync(new[] { eventData }, null, cancellationToken).ConfigureAwait(false);
 4353        }
 354
 355        /// <summary>
 356        ///   Sends an event to the associated Event Hub using a batched approach.  If the size of the event exceeds the
 357        ///   maximum size of a single batch, an exception will be triggered and the send will fail.
 358        /// </summary>
 359        ///
 360        /// <param name="eventData">The event data to send.</param>
 361        /// <param name="options">The set of options to consider when sending this batch.</param>
 362        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 363        ///
 364        /// <returns>A task to be resolved on when the operation has completed.</returns>
 365        ///
 366        /// <seealso cref="SendAsync(EventData, CancellationToken)" />
 367        /// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
 368        /// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
 369        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 370        ///
 371        internal virtual async Task SendAsync(EventData eventData,
 372                                              SendEventOptions options,
 373                                              CancellationToken cancellationToken = default)
 374        {
 4375            Argument.AssertNotNull(eventData, nameof(eventData));
 2376            await SendAsync(new[] { eventData }, options, cancellationToken).ConfigureAwait(false);
 2377        }
 378
 379        /// <summary>
 380        ///   Sends a set of events to the associated Event Hub using a batched approach.  Because the batch is implicit
 381        ///   validated until this method is invoked.  The call will fail if the size of the specified set of events exc
 382        /// </summary>
 383        ///
 384        /// <param name="eventBatch">The set of event data to send.</param>
 385        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 386        ///
 387        /// <returns>A task to be resolved on when the operation has completed.</returns>
 388        ///
 389        /// <exception cref="EventHubsException">
 390        ///   Occurs when the set of events exceeds the maximum size allowed in a single batch, as determined by the Eve
 391        ///   <see cref="EventHubsException.FailureReason.MessageSizeExceeded"/> in this case.
 392        /// </exception>
 393        ///
 394        /// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
 395        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 396        /// <seealso cref="CreateBatchAsync(CancellationToken)" />
 397        ///
 398        public virtual async Task SendAsync(IEnumerable<EventData> eventBatch,
 8399                                            CancellationToken cancellationToken = default) => await SendAsync(eventBatch
 400
 401        /// <summary>
 402        ///   Sends a set of events to the associated Event Hub using a batched approach.  Because the batch is implicit
 403        ///   validated until this method is invoked.  The call will fail if the size of the specified set of events exc
 404        /// </summary>
 405        ///
 406        /// <param name="eventBatch">The set of event data to send.</param>
 407        /// <param name="options">The set of options to consider when sending this batch.</param>
 408        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 409        ///
 410        /// <returns>A task to be resolved on when the operation has completed.</returns>
 411        ///
 412        /// <exception cref="EventHubsException">
 413        ///   Occurs when the set of events exceeds the maximum size allowed in a single batch, as determined by the Eve
 414        ///   <see cref="EventHubsException.FailureReason.MessageSizeExceeded"/> in this case.
 415        /// </exception>
 416        ///
 417        /// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
 418        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 419        /// <seealso cref="CreateBatchAsync(CreateBatchOptions, CancellationToken)" />
 420        ///
 421        public virtual async Task SendAsync(IEnumerable<EventData> eventBatch,
 422                                            SendEventOptions options,
 423                                            CancellationToken cancellationToken = default)
 424        {
 36425            options ??= DefaultSendOptions;
 426
 36427            Argument.AssertNotNull(eventBatch, nameof(eventBatch));
 32428            AssertSinglePartitionReference(options.PartitionId, options.PartitionKey);
 429
 30430            int attempts = 0;
 431
 30432            eventBatch = (eventBatch as IList<EventData>) ?? eventBatch.ToList();
 30433            InstrumentMessages(eventBatch);
 434
 30435            var diagnosticIdentifiers = new List<string>();
 436
 84437            foreach (var eventData in eventBatch)
 438            {
 12439                if (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out var identifier))
 440                {
 10441                    diagnosticIdentifiers.Add(identifier);
 442                }
 443            }
 444
 30445            using DiagnosticScope scope = CreateDiagnosticScope(diagnosticIdentifiers);
 446
 30447            var pooledProducer = PartitionProducerPool.GetPooledProducer(options.PartitionId, PartitionProducerLifespan)
 448
 34449            while (!cancellationToken.IsCancellationRequested)
 450            {
 451                try
 452                {
 32453                    await using var _ = pooledProducer.ConfigureAwait(false);
 32454                    await pooledProducer.TransportProducer.SendAsync(eventBatch, options, cancellationToken).ConfigureAw
 455
 18456                    return;
 0457                }
 458                catch (EventHubsException eventHubException)
 12459                    when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed && ShouldRecreatePro
 460                {
 6461                    if (++attempts >= MaximumCreateProducerAttempts)
 462                    {
 2463                        scope.Failed(eventHubException);
 2464                        throw;
 465                    }
 466
 4467                    pooledProducer = PartitionProducerPool.GetPooledProducer(options.PartitionId, PartitionProducerLifes
 4468                }
 8469                catch (Exception ex)
 470                {
 8471                    scope.Failed(ex);
 8472                    throw;
 473                }
 474            }
 475
 2476            throw new TaskCanceledException();
 18477        }
 478
 479        /// <summary>
 480        ///   Sends a set of events to the associated Event Hub using a batched approach.
 481        /// </summary>
 482        ///
 483        /// <param name="eventBatch">The set of event data to send. A batch may be created using <see cref="CreateBatchA
 484        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 485        ///
 486        /// <returns>A task to be resolved on when the operation has completed.</returns>
 487        ///
 488        /// <seealso cref="CreateBatchAsync(CancellationToken)" />
 489        ///
 490        public virtual async Task SendAsync(EventDataBatch eventBatch,
 491                                            CancellationToken cancellationToken = default)
 492        {
 40493            Argument.AssertNotNull(eventBatch, nameof(eventBatch));
 38494            AssertSinglePartitionReference(eventBatch.SendOptions.PartitionId, eventBatch.SendOptions.PartitionKey);
 495
 36496            using DiagnosticScope scope = CreateDiagnosticScope(eventBatch.GetEventDiagnosticIdentifiers());
 497
 36498            var attempts = 0;
 36499            var pooledProducer = PartitionProducerPool.GetPooledProducer(eventBatch.SendOptions.PartitionId, PartitionPr
 500
 40501            while (!cancellationToken.IsCancellationRequested)
 502            {
 503                try
 504                {
 38505                    await using var _ = pooledProducer.ConfigureAwait(false);
 506
 38507                    eventBatch.Lock();
 38508                    await pooledProducer.TransportProducer.SendAsync(eventBatch, cancellationToken).ConfigureAwait(false
 509
 22510                    return;
 0511                }
 512                catch (EventHubsException eventHubException)
 12513                    when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed && ShouldRecreatePro
 514                {
 6515                    if (++attempts >= MaximumCreateProducerAttempts)
 516                    {
 2517                        scope.Failed(eventHubException);
 2518                        throw;
 519                    }
 520
 4521                    pooledProducer = PartitionProducerPool.GetPooledProducer(eventBatch.SendOptions.PartitionId, Partiti
 4522                }
 10523                catch (Exception ex)
 524                {
 10525                    scope.Failed(ex);
 10526                    throw;
 527                }
 528                finally
 529                {
 38530                    eventBatch.Unlock();
 531                }
 532            }
 533
 2534            throw new TaskCanceledException();
 22535        }
 536
 537        /// <summary>
 538        ///   Creates a size-constraint batch to which <see cref="EventData" /> may be added using a try-based pattern. 
 539        ///   exceed the maximum allowable size of the batch, the batch will not allow adding the event and signal that 
 540        ///   return value.
 541        ///
 542        ///   Because events that would violate the size constraint cannot be added, publishing a batch will not trigger
 543        ///   attempting to send the events to the Event Hubs service.
 544        /// </summary>
 545        ///
 546        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 547        ///
 548        /// <returns>An <see cref="EventDataBatch" /> with the default batch options.</returns>
 549        ///
 550        /// <seealso cref="CreateBatchAsync(CreateBatchOptions, CancellationToken)" />
 551        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 552        ///
 8553        public virtual async ValueTask<EventDataBatch> CreateBatchAsync(CancellationToken cancellationToken = default) =
 554
 555        /// <summary>
 556        ///   Creates a size-constraint batch to which <see cref="EventData" /> may be added using a try-based pattern. 
 557        ///   exceed the maximum allowable size of the batch, the batch will not allow adding the event and signal that 
 558        ///   return value.
 559        ///
 560        ///   Because events that would violate the size constraint cannot be added, publishing a batch will not trigger
 561        ///   attempting to send the events to the Event Hubs service.
 562        /// </summary>
 563        ///
 564        /// <param name="options">The set of options to consider when creating this batch.</param>
 565        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 566        ///
 567        /// <returns>An <see cref="EventDataBatch" /> with the requested <paramref name="options"/>.</returns>
 568        ///
 569        /// <seealso cref="CreateBatchAsync(CancellationToken)" />
 570        /// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
 571        ///
 572        public virtual async ValueTask<EventDataBatch> CreateBatchAsync(CreateBatchOptions options,
 573                                                                        CancellationToken cancellationToken = default)
 574        {
 14575            options = options?.Clone() ?? new CreateBatchOptions();
 14576            AssertSinglePartitionReference(options.PartitionId, options.PartitionKey);
 577
 12578            TransportEventBatch transportBatch = await PartitionProducerPool.EventHubProducer.CreateBatchAsync(options, 
 12579            return new EventDataBatch(transportBatch, FullyQualifiedNamespace, EventHubName, options);
 12580        }
 581
 582        /// <summary>
 583        ///   Closes the producer.
 584        /// </summary>
 585        ///
 586        /// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request 
 587        ///
 588        /// <returns>A task to be resolved on when the operation has completed.</returns>
 589        ///
 590        public virtual async Task CloseAsync(CancellationToken cancellationToken = default)
 591        {
 8592            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 593
 8594            if (IsClosed)
 595            {
 0596                return;
 597            }
 598
 8599            IsClosed = true;
 600
 8601            var identifier = GetHashCode().ToString(CultureInfo.InvariantCulture);
 8602            EventHubsEventSource.Log.ClientCloseStart(nameof(EventHubProducerClient), EventHubName, identifier);
 603
 604            // Attempt to close the pool of producers.  In the event that an exception is encountered,
 605            // it should not impact the attempt to close the connection, assuming ownership.
 606
 8607            var transportProducerPoolException = default(Exception);
 608
 609            try
 610            {
 8611                await PartitionProducerPool.CloseAsync(cancellationToken).ConfigureAwait(false);
 6612            }
 2613            catch (Exception ex)
 614            {
 2615                EventHubsEventSource.Log.ClientCloseError(nameof(EventHubProducerClient), EventHubName, identifier, ex.M
 2616                transportProducerPoolException = ex;
 2617            }
 618
 619            // An exception when closing the connection supersedes one observed when closing the
 620            // individual transport clients.
 621
 622            try
 623            {
 8624                if (OwnsConnection)
 625                {
 2626                    await Connection.CloseAsync().ConfigureAwait(false);
 627                }
 8628            }
 0629            catch (Exception ex)
 630            {
 0631                EventHubsEventSource.Log.ClientCloseError(nameof(EventHubProducerClient), EventHubName, identifier, ex.M
 0632                throw;
 633            }
 634            finally
 635            {
 8636                EventHubsEventSource.Log.ClientCloseComplete(nameof(EventHubProducerClient), EventHubName, identifier);
 637            }
 638
 639            // If there was an active exception pending from closing the
 640            // transport producer pool, surface it now.
 641
 8642            if (transportProducerPoolException != default)
 643            {
 2644                ExceptionDispatchInfo.Capture(transportProducerPoolException).Throw();
 645            }
 6646        }
 647
 648        /// <summary>
 649        ///   Performs the task needed to clean up resources used by the <see cref="EventHubProducerClient" />,
 650        ///   including ensuring that the client itself has been closed.
 651        /// </summary>
 652        ///
 653        /// <returns>A task to be resolved on when the operation has completed.</returns>
 654        ///
 655        [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju
 0656        public virtual async ValueTask DisposeAsync() => await CloseAsync().ConfigureAwait(false);
 657
 658        /// <summary>
 659        ///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
 660        /// </summary>
 661        ///
 662        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
 663        ///
 664        /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>
 665        ///
 666        [EditorBrowsable(EditorBrowsableState.Never)]
 0667        public override bool Equals(object obj) => base.Equals(obj);
 668
 669        /// <summary>
 670        ///   Returns a hash code for this instance.
 671        /// </summary>
 672        ///
 673        /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a ha
 674        ///
 675        [EditorBrowsable(EditorBrowsableState.Never)]
 8676        public override int GetHashCode() => base.GetHashCode();
 677
 678        /// <summary>
 679        ///   Converts the instance to string representation.
 680        /// </summary>
 681        ///
 682        /// <returns>A <see cref="System.String" /> that represents this instance.</returns>
 683        ///
 684        [EditorBrowsable(EditorBrowsableState.Never)]
 0685        public override string ToString() => base.ToString();
 686
 687        /// <summary>
 688        ///   Creates and configures a diagnostics scope to be used for instrumenting
 689        ///   events.
 690        /// </summary>
 691        ///
 692        /// <param name="diagnosticIdentifiers">The set of diagnostic identifiers to which the scope will be linked.</pa
 693        ///
 694        /// <returns>The requested <see cref="DiagnosticScope" />.</returns>
 695        ///
 696        private DiagnosticScope CreateDiagnosticScope(IEnumerable<string> diagnosticIdentifiers)
 697        {
 66698            DiagnosticScope scope = EventDataInstrumentation.ScopeFactory.CreateScope(DiagnosticProperty.ProducerActivit
 66699            scope.AddAttribute(DiagnosticProperty.KindAttribute, DiagnosticProperty.ClientKind);
 66700            scope.AddAttribute(DiagnosticProperty.ServiceContextAttribute, DiagnosticProperty.EventHubsServiceContext);
 66701            scope.AddAttribute(DiagnosticProperty.EventHubAttribute, EventHubName);
 66702            scope.AddAttribute(DiagnosticProperty.EndpointAttribute, FullyQualifiedNamespace);
 703
 66704            if (scope.IsEnabled)
 705            {
 40706                foreach (var identifier in diagnosticIdentifiers)
 707                {
 12708                    scope.AddLink(identifier);
 709                }
 710            }
 711
 66712            scope.Start();
 713
 66714            return scope;
 715        }
 716
 717        /// <summary>
 718        ///   Performs the actions needed to instrument a set of events.
 719        /// </summary>
 720        ///
 721        /// <param name="events">The events to instrument.</param>
 722        ///
 723        private void InstrumentMessages(IEnumerable<EventData> events)
 724        {
 84725            foreach (EventData eventData in events)
 726            {
 12727                EventDataInstrumentation.InstrumentEvent(eventData, FullyQualifiedNamespace, EventHubName);
 728            }
 30729        }
 730
 731        /// <summary>
 732        ///   Ensures that no more than a single partition reference is active.
 733        /// </summary>
 734        ///
 735        /// <param name="partitionId">The identifier of the partition to which the producer is bound.</param>
 736        /// <param name="partitionKey">The hash key for partition routing that was requested for a publish operation.</p
 737        ///
 738        private static void AssertSinglePartitionReference(string partitionId,
 739                                                           string partitionKey)
 740        {
 84741            if ((!string.IsNullOrEmpty(partitionId)) && (!string.IsNullOrEmpty(partitionKey)))
 742            {
 6743                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotSendWithPa
 744            }
 78745        }
 746
 747        /// <summary>
 748        ///   Checks if the <see cref="TransportProducer" /> returned by the <see cref="TransportProducerPool" /> is sti
 749        /// </summary>
 750        ///
 751        /// <param name="producer">The <see cref="TransportProducer" /> that has being checked.</param>
 752        /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 753        ///
 754        /// <returns><c>true</c> if the specified <see cref="TransportProducer" /> is closed; otherwise, <c>false</c>.</
 755        ///
 756        private bool ShouldRecreateProducer(TransportProducer producer,
 24757                                            string partitionId) => !string.IsNullOrEmpty(partitionId)
 24758                                                                   && producer.IsClosed
 24759                                                                   && !IsClosed
 24760                                                                   && !Connection.IsClosed;
 761    }
 762}