| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Globalization; |
| | 7 | | using System.Runtime.ExceptionServices; |
| | 8 | | using System.Threading; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Azure.Core; |
| | 11 | | using Azure.Core.Diagnostics; |
| | 12 | | using Azure.Messaging.EventHubs.Core; |
| | 13 | | using Azure.Messaging.EventHubs.Diagnostics; |
| | 14 | | using Azure.Messaging.EventHubs.Producer; |
| | 15 | | using Microsoft.Azure.Amqp; |
| | 16 | | using Microsoft.Azure.Amqp.Framing; |
| | 17 | |
|
| | 18 | | namespace Azure.Messaging.EventHubs.Amqp |
| | 19 | | { |
| | 20 | | /// <summary> |
| | 21 | | /// A transport producer abstraction responsible for brokering operations for AMQP-based connections. |
| | 22 | | /// It is intended that the public <see cref="EventHubProducerClient" /> make use of an instance |
| | 23 | | /// via containment and delegate operations to it. |
| | 24 | | /// </summary> |
| | 25 | | /// |
| | 26 | | /// <seealso cref="Azure.Messaging.EventHubs.Core.TransportProducer" /> |
| | 27 | | /// |
| | 28 | | internal class AmqpProducer : TransportProducer |
| | 29 | | { |
| | 30 | | /// <summary>Indicates whether or not this instance has been closed.</summary> |
| | 31 | | private volatile bool _closed = false; |
| | 32 | |
|
| | 33 | | /// <summary>The count of send operations performed by this instance; this is used to tag deliveries for the AMQ |
| | 34 | | private int _deliveryCount = 0; |
| | 35 | |
|
| | 36 | | /// <summary> |
| | 37 | | /// Indicates whether or not this producer has been closed. |
| | 38 | | /// </summary> |
| | 39 | | /// |
| | 40 | | /// <value> |
| | 41 | | /// <c>true</c> if the producer is closed; otherwise, <c>false</c>. |
| | 42 | | /// </value> |
| | 43 | | /// |
| 6 | 44 | | public override bool IsClosed => _closed; |
| | 45 | |
|
| | 46 | | /// <summary> |
| | 47 | | /// The name of the Event Hub to which the producer is bound. |
| | 48 | | /// </summary> |
| | 49 | | /// |
| 304 | 50 | | private string EventHubName { get; } |
| | 51 | |
|
| | 52 | | /// <summary> |
| | 53 | | /// The identifier of the Event Hub partition that this producer is bound to, if any. If bound, events will |
| | 54 | | /// be published only to this partition. |
| | 55 | | /// </summary> |
| | 56 | | /// |
| | 57 | | /// <value>The partition to which the producer is bound; if unbound, <c>null</c>.</value> |
| | 58 | | /// |
| 36 | 59 | | private string PartitionId { get; } |
| | 60 | |
|
| | 61 | | /// <summary> |
| | 62 | | /// The policy to use for determining retry behavior for when an operation fails. |
| | 63 | | /// </summary> |
| | 64 | | /// |
| 236 | 65 | | private EventHubsRetryPolicy RetryPolicy { get; } |
| | 66 | |
|
| | 67 | | /// <summary> |
| | 68 | | /// The converter to use for translating between AMQP messages and client library |
| | 69 | | /// types. |
| | 70 | | /// </summary> |
| | 71 | | /// |
| 106 | 72 | | private AmqpMessageConverter MessageConverter { get; } |
| | 73 | |
|
| | 74 | | /// <summary> |
| | 75 | | /// The AMQP connection scope responsible for managing transport constructs for this instance. |
| | 76 | | /// </summary> |
| | 77 | | /// |
| 184 | 78 | | private AmqpConnectionScope ConnectionScope { get; } |
| | 79 | |
|
| | 80 | | /// <summary> |
| | 81 | | /// The AMQP link intended for use with publishing operations. |
| | 82 | | /// </summary> |
| | 83 | | /// |
| 154 | 84 | | private FaultTolerantAmqpObject<SendingAmqpLink> SendLink { get; } |
| | 85 | |
|
| | 86 | | /// <summary> |
| | 87 | | /// The maximum size of an AMQP message allowed by the associated |
| | 88 | | /// producer link. |
| | 89 | | /// </summary> |
| | 90 | | /// |
| | 91 | | /// <value>The maximum message size, in bytes.</value> |
| | 92 | | /// |
| 150 | 93 | | private long? MaximumMessageSize { get; set; } |
| | 94 | |
|
| | 95 | | /// <summary> |
| | 96 | | /// Initializes a new instance of the <see cref="AmqpProducer"/> class. |
| | 97 | | /// </summary> |
| | 98 | | /// |
| | 99 | | /// <param name="eventHubName">The name of the Event Hub to which events will be published.</param> |
| | 100 | | /// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>nul |
| | 101 | | /// <param name="connectionScope">The AMQP connection context for operations.</param> |
| | 102 | | /// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.< |
| | 103 | | /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param> |
| | 104 | | /// |
| | 105 | | /// <remarks> |
| | 106 | | /// As an internal type, this class performs only basic sanity checks against its arguments. It |
| | 107 | | /// is assumed that callers are trusted and have performed deep validation. |
| | 108 | | /// |
| | 109 | | /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose; |
| | 110 | | /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the |
| | 111 | | /// caller. |
| | 112 | | /// </remarks> |
| | 113 | | /// |
| 122 | 114 | | public AmqpProducer(string eventHubName, |
| 122 | 115 | | string partitionId, |
| 122 | 116 | | AmqpConnectionScope connectionScope, |
| 122 | 117 | | AmqpMessageConverter messageConverter, |
| 122 | 118 | | EventHubsRetryPolicy retryPolicy) |
| | 119 | | { |
| 122 | 120 | | Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName)); |
| 118 | 121 | | Argument.AssertNotNull(connectionScope, nameof(connectionScope)); |
| 116 | 122 | | Argument.AssertNotNull(messageConverter, nameof(messageConverter)); |
| 116 | 123 | | Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); |
| | 124 | |
|
| 114 | 125 | | EventHubName = eventHubName; |
| 114 | 126 | | PartitionId = partitionId; |
| 114 | 127 | | RetryPolicy = retryPolicy; |
| 114 | 128 | | ConnectionScope = connectionScope; |
| 114 | 129 | | MessageConverter = messageConverter; |
| | 130 | |
|
| 114 | 131 | | SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>( |
| 244 | 132 | | timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None), |
| 114 | 133 | | link => |
| 114 | 134 | | { |
| 122 | 135 | | link.Session?.SafeClose(); |
| 122 | 136 | | link.SafeClose(); |
| 122 | 137 | | }); |
| 114 | 138 | | } |
| | 139 | |
|
| | 140 | | /// <summary> |
| | 141 | | /// Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed |
| | 142 | | /// maximum size of a single batch, an exception will be triggered and the send will fail. |
| | 143 | | /// </summary> |
| | 144 | | /// |
| | 145 | | /// <param name="events">The set of event data to send.</param> |
| | 146 | | /// <param name="sendOptions">The set of options to consider when sending this batch.</param> |
| | 147 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 148 | | /// |
| | 149 | | public override async Task SendAsync(IEnumerable<EventData> events, |
| | 150 | | SendEventOptions sendOptions, |
| | 151 | | CancellationToken cancellationToken) |
| | 152 | | { |
| 30 | 153 | | Argument.AssertNotNull(events, nameof(events)); |
| 28 | 154 | | Argument.AssertNotClosed(_closed, nameof(AmqpProducer)); |
| | 155 | |
|
| 40 | 156 | | AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(events, sendOptions?.PartitionKey); |
| 26 | 157 | | await SendAsync(messageFactory, sendOptions?.PartitionKey, cancellationToken).ConfigureAwait(false); |
| 8 | 158 | | } |
| | 159 | |
|
| | 160 | | /// <summary> |
| | 161 | | /// Sends a set of events to the associated Event Hub using a batched approach. |
| | 162 | | /// </summary> |
| | 163 | | /// |
| | 164 | | /// <param name="eventBatch">The event batch to send.</param> |
| | 165 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 166 | | /// |
| | 167 | | /// <returns>A task to be resolved on when the operation has completed.</returns> |
| | 168 | | /// |
| | 169 | | /// <remarks> |
| | 170 | | /// The caller is assumed to retain ownership of the <paramref name="eventBatch" /> and |
| | 171 | | /// is responsible for managing its lifespan, including disposal. |
| | 172 | | /// </remarks> |
| | 173 | | /// |
| | 174 | | public override async Task SendAsync(EventDataBatch eventBatch, |
| | 175 | | CancellationToken cancellationToken) |
| | 176 | | { |
| 32 | 177 | | Argument.AssertNotNull(eventBatch, nameof(eventBatch)); |
| 30 | 178 | | Argument.AssertNotClosed(_closed, nameof(AmqpProducer)); |
| | 179 | |
|
| | 180 | | // Make a defensive copy of the messages in the batch. |
| | 181 | |
|
| 40 | 182 | | AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsEnumerable<EventData>(), |
| 28 | 183 | | await SendAsync(messageFactory, eventBatch.SendOptions?.PartitionKey, cancellationToken).ConfigureAwait(fals |
| 10 | 184 | | } |
| | 185 | |
|
| | 186 | | /// <summary> |
| | 187 | | /// Creates a size-constraint batch to which <see cref="EventData" /> may be added using a try-based pattern. |
| | 188 | | /// exceed the maximum allowable size of the batch, the batch will not allow adding the event and signal that |
| | 189 | | /// return value. |
| | 190 | | /// |
| | 191 | | /// Because events that would violate the size constraint cannot be added, publishing a batch will not trigger |
| | 192 | | /// attempting to send the events to the Event Hubs service. |
| | 193 | | /// </summary> |
| | 194 | | /// |
| | 195 | | /// <param name="options">The set of options to consider when creating this batch.</param> |
| | 196 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 197 | | /// |
| | 198 | | /// <returns>An <see cref="EventDataBatch" /> with the requested <paramref name="options"/>.</returns> |
| | 199 | | /// |
| | 200 | | public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatchOptions options, |
| | 201 | | CancellationToken cancellationToken) |
| | 202 | | { |
| 54 | 203 | | Argument.AssertNotNull(options, nameof(options)); |
| 52 | 204 | | Argument.AssertNotClosed(_closed, nameof(AmqpProducer)); |
| | 205 | |
|
| 48 | 206 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 207 | |
|
| | 208 | | // Ensure that maximum message size has been determined; this depends on the underlying |
| | 209 | | // AMQP link, so if not set, requesting the link will ensure that it is populated. |
| | 210 | |
|
| 44 | 211 | | if (!MaximumMessageSize.HasValue) |
| | 212 | | { |
| 44 | 213 | | var failedAttemptCount = 0; |
| 44 | 214 | | var retryDelay = default(TimeSpan?); |
| 44 | 215 | | var tryTimeout = RetryPolicy.CalculateTryTimeout(0); |
| | 216 | |
|
| 62 | 217 | | while ((!cancellationToken.IsCancellationRequested) && (!_closed)) |
| | 218 | | { |
| | 219 | | try |
| | 220 | | { |
| 62 | 221 | | await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).Configur |
| 28 | 222 | | break; |
| | 223 | | } |
| | 224 | | catch (Exception ex) |
| | 225 | | { |
| 34 | 226 | | Exception activeEx = ex.TranslateServiceException(EventHubName); |
| | 227 | |
|
| | 228 | | // Determine if there should be a retry for the next attempt; if so enforce the delay but do not |
| | 229 | | // Otherwise, bubble the exception. |
| | 230 | |
|
| 34 | 231 | | ++failedAttemptCount; |
| 34 | 232 | | retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount); |
| | 233 | |
|
| 34 | 234 | | if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation |
| | 235 | | { |
| 18 | 236 | | await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false); |
| 18 | 237 | | tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount); |
| | 238 | | } |
| 16 | 239 | | else if (ex is AmqpException) |
| | 240 | | { |
| 0 | 241 | | ExceptionDispatchInfo.Capture(activeEx).Throw(); |
| | 242 | | } |
| | 243 | | else |
| | 244 | | { |
| 16 | 245 | | throw; |
| | 246 | | } |
| | 247 | | } |
| | 248 | | } |
| | 249 | |
|
| | 250 | | // If MaximumMessageSize has not been populated nor exception thrown |
| | 251 | | // by this point, then cancellation has been requested. |
| | 252 | |
|
| 28 | 253 | | if (!MaximumMessageSize.HasValue) |
| | 254 | | { |
| 0 | 255 | | throw new TaskCanceledException(); |
| | 256 | | } |
| 28 | 257 | | } |
| | 258 | |
|
| | 259 | | // Ensure that there was a maximum size populated; if none was provided, |
| | 260 | | // default to the maximum size allowed by the link. |
| | 261 | |
|
| 28 | 262 | | options.MaximumSizeInBytes ??= MaximumMessageSize; |
| | 263 | |
|
| 28 | 264 | | Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, Maxim |
| 26 | 265 | | return new AmqpEventBatch(MessageConverter, options); |
| 26 | 266 | | } |
| | 267 | |
|
| | 268 | | /// <summary> |
| | 269 | | /// Closes the connection to the transport producer instance. |
| | 270 | | /// </summary> |
| | 271 | | /// |
| | 272 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 273 | | /// |
| | 274 | | public override async Task CloseAsync(CancellationToken cancellationToken) |
| | 275 | | { |
| 14 | 276 | | if (_closed) |
| | 277 | | { |
| 0 | 278 | | return; |
| | 279 | | } |
| | 280 | |
|
| 14 | 281 | | _closed = true; |
| | 282 | |
|
| 14 | 283 | | var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture); |
| 14 | 284 | | var clientType = GetType().Name; |
| | 285 | |
|
| | 286 | | try |
| | 287 | | { |
| 14 | 288 | | EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId); |
| 14 | 289 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 290 | |
|
| 12 | 291 | | if (SendLink?.TryGetOpenedObject(out var _) == true) |
| | 292 | | { |
| 0 | 293 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 0 | 294 | | await SendLink.CloseAsync().ConfigureAwait(false); |
| | 295 | | } |
| | 296 | |
|
| 12 | 297 | | SendLink?.Dispose(); |
| 12 | 298 | | } |
| 2 | 299 | | catch (Exception ex) |
| | 300 | | { |
| 2 | 301 | | _closed = false; |
| 2 | 302 | | EventHubsEventSource.Log.ClientCloseError(clientType, EventHubName, clientId, ex.Message); |
| | 303 | |
|
| 2 | 304 | | throw; |
| | 305 | | } |
| | 306 | | finally |
| | 307 | | { |
| 14 | 308 | | EventHubsEventSource.Log.ClientCloseComplete(clientType, EventHubName, clientId); |
| | 309 | | } |
| 12 | 310 | | } |
| | 311 | |
|
| | 312 | | /// <summary> |
| | 313 | | /// Sends an AMQP message that contains a batch of events to the associated Event Hub. If the size of events e |
| | 314 | | /// maximum size of a single batch, an exception will be triggered and the send will fail. |
| | 315 | | /// </summary> |
| | 316 | | /// |
| | 317 | | /// <param name="messageFactory">A factory which can be used to produce an AMQP message containing the batch of |
| | 318 | | /// <param name="partitionKey">The hashing key to use for influencing the partition to which events should be ro |
| | 319 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 320 | | /// |
| | 321 | | protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory, |
| | 322 | | string partitionKey, |
| | 323 | | CancellationToken cancellationToken) |
| | 324 | | { |
| 36 | 325 | | var failedAttemptCount = 0; |
| 36 | 326 | | var logPartition = PartitionId ?? partitionKey; |
| 36 | 327 | | var operationId = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture); |
| 36 | 328 | | var stopWatch = ValueStopwatch.StartNew(); |
| | 329 | |
|
| | 330 | | TimeSpan? retryDelay; |
| | 331 | | SendingAmqpLink link; |
| | 332 | |
|
| | 333 | | try |
| | 334 | | { |
| 36 | 335 | | var tryTimeout = RetryPolicy.CalculateTryTimeout(0); |
| | 336 | |
|
| 72 | 337 | | while (!cancellationToken.IsCancellationRequested) |
| | 338 | | { |
| | 339 | | try |
| | 340 | | { |
| 68 | 341 | | using AmqpMessage batchMessage = messageFactory(); |
| | 342 | |
|
| | 343 | | // Creation of the link happens without explicit knowledge of the cancellation token |
| | 344 | | // used for this operation; validate the token state before attempting link creation and |
| | 345 | | // again after the operation completes to provide best efforts in respecting it. |
| | 346 | |
|
| 68 | 347 | | EventHubsEventSource.Log.EventPublishStart(EventHubName, logPartition, operationId); |
| 68 | 348 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 349 | |
|
| 68 | 350 | | link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).C |
| 0 | 351 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 352 | |
|
| | 353 | | // Validate that the batch of messages is not too large to send. This is done after the link is |
| | 354 | | // that the maximum message size is known, as it is dictated by the service using the link. |
| | 355 | |
|
| 0 | 356 | | if (batchMessage.SerializedMessageSize > MaximumMessageSize) |
| | 357 | | { |
| 0 | 358 | | throw new EventHubsException(EventHubName, string.Format(CultureInfo.CurrentCulture, Resourc |
| | 359 | | } |
| | 360 | |
|
| | 361 | | // Attempt to send the message batch. |
| | 362 | |
|
| 0 | 363 | | var deliveryTag = new ArraySegment<byte>(BitConverter.GetBytes(Interlocked.Increment(ref _delive |
| 0 | 364 | | var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, t |
| 0 | 365 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| | 366 | |
|
| 0 | 367 | | if (outcome.DescriptorCode != Accepted.Code) |
| | 368 | | { |
| 0 | 369 | | throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, EventHubName); |
| | 370 | | } |
| | 371 | |
|
| | 372 | | // The send operation should be considered successful; return to |
| | 373 | | // exit the retry loop. |
| | 374 | |
|
| 0 | 375 | | return; |
| | 376 | | } |
| | 377 | | catch (Exception ex) |
| | 378 | | { |
| 68 | 379 | | Exception activeEx = ex.TranslateServiceException(EventHubName); |
| | 380 | |
|
| | 381 | | // Determine if there should be a retry for the next attempt; if so enforce the delay but do not |
| | 382 | | // Otherwise, bubble the exception. |
| | 383 | |
|
| 68 | 384 | | ++failedAttemptCount; |
| 68 | 385 | | retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount); |
| | 386 | |
|
| 68 | 387 | | if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellation |
| | 388 | | { |
| 36 | 389 | | EventHubsEventSource.Log.EventPublishError(EventHubName, logPartition, operationId, activeEx |
| 36 | 390 | | await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false); |
| | 391 | |
|
| 36 | 392 | | tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount); |
| 36 | 393 | | stopWatch = ValueStopwatch.StartNew(); |
| | 394 | | } |
| 32 | 395 | | else if (ex is AmqpException) |
| | 396 | | { |
| 0 | 397 | | ExceptionDispatchInfo.Capture(activeEx).Throw(); |
| | 398 | | } |
| | 399 | | else |
| | 400 | | { |
| 32 | 401 | | throw; |
| | 402 | | } |
| | 403 | | } |
| | 404 | | } |
| | 405 | |
|
| | 406 | | // If no value has been returned nor exception thrown by this point, |
| | 407 | | // then cancellation has been requested. |
| | 408 | |
|
| 4 | 409 | | throw new TaskCanceledException(); |
| | 410 | | } |
| 4 | 411 | | catch (TaskCanceledException) |
| | 412 | | { |
| 4 | 413 | | throw; |
| | 414 | | } |
| 32 | 415 | | catch (Exception ex) |
| | 416 | | { |
| 32 | 417 | | EventHubsEventSource.Log.EventPublishError(EventHubName, logPartition, operationId, ex.Message); |
| 32 | 418 | | throw; |
| | 419 | | } |
| | 420 | | finally |
| | 421 | | { |
| 36 | 422 | | EventHubsEventSource.Log.EventPublishComplete(EventHubName, logPartition, operationId, failedAttemptCoun |
| | 423 | | } |
| 0 | 424 | | } |
| | 425 | |
|
| | 426 | | /// <summary> |
| | 427 | | /// Creates the AMQP link to be used for producer-related operations and ensures |
| | 428 | | /// that the corresponding state for the producer has been updated based on the link |
| | 429 | | /// configuration. |
| | 430 | | /// </summary> |
| | 431 | | /// |
| | 432 | | /// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>nul |
| | 433 | | /// <param name="timeout">The timeout to apply when creating the link.</param> |
| | 434 | | /// <param name="cancellationToken">The cancellation token to consider when creating the link.</param> |
| | 435 | | /// |
| | 436 | | /// <returns>The AMQP link to use for producer-related operations.</returns> |
| | 437 | | /// |
| | 438 | | /// <remarks> |
| | 439 | | /// This method will modify class-level state, setting those attributes that depend on the AMQP |
| | 440 | | /// link configuration. There exists a benign race condition in doing so, as there may be multiple |
| | 441 | | /// concurrent callers. In this case, the attributes may be set multiple times but the resulting |
| | 442 | | /// value will be the same. |
| | 443 | | /// </remarks> |
| | 444 | | /// |
| | 445 | | protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAsync(string partitionId, |
| | 446 | | TimeSpan timeout, |
| | 447 | | CancellationToken cancellati |
| | 448 | | { |
| 0 | 449 | | var link = default(SendingAmqpLink); |
| | 450 | |
|
| | 451 | | try |
| | 452 | | { |
| 0 | 453 | | link = await ConnectionScope.OpenProducerLinkAsync(partitionId, timeout, cancellationToken).ConfigureAwa |
| | 454 | |
|
| 0 | 455 | | if (!MaximumMessageSize.HasValue) |
| | 456 | | { |
| | 457 | | // This delay is necessary to prevent the link from causing issues for subsequent |
| | 458 | | // operations after creating a batch. Without it, operations using the link consistently |
| | 459 | | // timeout. The length of the delay does not appear significant, just the act of introducing |
| | 460 | | // an asynchronous delay. |
| | 461 | | // |
| | 462 | | // For consistency the value used by the legacy Event Hubs client has been brought forward and |
| | 463 | | // used here. |
| | 464 | |
|
| 0 | 465 | | await Task.Delay(15, cancellationToken).ConfigureAwait(false); |
| 0 | 466 | | MaximumMessageSize = (long)link.Settings.MaxMessageSize; |
| | 467 | | } |
| 0 | 468 | | } |
| | 469 | | catch (Exception ex) |
| | 470 | | { |
| 0 | 471 | | ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw |
| 0 | 472 | | } |
| | 473 | |
|
| 0 | 474 | | return link; |
| 0 | 475 | | } |
| | 476 | |
|
| | 477 | | /// <summary> |
| | 478 | | /// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances. |
| | 479 | | /// </summary> |
| | 480 | | /// |
| | 481 | | /// <param name="firstOption">The first option to consider.</param> |
| | 482 | | /// <param name="secondOption">The second option to consider.</param> |
| | 483 | | /// |
| | 484 | | /// <returns>The smaller of the two specified intervals.</returns> |
| | 485 | | /// |
| | 486 | | private static TimeSpan UseMinimum(TimeSpan firstOption, |
| 130 | 487 | | TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second |
| | 488 | | } |
| | 489 | | } |