< Summary

Class: Azure.Messaging.EventHubs.Amqp.AmqpProducer
Assembly: Azure.Messaging.EventHubs
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpProducer.cs
Covered lines: 107
Uncovered lines: 26
Coverable lines: 133
Total lines: 489
Line coverage: 80.4% (107 of 133)
Covered branches: 44
Total branches: 66
Branch coverage: 66.6% (44 of 66)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_IsClosed() | - | 100% | 100%
get_EventHubName() | - | 100% | 100%
get_PartitionId() | - | 100% | 100%
get_RetryPolicy() | - | 100% | 100%
get_MessageConverter() | - | 100% | 100%
get_ConnectionScope() | - | 100% | 100%
get_SendLink() | - | 100% | 100%
get_MaximumMessageSize() | - | 100% | 100%
.ctor(...) | - | 100% | 50%
SendAsync() | - | 100% | 50%
SendAsync() | - | 100% | 50%
CreateBatchAsync() | - | 92.31% | 86.36%
CloseAsync() | - | 83.33% | 50%
SendAsync() | - | 71.05% | 63.64%
CreateLinkAndEnsureProducerStateAsync() | - | 0% | 0%
UseMinimum(...) | - | 100% | 100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpProducer.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.Runtime.ExceptionServices;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Core;
 11using Azure.Core.Diagnostics;
 12using Azure.Messaging.EventHubs.Core;
 13using Azure.Messaging.EventHubs.Diagnostics;
 14using Azure.Messaging.EventHubs.Producer;
 15using Microsoft.Azure.Amqp;
 16using Microsoft.Azure.Amqp.Framing;
 17
 18namespace Azure.Messaging.EventHubs.Amqp
 19{
 20    /// <summary>
 21    ///   A transport producer abstraction responsible for brokering operations for AMQP-based connections.
 22    ///   It is intended that the public <see cref="EventHubProducerClient" /> make use of an instance
 23    ///   via containment and delegate operations to it.
 24    /// </summary>
 25    ///
 26    /// <seealso cref="Azure.Messaging.EventHubs.Core.TransportProducer" />
 27    ///
 28    internal class AmqpProducer : TransportProducer
 29    {
 30        /// <summary>Indicates whether or not this instance has been closed.</summary>
 31        private volatile bool _closed = false;
 32
 33        /// <summary>The count of send operations performed by this instance; this is used to tag deliveries for the AMQ
 34        private int _deliveryCount = 0;
 35
 36        /// <summary>
 37        ///   Indicates whether or not this producer has been closed.
 38        /// </summary>
 39        ///
 40        /// <value>
 41        ///   <c>true</c> if the producer is closed; otherwise, <c>false</c>.
 42        /// </value>
 43        ///
 644        public override bool IsClosed => _closed;
 45
 46        /// <summary>
 47        ///   The name of the Event Hub to which the producer is bound.
 48        /// </summary>
 49        ///
 30450        private string EventHubName { get; }
 51
 52        /// <summary>
 53        ///   The identifier of the Event Hub partition that this producer is bound to, if any.  If bound, events will
 54        ///   be published only to this partition.
 55        /// </summary>
 56        ///
 57        /// <value>The partition to which the producer is bound; if unbound, <c>null</c>.</value>
 58        ///
 3659        private string PartitionId { get; }
 60
 61        /// <summary>
 62        ///   The policy to use for determining retry behavior for when an operation fails.
 63        /// </summary>
 64        ///
 23665        private EventHubsRetryPolicy RetryPolicy { get; }
 66
 67        /// <summary>
 68        ///   The converter to use for translating between AMQP messages and client library
 69        ///   types.
 70        /// </summary>
 71        ///
 10672        private AmqpMessageConverter MessageConverter { get; }
 73
 74        /// <summary>
 75        ///   The AMQP connection scope responsible for managing transport constructs for this instance.
 76        /// </summary>
 77        ///
 18478        private AmqpConnectionScope ConnectionScope { get; }
 79
 80        /// <summary>
 81        ///   The AMQP link intended for use with publishing operations.
 82        /// </summary>
 83        ///
 15484        private FaultTolerantAmqpObject<SendingAmqpLink> SendLink { get; }
 85
 86        /// <summary>
 87        ///   The maximum size of an AMQP message allowed by the associated
 88        ///   producer link.
 89        /// </summary>
 90        ///
 91        /// <value>The maximum message size, in bytes.</value>
 92        ///
 15093        private long? MaximumMessageSize { get; set; }
 94
 95        /// <summary>
 96        ///   Initializes a new instance of the <see cref="AmqpProducer"/> class.
 97        /// </summary>
 98        ///
 99        /// <param name="eventHubName">The name of the Event Hub to which events will be published.</param>
 100        /// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>nul
 101        /// <param name="connectionScope">The AMQP connection context for operations.</param>
 102        /// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.<
 103        /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
 104        ///
 105        /// <remarks>
 106        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 107        ///   is assumed that callers are trusted and have performed deep validation.
 108        ///
 109        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 110        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 111        ///   caller.
 112        /// </remarks>
 113        ///
 122114        public AmqpProducer(string eventHubName,
 122115                            string partitionId,
 122116                            AmqpConnectionScope connectionScope,
 122117                            AmqpMessageConverter messageConverter,
 122118                            EventHubsRetryPolicy retryPolicy)
 119        {
                // Only the required arguments are validated.  partitionId is deliberately left unchecked;
                // the PartitionId property documents null as the value for a producer that is not bound
                // to a partition.
 122120            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
 118121            Argument.AssertNotNull(connectionScope, nameof(connectionScope));
 116122            Argument.AssertNotNull(messageConverter, nameof(messageConverter));
 116123            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 124
 114125            EventHubName = eventHubName;
 114126            PartitionId = partitionId;
 114127            RetryPolicy = retryPolicy;
 114128            ConnectionScope = connectionScope;
 114129            MessageConverter = messageConverter;
 130
                // The send link is held in a FaultTolerantAmqpObject configured with two delegates:
                //
                //   - The first creates the link by calling CreateLinkAndEnsureProducerStateAsync with the
                //     timeout supplied by the wrapper.  No caller cancellation token exists at this point,
                //     so CancellationToken.None is passed.
                //
                //   - The second tears a link down by closing its session, when one is present, and then
                //     the link itself.
                //
                // The constructor does not call either delegate itself.  Presumably the wrapper invokes the
                // first from GetOrCreateAsync - confirm against the Microsoft.Azure.Amqp documentation.
 114131            SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
 244132                timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None),
 114133                link =>
 114134                {
 122135                    link.Session?.SafeClose();
 122136                    link.SafeClose();
 122137                });
 114138        }
 139
 140        /// <summary>
 141        ///   Sends a set of events to the associated Event Hub using a batched approach.  If the size of events exceed 
 142        ///   maximum size of a single batch, an exception will be triggered and the send will fail.
 143        /// </summary>
 144        ///
 145        /// <param name="events">The set of event data to send.</param>
 146        /// <param name="sendOptions">The set of options to consider when sending this batch.</param>
 147        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 148        ///
 149        public override async Task SendAsync(IEnumerable<EventData> events,
 150                                             SendEventOptions sendOptions,
 151                                             CancellationToken cancellationToken)
 152        {
 30153            Argument.AssertNotNull(events, nameof(events));
 28154            Argument.AssertNotClosed(_closed, nameof(AmqpProducer));
 155
                // sendOptions is allowed to be null; the partition key is read through the null-conditional
                // operator and is passed along as null in that case.
                //
                // The AMQP message is produced by a local factory instead of being built once here.  The
                // factory-based SendAsync overload calls the factory inside its retry loop, so every attempt
                // builds a new message and, as a consequence, enumerates "events" again.
 40156            AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(events, sendOptions?.PartitionKey);
 26157            await SendAsync(messageFactory, sendOptions?.PartitionKey, cancellationToken).ConfigureAwait(false);
 8158        }
 159
 160        /// <summary>
 161        ///   Sends a set of events to the associated Event Hub using a batched approach.
 162        /// </summary>
 163        ///
 164        /// <param name="eventBatch">The event batch to send.</param>
 165        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 166        ///
 167        /// <returns>A task to be resolved on when the operation has completed.</returns>
 168        ///
 169        /// <remarks>
 170        ///   The caller is assumed to retain ownership of the <paramref name="eventBatch" /> and
 171        ///   is responsible for managing its lifespan, including disposal.
 172        /// </remarks>
 173        ///
 174        public override async Task SendAsync(EventDataBatch eventBatch,
 175                                             CancellationToken cancellationToken)
 176        {
 32177            Argument.AssertNotNull(eventBatch, nameof(eventBatch));
 30178            Argument.AssertNotClosed(_closed, nameof(AmqpProducer));
 179
 180            // Make a defensive copy of the messages in the batch.
 181
                // NOTE(review): the two statements below are cut off in this listing, so the trailing
                // arguments are not visible.  What is visible: the factory converts the events exposed by
                // eventBatch.AsEnumerable<EventData>() and the partition key comes from the batch's own
                // SendOptions, which may be null.  The factory-based SendAsync overload invokes the factory
                // inside its retry loop, so the batch is read on each attempt rather than once here.
 40182            AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsEnumerable<EventData>(),
 28183            await SendAsync(messageFactory, eventBatch.SendOptions?.PartitionKey, cancellationToken).ConfigureAwait(fals
 10184        }
 185
 186        /// <summary>
 187        ///   Creates a size-constraint batch to which <see cref="EventData" /> may be added using a try-based pattern. 
 188        ///   exceed the maximum allowable size of the batch, the batch will not allow adding the event and signal that 
 189        ///   return value.
 190        ///
 191        ///   Because events that would violate the size constraint cannot be added, publishing a batch will not trigger
 192        ///   attempting to send the events to the Event Hubs service.
 193        /// </summary>
 194        ///
 195        /// <param name="options">The set of options to consider when creating this batch.</param>
 196        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 197        ///
 198        /// <returns>An <see cref="EventDataBatch" /> with the requested <paramref name="options"/>.</returns>
 199        ///
 200        public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatchOptions options,
 201                                                                              CancellationToken cancellationToken)
 202        {
 54203            Argument.AssertNotNull(options, nameof(options));
 52204            Argument.AssertNotClosed(_closed, nameof(AmqpProducer));
 205
 48206            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 207
 208            // Ensure that maximum message size has been determined; this depends on the underlying
 209            // AMQP link, so if not set, requesting the link will ensure that it is populated.
 210
                // MaximumMessageSize is assigned by CreateLinkAndEnsureProducerStateAsync, which is the
                // creation delegate registered for SendLink in the constructor.  The link returned by
                // GetOrCreateAsync below is not used directly; it is requested only for that side effect.
 44211            if (!MaximumMessageSize.HasValue)
 212            {
 44213                var failedAttemptCount = 0;
 44214                var retryDelay = default(TimeSpan?);
 44215                var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
 216
                    // The loop ends in one of four ways: the link is obtained (break), an exception is
                    // thrown because no retry applies, cancellation is requested, or the producer is closed.
 62217                while ((!cancellationToken.IsCancellationRequested) && (!_closed))
 218                {
 219                    try
 220                    {
                            // Each attempt is bounded by the smaller of the session timeout and the retry
                            // policy's timeout for the current attempt.
 62221                        await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).Configur
 28222                        break;
 223                    }
 224                    catch (Exception ex)
 225                    {
 34226                        Exception activeEx = ex.TranslateServiceException(EventHubName);
 227
 228                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 229                        // Otherwise, bubble the exception.
 230
 34231                        ++failedAttemptCount;
 34232                        retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
 233
                            // A retry requires a delay from the policy, a connection scope that is not
                            // disposed and no cancellation request (the end of the condition is cut off in
                            // this listing).
 34234                        if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation
 235                        {
 18236                            await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 18237                            tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
 238                        }
                            // When not retrying, an AmqpException surfaces as its translated form (activeEx),
                            // thrown through ExceptionDispatchInfo; any other exception is rethrown unchanged.
 16239                        else if (ex is AmqpException)
 240                        {
 0241                            ExceptionDispatchInfo.Capture(activeEx).Throw();
 242                        }
 243                        else
 244                        {
 16245                            throw;
 246                        }
 247                    }
 248                }
 249
 250                // If MaximumMessageSize has not been populated nor exception thrown
 251                // by this point, then cancellation has been requested.
 252
                    // NOTE(review): the loop condition also stops on _closed, so this exception is raised
                    // as well when the producer is closed while the link is being requested, not only on
                    // cancellation.
 28253                if (!MaximumMessageSize.HasValue)
 254                {
 0255                    throw new TaskCanceledException();
 256                }
 28257            }
 258
 259            // Ensure that there was a maximum size populated; if none was provided,
 260            // default to the maximum size allowed by the link.
 261
                // This assignment writes to the options instance supplied by the caller.
 28262            options.MaximumSizeInBytes ??= MaximumMessageSize;
 263
 28264            Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, Maxim
 26265            return new AmqpEventBatch(MessageConverter, options);
 26266        }
 267
 268        /// <summary>
 269        ///   Closes the connection to the transport producer instance.
 270        /// </summary>
 271        ///
 272        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 273        ///
 274        public override async Task CloseAsync(CancellationToken cancellationToken)
 275        {
                // Closing an already-closed producer is a no-op.
                //
                // NOTE(review): the check and the assignment below are separate steps on a volatile field,
                // not one atomic operation; two concurrent callers could both pass the check.  Confirm
                // whether callers serialize access.
 14276            if (_closed)
 277            {
 0278                return;
 279            }
 280
                // The flag is set before any work is done, so the other operations, which assert on _closed,
                // start failing immediately.  It is reset in the catch block if closing does not succeed.
 14281            _closed = true;
 282
 14283            var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture);
 14284            var clientType = GetType().Name;
 285
 286            try
 287            {
 14288                EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId);
 14289                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 290
                    // The asynchronous close is attempted only when the wrapper reports an opened link;
                    // Dispose is called on the wrapper in either case.
 12291                if (SendLink?.TryGetOpenedObject(out var _) == true)
 292                {
 0293                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0294                    await SendLink.CloseAsync().ConfigureAwait(false);
 295                }
 296
 12297                SendLink?.Dispose();
 12298            }
 2299            catch (Exception ex)
 300            {
                    // Any failure, including a cancellation raised above, marks the producer as open again
                    // before rethrowing.  In that case SendLink.Dispose() has not necessarily run.
 2301                _closed = false;
 2302                EventHubsEventSource.Log.ClientCloseError(clientType, EventHubName, clientId, ex.Message);
 303
 2304                throw;
 305            }
 306            finally
 307            {
                    // Logged on both the success and the failure path.
 14308                EventHubsEventSource.Log.ClientCloseComplete(clientType, EventHubName, clientId);
 309            }
 12310        }
 311
 312        /// <summary>
 313        ///   Sends an AMQP message that contains a batch of events to the associated Event Hub. If the size of events e
 314        ///   maximum size of a single batch, an exception will be triggered and the send will fail.
 315        /// </summary>
 316        ///
 317        /// <param name="messageFactory">A factory which can be used to produce an AMQP message containing the batch of 
 318        /// <param name="partitionKey">The hashing key to use for influencing the partition to which events should be ro
 319        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 320        ///
 321        protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
 322                                               string partitionKey,
 323                                               CancellationToken cancellationToken)
 324        {
 36325            var failedAttemptCount = 0;
                // Used only as the partition value passed to the event source calls in this method: the
                // bound partition when there is one, otherwise the partition key.
 36326            var logPartition = PartitionId ?? partitionKey;
 36327            var operationId = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture);
 36328            var stopWatch = ValueStopwatch.StartNew();
 329
 330            TimeSpan? retryDelay;
 331            SendingAmqpLink link;
 332
 333            try
 334            {
 36335                var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
 336
 72337                while (!cancellationToken.IsCancellationRequested)
 338                {
 339                    try
 340                    {
                            // A new message is requested from the factory for every attempt; the using
                            // declaration disposes it when the attempt's try block is exited.
 68341                        using AmqpMessage batchMessage = messageFactory();
 342
 343                        // Creation of the link happens without explicit knowledge of the cancellation token
 344                        // used for this operation; validate the token state before attempting link creation and
 345                        // again after the operation completes to provide best efforts in respecting it.
 346
                            // The start event is written once per attempt, not once per operation.
 68347                        EventHubsEventSource.Log.EventPublishStart(EventHubName, logPartition, operationId);
 68348                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 349
 68350                        link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).C
 0351                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 352
 353                        // Validate that the batch of messages is not too large to send.  This is done after the link is
 354                        // that the maximum message size is known, as it is dictated by the service using the link.
 355
                            // MaximumMessageSize is a nullable long.  A comparison against a null nullable
                            // evaluates to false, so the size check is skipped if the value was never set.
 0356                        if (batchMessage.SerializedMessageSize > MaximumMessageSize)
 357                        {
 0358                            throw new EventHubsException(EventHubName, string.Format(CultureInfo.CurrentCulture, Resourc
 359                        }
 360
 361                        // Attempt to send the message batch.
 362
                            // The delivery tag is derived from an interlocked increment of a counter field
                            // (the remainder of the statement is cut off in this listing).
 0363                        var deliveryTag = new ArraySegment<byte>(BitConverter.GetBytes(Interlocked.Increment(ref _delive
 0364                        var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, t
 0365                        cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 366
                            // Any outcome other than Accepted is a failure.  When the outcome is not a
                            // Rejected instance, a null error is handed to the exception factory.
 0367                        if (outcome.DescriptorCode != Accepted.Code)
 368                        {
 0369                            throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, EventHubName);
 370                        }
 371
 372                        // The send operation should be considered successful; return to
 373                        // exit the retry loop.
 374
 0375                        return;
 376                    }
 377                    catch (Exception ex)
 378                    {
 68379                        Exception activeEx = ex.TranslateServiceException(EventHubName);
 380
 381                        // Determine if there should be a retry for the next attempt; if so enforce the delay but do not
 382                        // Otherwise, bubble the exception.
 383
 68384                        ++failedAttemptCount;
 68385                        retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
 386
 68387                        if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation
 388                        {
                                // A retried failure is logged here, before the delay.  The delay observes
                                // the cancellation token; an exception raised by it leaves this handler and
                                // is dealt with by the outer catch blocks.
 36389                            EventHubsEventSource.Log.EventPublishError(EventHubName, logPartition, operationId, activeEx
 36390                            await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
 391
 36392                            tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
 36393                            stopWatch = ValueStopwatch.StartNew();
 394                        }
                            // When not retrying, an AmqpException surfaces as its translated form (activeEx);
                            // any other exception is rethrown unchanged.
 32395                        else if (ex is AmqpException)
 396                        {
 0397                            ExceptionDispatchInfo.Capture(activeEx).Throw();
 398                        }
 399                        else
 400                        {
 32401                            throw;
 402                        }
 403                    }
 404                }
 405
 406                // If no value has been returned nor exception thrown by this point,
 407                // then cancellation has been requested.
 408
 4409                throw new TaskCanceledException();
 410            }
                // Cancellation is rethrown without an error event; every other failure is logged first.
 4411            catch (TaskCanceledException)
 412            {
 4413                throw;
 414            }
 32415            catch (Exception ex)
 416            {
 32417                EventHubsEventSource.Log.EventPublishError(EventHubName, logPartition, operationId, ex.Message);
 32418                throw;
 419            }
 420            finally
 421            {
                    // The completion event is written for success, failure and cancellation alike.
 36422                EventHubsEventSource.Log.EventPublishComplete(EventHubName, logPartition, operationId, failedAttemptCoun
 423            }
 0424        }
 425
 426        /// <summary>
 427        ///   Creates the AMQP link to be used for producer-related operations and ensures
 428        ///   that the corresponding state for the producer has been updated based on the link
 429        ///   configuration.
 430        /// </summary>
 431        ///
 432        /// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>nul
 433        /// <param name="timeout">The timeout to apply when creating the link.</param>
 434        /// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
 435        ///
 436        /// <returns>The AMQP link to use for producer-related operations.</returns>
 437        ///
 438        /// <remarks>
 439        ///   This method will modify class-level state, setting those attributes that depend on the AMQP
 440        ///   link configuration.  There exists a benign race condition in doing so, as there may be multiple
 441        ///   concurrent callers.  In this case, the attributes may be set multiple times but the resulting
 442        ///   value will be the same.
 443        /// </remarks>
 444        ///
 445        protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAsync(string partitionId,
 446                                                                                            TimeSpan timeout,
 447                                                                                            CancellationToken cancellati
 448        {
                // Declared outside the try block so that it can be returned after it.
 0449            var link = default(SendingAmqpLink);
 450
 451            try
 452            {
 0453                link = await ConnectionScope.OpenProducerLinkAsync(partitionId, timeout, cancellationToken).ConfigureAwa
 454
                    // The delay and the assignment run only while MaximumMessageSize is still unset; once
                    // it has a value, later link creations skip this branch.
 0455                if (!MaximumMessageSize.HasValue)
 456                {
 457                    // This delay is necessary to prevent the link from causing issues for subsequent
 458                    // operations after creating a batch.  Without it, operations using the link consistently
 459                    // timeout.  The length of the delay does not appear significant, just the act of introducing
 460                    // an asynchronous delay.
 461                    //
 462                    // For consistency the value used by the legacy Event Hubs client has been brought forward and
 463                    // used here.
 464
                        // NOTE(review): if this delay throws after the link has been opened, control moves
                        // to the catch block and the link is not returned to the caller; nothing visible in
                        // this method closes it.  The constructor's delegate passes CancellationToken.None,
                        // so cancellation cannot trigger this on that path - confirm for other callers.
 0465                    await Task.Delay(15, cancellationToken).ConfigureAwait(false);
 0466                    MaximumMessageSize = (long)link.Settings.MaxMessageSize;
 467                }
 0468            }
 469            catch (Exception ex)
 470            {
                   // The exception is passed through TranslateConnectionCloseDuringLinkCreationException and
                   // the result is thrown through ExceptionDispatchInfo (the end of the statement is cut off
                   // in this listing).  Presumably this handler never completes normally, which would make
                   // the return below reachable only on success.
 0471               ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw
 0472            }
 473
 0474            return link;
 0475        }
 476
 477        /// <summary>
 478        ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 479        /// </summary>
 480        ///
 481        /// <param name="firstOption">The first option to consider.</param>
 482        /// <param name="secondOption">The second option to consider.</param>
 483        ///
 484        /// <returns>The smaller of the two specified intervals.</returns>
 485        ///
 486        private static TimeSpan UseMinimum(TimeSpan firstOption,
                // The comparison is a strict less-than, so equal values take the second branch of the
                // conditional.  That branch is cut off in this listing - presumably secondOption.
 130487                                           TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second
 488    }
 489}