Summary

Class: Azure.Messaging.ServiceBus.Amqp.AmqpSender
Assembly: Azure.Messaging.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpSender.cs
Covered lines: 21
Uncovered lines: 221
Coverable lines: 242
Total lines: 641
Line coverage: 8.6% (21 of 242)
Covered branches: 0
Total branches: 66
Branch coverage: 0% (0 of 66)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_IsClosed() | - | 0% | 100%
get_MaxMessageSize() | - | 0% | 100%
.ctor(...) | - | 72.41% | 0%
OpenManagementLinkAsync() | - | 0% | 100%
CreateMessageBatchAsync() | - | 0% | 100%
<CreateMessageBatchAsync() | - | 0% | 100%
CreateMessageBatchInternalAsync() | - | 0% | 0%
SendBatchAsync() | - | 0% | 100%
<SendBatchAsync() | - | 0% | 100%
SendBatchInternalAsync() | - | 0% | 0%
SendAsync() | - | 0% | 100%
<SendAsync() | - | 0% | 100%
CloseAsync() | - | 0% | 0%
OnSenderLinkClosed(...) | - | 0% | 100%
OnManagementLinkClosed(...) | - | 0% | 100%
ScheduleMessagesAsync() | - | 0% | 0%
<ScheduleMessagesAsync() | - | 0% | 100%
ScheduleMessageInternalAsync() | - | 0% | 0%
CancelScheduledMessagesAsync() | - | 0% | 100%
<CancelScheduledMessagesAsync() | - | 0% | 100%
CancelScheduledMessageInternalAsync() | - | 0% | 0%
CreateLinkAndEnsureSenderStateAsync() | - | 0% | 0%
UseMinimum(...) | - | 0% | 0%
HasLinkCommunicationError(...) | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpSender.cs

# Line Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.Runtime.ExceptionServices;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using System.Transactions;
 11using Azure.Core;
 12using Azure.Core.Diagnostics;
 13using Azure.Messaging.ServiceBus.Core;
 14using Azure.Messaging.ServiceBus.Diagnostics;
 15using Microsoft.Azure.Amqp;
 16using Microsoft.Azure.Amqp.Encoding;
 17using Microsoft.Azure.Amqp.Framing;
 18
 19namespace Azure.Messaging.ServiceBus.Amqp
 20{
 21    /// <summary>
 22    ///   A transport sender abstraction responsible for brokering operations for AMQP-based connections.
 23    ///   It is intended that the public <see cref="ServiceBusSender" /> make use of an instance
 24    ///   via containment and delegate operations to it.
 25    /// </summary>
 26    ///
 27    /// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportSender" />
 28    ///
 29#pragma warning disable CA1001 // Types that own disposable fields should be disposable. The AmqpSender doesn't own the 
 30    internal class AmqpSender : TransportSender
 31#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 32    {
 33        /// <summary>Indicates whether or not this instance has been closed.</summary>
 34        private bool _closed = false;
 35
 36        /// <summary>The count of send operations performed by this instance; this is used to tag deliveries for the AMQP link.</summary>
 37        private int _deliveryCount = 0;
 38
 39        /// <summary>
 40        ///   Indicates whether or not this sender has been closed.
 41        /// </summary>
 42        ///
 43        /// <value>
 44        ///   <c>true</c> if the sender is closed; otherwise, <c>false</c>.
 45        /// </value>
 46        ///
 047        public override bool IsClosed => _closed;
 48
 49        /// <summary>
 50        ///   The name of the Service Bus entity to which the sender is bound.
 51        /// </summary>
 52        ///
 53        private readonly string _entityPath;
 54
 55        /// <summary>
 56        /// The identifier for the sender.
 57        /// </summary>
 58        private readonly string _identifier;
 59
 60        /// <summary>
 61        /// An optional entity path to route the message through. Useful for transactions.
 62        /// </summary>
 63        private readonly string _viaEntityPath;
 64
 65        /// <summary>
 66        ///   The policy to use for determining retry behavior for when an operation fails.
 67        /// </summary>
 68        ///
 69        private readonly ServiceBusRetryPolicy _retryPolicy;
 70
 71        /// <summary>
 72        ///   The AMQP connection scope responsible for managing transport constructs for this instance.
 73        /// </summary>
 74        ///
 75        private readonly AmqpConnectionScope _connectionScope;
 76
 77        private readonly FaultTolerantAmqpObject<SendingAmqpLink> _sendLink;
 78
 79        private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;
 80
 81        /// <summary>
 82        ///   The maximum size of an AMQP message allowed by the associated
 83        ///   sender link.
 84        /// </summary>
 85        ///
 86        /// <value>The maximum message size, in bytes.</value>
 87        ///
 088        private long? MaxMessageSize { get; set; }
 89
 90        /// <summary>
 91        ///   Initializes a new instance of the <see cref="AmqpSender"/> class.
 92        /// </summary>
 93        ///
 94        /// <param name="entityPath">The name of the entity to which messages will be sent.</param>
 95        /// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</param>
 96        /// <param name="connectionScope">The AMQP connection context for operations.</param>
 97        /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
 98        /// <param name="identifier">The identifier for the sender.</param>
 99        ///
 100        /// <remarks>
 101        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
 102        ///   is assumed that callers are trusted and have performed deep validation.
 103        ///
 104        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 105        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 106        ///   caller.
 107        /// </remarks>
 108        ///
 12109        public AmqpSender(
 12110            string entityPath,
 12111            string viaEntityPath,
 12112            AmqpConnectionScope connectionScope,
 12113            ServiceBusRetryPolicy retryPolicy,
 12114            string identifier)
 115        {
 12116            Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));
 12117            Argument.AssertNotNull(connectionScope, nameof(connectionScope));
 12118            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 119
 12120            _entityPath = entityPath;
 12121            _identifier = identifier;
 12122            _viaEntityPath = viaEntityPath;
 12123            _retryPolicy = retryPolicy;
 12124            _connectionScope = connectionScope;
 125
 12126            _sendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
 0127                timeout => CreateLinkAndEnsureSenderStateAsync(timeout, CancellationToken.None),
 12128                link =>
 12129                {
 0130                    link.Session?.SafeClose();
 0131                    link.SafeClose();
 0132                });
 133
 12134            _managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
 0135                timeout => OpenManagementLinkAsync(timeout),
 12136                link =>
 12137                {
 0138                    link.Session?.SafeClose();
 0139                    link.SafeClose();
 0140                });
 12141        }
 142
 143        private async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
 144            TimeSpan timeout)
 145        {
 0146            RequestResponseAmqpLink link = await _connectionScope.OpenManagementLinkAsync(
 0147                _entityPath,
 0148                _identifier,
 0149                timeout,
 0150                CancellationToken.None).ConfigureAwait(false);
 0151            link.Closed += OnManagementLinkClosed;
 0152            return link;
 0153        }
 154
 155        /// <summary>
 156        ///   Creates a size-constraint batch to which <see cref="ServiceBusMessage" /> may be added using a try-based pattern.  If a message would
 157        ///   exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its
 158        ///   return value.
 159        ///
 160        ///   Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when
 161        ///   attempting to send the message to the Queue/Topic.
 162        /// </summary>
 163        ///
 164        /// <param name="options">The set of options to consider when creating this batch.</param>
 165        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
 166        ///
 167        /// <returns>An <see cref="ServiceBusMessageBatch" /> with the requested <paramref name="options"/>.</returns>
 168        ///
 169        public override async ValueTask<TransportMessageBatch> CreateMessageBatchAsync(
 170            CreateMessageBatchOptions options,
 171            CancellationToken cancellationToken)
 172        {
 0173            TransportMessageBatch messageBatch = null;
 0174            Task createBatchTask = _retryPolicy.RunOperation(async (timeout) =>
 0175            {
 0176                messageBatch = await CreateMessageBatchInternalAsync(
 0177                    options,
 0178                    timeout).ConfigureAwait(false);
 0179            },
 0180            _connectionScope,
 0181            cancellationToken);
 0182            await createBatchTask.ConfigureAwait(false);
 0183            return messageBatch;
 0184        }
 185
 186        internal async ValueTask<TransportMessageBatch> CreateMessageBatchInternalAsync(
 187            CreateMessageBatchOptions options,
 188            TimeSpan timeout)
 189        {
 0190            Argument.AssertNotNull(options, nameof(options));
 191
 192            // Ensure that maximum message size has been determined; this depends on the underlying
 193            // AMQP link, so if not set, requesting the link will ensure that it is populated.
 194
 0195            if (!MaxMessageSize.HasValue)
 196            {
 0197                await _sendLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
 198            }
 199
 200            // Ensure that there was a maximum size populated; if none was provided,
 201            // default to the maximum size allowed by the link.
 202
 0203            options.MaxSizeInBytes ??= MaxMessageSize;
 204
 0205            Argument.AssertInRange(options.MaxSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaxMessageSize.
 0206            return new AmqpMessageBatch(options);
 0207        }
 208
 209        /// <summary>
 210        ///   Sends a set of messages to the associated Queue/Topic using a batched approach.
 211        /// </summary>
 212        ///
 213        /// <param name="messageBatch">The set of messages to send.</param>
 214        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
 215        ///
 216        /// <returns>A task to be resolved on when the operation has completed.</returns>
 217        ///
 218        public override async Task SendBatchAsync(
 219            ServiceBusMessageBatch messageBatch,
 220            CancellationToken cancellationToken)
 221        {
 0222            AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageBatch.AsEnumerable<
 0223            await _retryPolicy.RunOperation(async (timeout) =>
 0224                await SendBatchInternalAsync(
 0225                    messageFactory,
 0226                    timeout,
 0227                    cancellationToken).ConfigureAwait(false),
 0228            _connectionScope,
 0229            cancellationToken).ConfigureAwait(false);
 0230        }
 231
 232        /// <summary>
 233        ///    Sends a set of messages to the associated Queue/Topic using a batched approach.
 234        /// </summary>
 235        ///
 236        /// <param name="messageFactory"></param>
 237        /// <param name="timeout"></param>
 238        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
 239        ///
 240        internal virtual async Task SendBatchInternalAsync(
 241            Func<AmqpMessage> messageFactory,
 242            TimeSpan timeout,
 243            CancellationToken cancellationToken)
 244        {
 0245            var stopWatch = ValueStopwatch.StartNew();
 0246            var link = default(SendingAmqpLink);
 247
 248            try
 249            {
 0250                using (AmqpMessage batchMessage = messageFactory())
 251                {
 252
 0253                    string messageHash = batchMessage.GetHashCode().ToString(CultureInfo.InvariantCulture);
 254
 0255                    ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
 0256                    Transaction ambientTransaction = Transaction.Current;
 0257                    if (ambientTransaction != null)
 258                    {
 0259                        transactionId = await AmqpTransactionManager.Instance.EnlistAsync(
 0260                            ambientTransaction,
 0261                            _connectionScope,
 0262                            timeout).ConfigureAwait(false);
 263                    }
 264
 0265                    link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).Config
 266
 267                    // Validate that the message is not too large to send.  This is done after the link is created to ensure
 268                    // that the maximum message size is known, as it is dictated by the service using the link.
 269
 0270                    if (batchMessage.SerializedMessageSize > MaxMessageSize)
 271                    {
 0272                        throw new ServiceBusException(string.Format(CultureInfo.InvariantCulture, Resources.MessageSizeE
 273                    }
 274
 275                    // Attempt to send the message batch.
 276
 0277                    var deliveryTag = new ArraySegment<byte>(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCo
 0278                    Outcome outcome = await link.SendMessageAsync(
 0279                        batchMessage,
 0280                        deliveryTag,
 0281                    transactionId, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
 0282                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 283
 0284                    if (outcome.DescriptorCode != Accepted.Code)
 285                    {
 0286                        throw (outcome as Rejected)?.Error.ToMessagingContractException();
 287                    }
 288
 0289                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0290                }
 0291            }
 292            catch (Exception exception)
 293            {
 0294                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0295                    exception,
 0296                    link?.GetTrackingId(),
 0297                    null,
 0298                    HasLinkCommunicationError(link)))
 0299                .Throw();
 300
 0301                throw; // will never be reached
 302            }
 0303        }
 304
 305        /// <summary>
 306        ///   Sends a list of messages to the associated Service Bus entity using a batched approach.
 307        ///   If the size of the messages exceed the maximum size of a single batch,
 308        ///   an exception will be triggered and the send will fail. In order to ensure that the messages
 309        ///   being sent will fit in a batch, use <see cref="SendBatchAsync"/> instead.
 310        /// </summary>
 311        ///
 312        /// <param name="messages">The list of messages to send.</param>
 313        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
 314        public override async Task SendAsync(
 315            IList<ServiceBusMessage> messages,
 316            CancellationToken cancellationToken)
 317        {
 0318            AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages);
 0319            await _retryPolicy.RunOperation(async (timeout) =>
 0320             await SendBatchInternalAsync(
 0321                    messageFactory,
 0322                    timeout,
 0323                    cancellationToken).ConfigureAwait(false),
 0324            _connectionScope,
 0325            cancellationToken).ConfigureAwait(false);
 0326        }
 327
 328        /// <summary>
 329        ///   Closes the connection to the transport sender instance.
 330        /// </summary>
 331        ///
 332        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
 333        ///
 334        public override async Task CloseAsync(CancellationToken cancellationToken)
 335        {
 0336            if (_closed)
 337            {
 0338                return;
 339            }
 340
 0341            _closed = true;
 342
 343            try
 344            {
 0345                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 346
 0347                if (_sendLink?.TryGetOpenedObject(out var _) == true)
 348                {
 0349                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0350                    await _sendLink.CloseAsync().ConfigureAwait(false);
 351                }
 352
 0353                if (_managementLink?.TryGetOpenedObject(out var _) == true)
 354                {
 0355                    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0356                    await _managementLink.CloseAsync().ConfigureAwait(false);
 357                }
 358
 0359                _sendLink?.Dispose();
 0360                _managementLink?.Dispose();
 0361            }
 0362            catch (Exception)
 363            {
 0364                _closed = false;
 0365                throw;
 366            }
 0367        }
 368
 369        private void OnSenderLinkClosed(object sender, EventArgs e) =>
 0370            ServiceBusEventSource.Log.SendLinkClosed(
 0371                _identifier,
 0372                sender);
 373
 374        private void OnManagementLinkClosed(object managementLink, EventArgs e) =>
 0375            ServiceBusEventSource.Log.ManagementLinkClosed(
 0376                _identifier,
 0377                managementLink);
 378
 379        /// <summary>
 380        ///
 381        /// </summary>
 382        /// <param name="messages"></param>
 383        /// <param name="cancellationToken"></param>
 384        /// <returns></returns>
 385        public override async Task<long[]> ScheduleMessagesAsync(
 386            IList<ServiceBusMessage> messages,
 387            CancellationToken cancellationToken = default)
 388        {
 0389            long[] seqNumbers = null;
 0390            await _retryPolicy.RunOperation(async (timeout) =>
 0391            {
 0392                seqNumbers = await ScheduleMessageInternalAsync(
 0393                    messages,
 0394                    timeout,
 0395                    cancellationToken).ConfigureAwait(false);
 0396            },
 0397            _connectionScope,
 0398            cancellationToken).ConfigureAwait(false);
 0399            return seqNumbers ?? Array.Empty<long>();
 0400        }
 401
 402        /// <summary>
 403        ///
 404        /// </summary>
 405        /// <param name="messages"></param>
 406        /// <param name="timeout"></param>
 407        /// <param name="cancellationToken"></param>
 408        /// <returns></returns>
 409        internal async Task<long[]> ScheduleMessageInternalAsync(
 410            IList<ServiceBusMessage> messages,
 411            TimeSpan timeout,
 412            CancellationToken cancellationToken = default)
 413        {
 0414            var sendLink = default(SendingAmqpLink);
 415            try
 416            {
 417
 0418                var request = AmqpRequestMessage.CreateRequest(
 0419                        ManagementConstants.Operations.ScheduleMessageOperation,
 0420                        timeout,
 0421                        null);
 422
 0423                if (_sendLink.TryGetOpenedObject(out sendLink))
 424                {
 0425                    request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = send
 426                }
 427
 0428                List<AmqpMap> entries = new List<AmqpMap>();
 0429                foreach (ServiceBusMessage message in messages)
 430                {
 0431                    using AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
 0432                    var entry = new AmqpMap();
 0433                    ArraySegment<byte>[] payload = amqpMessage.GetPayload();
 0434                    var buffer = new BufferListStream(payload);
 0435                    ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
 0436                    entry[ManagementConstants.Properties.Message] = value;
 0437                    entry[ManagementConstants.Properties.MessageId] = message.MessageId;
 438
 0439                    if (!string.IsNullOrWhiteSpace(message.SessionId))
 440                    {
 0441                        entry[ManagementConstants.Properties.SessionId] = message.SessionId;
 442                    }
 443
 0444                    if (!string.IsNullOrWhiteSpace(message.PartitionKey))
 445                    {
 0446                        entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
 447                    }
 448
 0449                    if (!string.IsNullOrWhiteSpace(message.ViaPartitionKey))
 450                    {
 0451                        entry[ManagementConstants.Properties.ViaPartitionKey] = message.ViaPartitionKey;
 452                    }
 453
 0454                    entries.Add(entry);
 455                }
 456
 0457                request.Map[ManagementConstants.Properties.Messages] = entries;
 458
 0459                AmqpResponseMessage amqpResponseMessage = await ManagementUtilities.ExecuteRequestResponseAsync(
 0460                    _connectionScope,
 0461                    _managementLink,
 0462                    request,
 0463                    timeout).ConfigureAwait(false);
 464
 0465                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 466
 0467                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 468                {
 0469                    var sequenceNumbers = amqpResponseMessage.GetValue<long[]>(ManagementConstants.Properties.SequenceNu
 0470                    if (sequenceNumbers == null || sequenceNumbers.Length < 1)
 471                    {
 0472                        throw new ServiceBusException(true, "Could not schedule message successfully.");
 473                    }
 474
 0475                    return sequenceNumbers;
 476
 477                }
 478                else
 479                {
 0480                    throw amqpResponseMessage.ToMessagingContractException();
 481                }
 482            }
 483            catch (Exception exception)
 484            {
 0485                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0486                    exception,
 0487                    sendLink?.GetTrackingId(),
 0488                    null,
 0489                    HasLinkCommunicationError(sendLink)))
 0490                .Throw();
 491
 0492                throw; // will never be reached
 493            }
 0494        }
 495
 496        /// <summary>
 497        ///
 498        /// </summary>
 499        /// <param name="sequenceNumbers"></param>
 500        /// <param name="cancellationToken"></param>
 501        /// <returns></returns>
 502        public override async Task CancelScheduledMessagesAsync(
 503            long[] sequenceNumbers,
 504            CancellationToken cancellationToken = default)
 505        {
 0506            Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout) =>
 0507            {
 0508                await CancelScheduledMessageInternalAsync(
 0509                    sequenceNumbers,
 0510                    timeout,
 0511                    cancellationToken).ConfigureAwait(false);
 0512            },
 0513            _connectionScope,
 0514            cancellationToken);
 0515            await cancelMessageTask.ConfigureAwait(false);
 0516        }
 517
 518        /// <summary>
 519        ///
 520        /// </summary>
 521        /// <param name="sequenceNumbers"></param>
 522        /// <param name="timeout"></param>
 523        /// <param name="cancellationToken"></param>
 524        /// <returns></returns>
 525        internal async Task CancelScheduledMessageInternalAsync(
 526            long[] sequenceNumbers,
 527            TimeSpan timeout,
 528            CancellationToken cancellationToken = default)
 529        {
 0530            var sendLink = default(SendingAmqpLink);
 531            try
 532            {
 0533                var request = AmqpRequestMessage.CreateRequest(
 0534                    ManagementConstants.Operations.CancelScheduledMessageOperation,
 0535                    timeout,
 0536                    null);
 537
 0538                if (_sendLink.TryGetOpenedObject(out sendLink))
 539                {
 0540                    request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = send
 541                }
 542
 0543                request.Map[ManagementConstants.Properties.SequenceNumbers] = sequenceNumbers;
 544
 0545                AmqpResponseMessage amqpResponseMessage = await ManagementUtilities.ExecuteRequestResponseAsync(
 0546                        _connectionScope,
 0547                        _managementLink,
 0548                        request,
 0549                        timeout).ConfigureAwait(false);
 550
 0551                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 552
 0553                if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
 554                {
 0555                    throw amqpResponseMessage.ToMessagingContractException();
 556                }
 0557            }
 558            catch (Exception exception)
 559            {
 0560                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
 0561                    exception,
 0562                    sendLink?.GetTrackingId(),
 0563                    null,
 0564                    HasLinkCommunicationError(sendLink)))
 0565                .Throw();
 566
 0567                throw; // will never be reached
 568            }
 0569        }
 570
 571        /// <summary>
 572        ///   Creates the AMQP link to be used for sender-related operations and ensures
 573        ///   that the corresponding state for the sender has been updated based on the link
 574        ///   configuration.
 575        /// </summary>
 576        ///
 577        /// <param name="timeout">The timeout to apply when creating the link.</param>
 578        /// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
 579        ///
 580        /// <returns>The AMQP link to use for sender-related operations.</returns>
 581        ///
 582        /// <remarks>
 583        ///   This method will modify class-level state, setting those attributes that depend on the AMQP
 584        ///   link configuration.  There exists a benign race condition in doing so, as there may be multiple
 585        ///   concurrent callers.  In this case, the attributes may be set multiple times but the resulting
 586        ///   value will be the same.
 587        /// </remarks>
 588        ///
 589        protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureSenderStateAsync(
 590            TimeSpan timeout,
 591            CancellationToken cancellationToken)
 592        {
 0593            ServiceBusEventSource.Log.CreateSendLinkStart(_identifier);
 594            try
 595            {
 0596                SendingAmqpLink link = await _connectionScope.OpenSenderLinkAsync(
 0597                    _entityPath,
 0598                    _viaEntityPath,
 0599                    timeout,
 0600                    cancellationToken).ConfigureAwait(false);
 601
 0602                if (!MaxMessageSize.HasValue)
 603                {
 604                    // This delay is necessary to prevent the link from causing issues for subsequent
 605                    // operations after creating a batch.  Without it, operations using the link consistently
 606                    // timeout.  The length of the delay does not appear significant, just the act of introducing
 607                    // an asynchronous delay.
 608                    //
 609                    // For consistency the value used by the legacy Service Bus client has been brought forward and
 610                    // used here.
 611
 0612                    await Task.Delay(15, cancellationToken).ConfigureAwait(false);
 0613                    MaxMessageSize = (long)link.Settings.MaxMessageSize;
 614                }
 0615                ServiceBusEventSource.Log.CreateSendLinkComplete(_identifier);
 0616                link.Closed += OnSenderLinkClosed;
 0617                return link;
 618            }
 0619            catch (Exception ex)
 620            {
 0621                ServiceBusEventSource.Log.CreateSendLinkException(_identifier, ex.ToString());
 0622                throw;
 623            }
 0624        }
 625
 626        /// <summary>
 627        ///   Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
 628        /// </summary>
 629        ///
 630        /// <param name="firstOption">The first option to consider.</param>
 631        /// <param name="secondOption">The second option to consider.</param>
 632        ///
 633        /// <returns>The smaller of the two specified intervals.</returns>
 634        ///
 635        private static TimeSpan UseMinimum(TimeSpan firstOption,
 0636                                           TimeSpan secondOption) => (firstOption < secondOption) ? firstOption : second
 637
 638        private bool HasLinkCommunicationError(SendingAmqpLink link) =>
 0639            !_closed && (link?.IsClosing() ?? false);
 640    }
 641}