< Summary

Class:Microsoft.Azure.ServiceBus.QueueClient
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\QueueClient.cs
Covered lines:0
Uncovered lines:134
Coverable lines:134
Total lines:541
Line coverage:0% (0 of 134)
Covered branches:0
Total branches:46
Branch coverage:0% (0 of 46)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-0%100%
.ctor(...)-0%0%
get_QueueName()-0%100%
get_ReceiveMode()-0%100%
get_OperationTimeout()-0%100%
set_OperationTimeout(...)-0%100%
get_Path()-0%100%
get_PrefetchCount()-0%100%
set_PrefetchCount(...)-0%0%
get_RegisteredPlugins()-0%100%
get_ServiceBusConnection()-0%100%
get_InnerSender()-0%0%
get_InnerReceiver()-0%0%
get_SessionClient()-0%0%
get_SessionPumpHost()-0%0%
get_CbsTokenProvider()-0%100%
SendAsync(...)-0%100%
SendAsync(...)-0%100%
CompleteAsync(...)-0%100%
AbandonAsync(...)-0%100%
DeadLetterAsync(...)-0%100%
DeadLetterAsync(...)-0%100%
RegisterMessageHandler(...)-0%100%
RegisterMessageHandler(...)-0%100%
RegisterSessionHandler(...)-0%100%
RegisterSessionHandler(...)-0%100%
ScheduleMessageAsync(...)-0%100%
CancelScheduledMessageAsync(...)-0%100%
RegisterPlugin(...)-0%100%
UnregisterPlugin(...)-0%100%
OnClosingAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\QueueClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Threading;
 9    using System.Threading.Tasks;
 10    using Microsoft.Azure.Amqp;
 11    using Microsoft.Azure.ServiceBus.Core;
 12    using Microsoft.Azure.ServiceBus.Primitives;
 13
 14    /// <summary>
 15    /// QueueClient can be used for all basic interactions with a Service Bus Queue.
 16    /// </summary>
 17    /// <example>
 18    /// Create a new QueueClient
 19    /// <code>
 20    /// IQueueClient queueClient = new QueueClient(
 21    ///     namespaceConnectionString,
 22    ///     queueName,
 23    ///     ReceiveMode.PeekLock,
 24    ///     RetryExponential);
 25    /// </code>
 26    ///
 27    /// Send a message to the queue:
 28    /// <code>
 29    /// byte[] data = GetData();
 30    /// await queueClient.SendAsync(data);
 31    /// </code>
 32    ///
 33    /// Register a message handler which will be invoked every time a message is received.
 34    /// <code>
 35    /// queueClient.RegisterMessageHandler(
 36    ///        async (message, token) =&gt;
 37    ///        {
 38    ///            // Process the message
 39    ///            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{
 40    ///
 41    ///            // Complete the message so that it is not received again.
 42    ///            // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
 43    ///            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
 44    ///        },
 45    ///        async (exceptionEvent) =&gt;
 46    ///        {
 47    ///            // Process the exception
 48    ///            Console.WriteLine("Exception = " + exceptionEvent.Exception);
 49    ///            return Task.CompletedTask;
 50    ///        });
 51    /// </code>
 52    /// </example>
 53    /// <remarks>Use <see cref="MessageSender"/> or <see cref="MessageReceiver"/> for advanced set of functionality.
 54    /// It uses AMQP protocol for communicating with servicebus.</remarks>
 55    public class QueueClient : ClientEntity, IQueueClient
 56    {
 57        readonly object syncLock;
 58
 59        int prefetchCount;
 60        MessageSender innerSender;
 61        MessageReceiver innerReceiver;
 62        SessionClient sessionClient;
 63        SessionPumpHost sessionPumpHost;
 64
 65        /// <summary>
 66        /// Instantiates a new <see cref="QueueClient"/> to perform operations on a queue.
 67        /// </summary>
 68        /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace and q
 69        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 70        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 71        /// <remarks>Creates a new connection to the queue, which is opened during the first send/receive operation.</re
 72        public QueueClient(ServiceBusConnectionStringBuilder connectionStringBuilder, ReceiveMode receiveMode = ReceiveM
 073            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, receive
 74        {
 075        }
 76
 77        /// <summary>
 78        /// Instantiates a new <see cref="QueueClient"/> to perform operations on a queue.
 79        /// </summary>
 80        /// <param name="connectionString">Namespace connection string. Must not contain queue information.</param>
 81        /// <param name="entityPath">Name of the queue</param>
 82        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 83        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 84        /// <remarks>Creates a new connection to the queue, which is opened during the first send/receive operation.</re
 85        public QueueClient(string connectionString, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, R
 086            : this(new ServiceBusConnection(connectionString), entityPath, receiveMode, retryPolicy ?? RetryPolicy.Defau
 87        {
 088            if (string.IsNullOrWhiteSpace(connectionString))
 89            {
 090                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(connectionString));
 91            }
 92
 093            this.OwnsConnection = true;
 094        }
 95
 96        /// <summary>
 97        /// Creates a new instance of the Queue client using the specified endpoint, entity path, and token provider.
 98        /// </summary>
 99        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 100        /// <param name="entityPath">Queue path.</param>
 101        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 102        /// <param name="transportType">Transport type.</param>
 103        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 104        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 105        /// <remarks>Creates a new connection to the queue, which is opened during the first send/receive operation.</re
 106        public QueueClient(
 107            string endpoint,
 108            string entityPath,
 109            ITokenProvider tokenProvider,
 110            TransportType transportType = TransportType.Amqp,
 111            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 112            RetryPolicy retryPolicy = null)
 0113            : this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entit
 114        {
 0115            this.OwnsConnection = true;
 0116        }
 117
 118        /// <summary>
 119        /// Creates a new instance of the Queue client on a given <see cref="ServiceBusConnection"/>
 120        /// </summary>
 121        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 122        /// <param name="entityPath">Queue path.</param>
 123        /// <param name="receiveMode">Mode of receive of messages. Default to <see cref="ReceiveMode"/>.PeekLock.</param
 124        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 125        public QueueClient(ServiceBusConnection serviceBusConnection, string entityPath, ReceiveMode receiveMode, RetryP
 0126            : base(nameof(QueueClient), entityPath, retryPolicy)
 127        {
 0128            MessagingEventSource.Log.QueueClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath, receiv
 129
 0130            if (string.IsNullOrWhiteSpace(entityPath))
 131            {
 0132                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(entityPath));
 133            }
 134
 0135            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 0136            this.syncLock = new object();
 0137            this.QueueName = entityPath;
 0138            this.ReceiveMode = receiveMode;
 0139            this.OwnsConnection = false;
 0140            this.ServiceBusConnection.ThrowIfClosed();
 141
 0142            if (this.ServiceBusConnection.TokenProvider != null)
 143            {
 0144                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 145            }
 146            else
 147            {
 0148                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 149            }
 150
 0151            MessagingEventSource.Log.QueueClientCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.Cli
 0152        }
 153
 154        /// <summary>
 155        /// Gets the name of the queue.
 156        /// </summary>
 0157        public string QueueName { get; }
 158
 159        /// <summary>
 160        /// Gets the <see cref="ServiceBus.ReceiveMode"/> for the QueueClient.
 161        /// </summary>
 0162        public ReceiveMode ReceiveMode { get; }
 163
 164        /// <summary>
 165        /// Duration after which individual operations will timeout.
 166        /// </summary>
 167        public override TimeSpan OperationTimeout
 168        {
 0169            get => this.ServiceBusConnection.OperationTimeout;
 0170            set => this.ServiceBusConnection.OperationTimeout = value;
 171        }
 172
 173        /// <summary>
 174        /// Gets the name of the queue.
 175        /// </summary>
 0176        public override string Path => this.QueueName;
 177
 178        /// <summary>
 179        /// Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when a
 180        /// Setting a non-zero value prefetches PrefetchCount number of messages.
 181        /// Setting the value to zero turns prefetch off.
 182        /// Defaults to 0.
 183        /// </summary>
 184        /// <remarks>
 185        /// <para>
 186        /// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than
 187        /// immediately asks for. The message pump will therefore acquire a message for immediate consumption
 188        /// that will be returned as soon as available, and the client will proceed to acquire further messages to fill 
 189        /// </para>
 190        /// <para>
 191        /// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately s
 192        /// replenished in the background as space becomes available.If there are no messages available for delivery, th
 193        /// buffer and then wait or block as expected.
 194        /// </para>
 195        /// <para>Updates to this value take effect on the next receive call to the service.</para>
 196        /// </remarks>
 197        public int PrefetchCount
 198        {
 0199            get => this.prefetchCount;
 200            set
 201            {
 0202                if (value < 0)
 203                {
 0204                    throw Fx.Exception.ArgumentOutOfRange(nameof(this.PrefetchCount), value, "Value cannot be less than 
 205                }
 0206                this.prefetchCount = value;
 0207                if (this.innerReceiver != null)
 208                {
 0209                    this.innerReceiver.PrefetchCount = value;
 210                }
 0211                if (this.sessionClient != null)
 212                {
 0213                    this.sessionClient.PrefetchCount = value;
 214                }
 0215            }
 216        }
 217
 218        /// <summary>
 219        /// Gets a list of currently registered plugins for this QueueClient.
 220        /// </summary>
 0221        public override IList<ServiceBusPlugin> RegisteredPlugins => this.InnerSender.RegisteredPlugins;
 222
 223        /// <summary>
 224        /// Connection object to the service bus namespace.
 225        /// </summary>
 0226        public override ServiceBusConnection ServiceBusConnection { get; }
 227
 228        internal MessageSender InnerSender
 229        {
 230            get
 231            {
 0232                if (this.innerSender == null)
 233                {
 0234                    lock (this.syncLock)
 235                    {
 0236                        if (this.innerSender == null)
 237                        {
 0238                            this.innerSender = new MessageSender(
 0239                                this.QueueName,
 0240                                null,
 0241                                MessagingEntityType.Queue,
 0242                                this.ServiceBusConnection,
 0243                                this.CbsTokenProvider,
 0244                                this.RetryPolicy);
 245                        }
 0246                    }
 247                }
 248
 0249                return this.innerSender;
 250            }
 251        }
 252
 253        internal MessageReceiver InnerReceiver
 254        {
 255            get
 256            {
 0257                if (this.innerReceiver == null)
 258                {
 0259                    lock (this.syncLock)
 260                    {
 0261                        if (this.innerReceiver == null)
 262                        {
 0263                            this.innerReceiver = new MessageReceiver(
 0264                                this.QueueName,
 0265                                MessagingEntityType.Queue,
 0266                                this.ReceiveMode,
 0267                                this.ServiceBusConnection,
 0268                                this.CbsTokenProvider,
 0269                                this.RetryPolicy,
 0270                                this.PrefetchCount);
 271                        }
 0272                    }
 273                }
 274
 0275                return this.innerReceiver;
 276            }
 277        }
 278
 279        internal SessionClient SessionClient
 280        {
 281            get
 282            {
 0283                if (this.sessionClient == null)
 284                {
 0285                    lock (this.syncLock)
 286                    {
 0287                        if (this.sessionClient == null)
 288                        {
 0289                            this.sessionClient = new SessionClient(
 0290                                this.ClientId,
 0291                                this.Path,
 0292                                MessagingEntityType.Queue,
 0293                                this.ReceiveMode,
 0294                                this.PrefetchCount,
 0295                                this.ServiceBusConnection,
 0296                                this.CbsTokenProvider,
 0297                                this.RetryPolicy,
 0298                                this.RegisteredPlugins);
 299
 300
 301                        }
 0302                    }
 303                }
 304
 0305                return this.sessionClient;
 306            }
 307        }
 308
 309        internal SessionPumpHost SessionPumpHost
 310        {
 311            get
 312            {
 0313                if (this.sessionPumpHost == null)
 314                {
 0315                    lock (this.syncLock)
 316                    {
 0317                        if (this.sessionPumpHost == null)
 318                        {
 0319                            this.sessionPumpHost = new SessionPumpHost(
 0320                                this.ClientId,
 0321                                this.ReceiveMode,
 0322                                this.SessionClient,
 0323                                this.ServiceBusConnection.Endpoint);
 324                        }
 0325                    }
 326                }
 327
 0328                return this.sessionPumpHost;
 329            }
 330        }
 331
 0332        ICbsTokenProvider CbsTokenProvider { get; }
 333
 334        /// <summary>
 335        /// Sends a message to Service Bus.
 336        /// </summary>
 337        public Task SendAsync(Message message)
 338        {
 0339            return this.SendAsync(new[] { message });
 340        }
 341
 342        /// <summary>
 343        /// Sends a list of messages to Service Bus.
 344        /// When called on partitioned entities, messages meant for different partitions cannot be batched together.
 345        /// </summary>
 346        public Task SendAsync(IList<Message> messageList)
 347        {
 0348            this.ThrowIfClosed();
 349
 0350            return this.InnerSender.SendAsync(messageList);
 351        }
 352
 353        /// <summary>
 354        /// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
 355        /// </summary>
 356        /// <param name="lockToken">The lock token of the corresponding message to complete.</param>
 357        /// <remarks>
 358        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 359        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 360        /// This operation can only be performed on messages that were received by this client.
 361        /// </remarks>
 362        public Task CompleteAsync(string lockToken)
 363        {
 0364            this.ThrowIfClosed();
 0365            return this.InnerReceiver.CompleteAsync(lockToken);
 366        }
 367
 368        /// <summary>
 369        /// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processi
 370        /// </summary>
 371        /// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
 372        /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para
 373        /// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 374        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 375        /// Abandoning a message will increase the delivery count on the message.
 376        /// This operation can only be performed on messages that were received by this client.
 377        /// </remarks>
 378        /// This operation can only be performed on messages that were received by this client.
 379        public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 380        {
 0381            this.ThrowIfClosed();
 0382            return this.InnerReceiver.AbandonAsync(lockToken, propertiesToModify);
 383        }
 384
 385        /// <summary>
 386        /// Moves a message to the deadletter sub-queue.
 387        /// </summary>
 388        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 389        /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
 390        /// <remarks>
 391        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 392        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 393        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 394        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 395        /// This operation can only be performed on messages that were received by this receiver.
 396        /// </remarks>
 397        public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 398        {
 0399            this.ThrowIfClosed();
 0400            return this.InnerReceiver.DeadLetterAsync(lockToken, propertiesToModify);
 401        }
 402
 403        /// <summary>
 404        /// Moves a message to the deadletter sub-queue.
 405        /// </summary>
 406        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 407        /// <param name="deadLetterReason">The reason for deadlettering the message.</param>
 408        /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
 409        /// <remarks>
 410        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 411        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 412        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 413        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 414        /// This operation can only be performed on messages that were received by this receiver.
 415        /// </remarks>
 416        public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
 417        {
 0418            this.ThrowIfClosed();
 0419            return this.InnerReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription);
 420        }
 421
 422        /// <summary>
 423        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 424        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 425        /// </summary>
 426        /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param
 427        /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
 428        /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
 429        /// <remarks>Enable prefetch to speed up the receive rate.
 430        /// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to con
 431        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventAr
 432        {
 0433            this.RegisterMessageHandler(handler, new MessageHandlerOptions(exceptionReceivedHandler));
 0434        }
 435
 436        /// <summary>
 437        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 438        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 439        /// </summary>
 440        /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param
 441        /// <param name="messageHandlerOptions">The <see cref="MessageHandlerOptions"/> options used to configure the se
 442        /// <remarks>Enable prefetch to speed up the receive rate.</remarks>
 443        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions message
 444        {
 0445            this.ThrowIfClosed();
 0446            this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
 0447        }
 448
 449        /// <summary>
 450        /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to
 451        /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time
 452        /// </summary>
 453        /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes
 454        /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon
 455        /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
 456        /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
 457        /// <remarks>Enable prefetch to speed up the receive rate.
 458        /// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOp
 459        public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<Excepti
 460        {
 0461            var sessionHandlerOptions = new SessionHandlerOptions(exceptionReceivedHandler);
 0462            this.RegisterSessionHandler(handler, sessionHandlerOptions);
 0463        }
 464
 465        /// <summary>
 466        /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to
 467        /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time
 468        /// </summary>
 469        /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes
 470        /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon
 471        /// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
 472        /// <remarks>Enable prefetch to speed up the receive rate. </remarks>
 473        public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandl
 474        {
 0475            this.ThrowIfClosed();
 0476            this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
 0477        }
 478
 479        /// <summary>
 480        /// Schedules a message to appear on Service Bus at a later time.
 481        /// </summary>
 482        /// <param name="scheduleEnqueueTimeUtc">The UTC time at which the message should be available for processing</p
 483        /// <returns>The sequence number of the message that was scheduled.</returns>
 484        public Task<long> ScheduleMessageAsync(Message message, DateTimeOffset scheduleEnqueueTimeUtc)
 485        {
 0486            this.ThrowIfClosed();
 0487            return this.InnerSender.ScheduleMessageAsync(message, scheduleEnqueueTimeUtc);
 488        }
 489
 490        /// <summary>
 491        /// Cancels a message that was scheduled.
 492        /// </summary>
 493        /// <param name="sequenceNumber">The <see cref="Message.SystemPropertiesCollection.SequenceNumber"/> of the mess
 494        public Task CancelScheduledMessageAsync(long sequenceNumber)
 495        {
 0496            this.ThrowIfClosed();
 0497            return this.InnerSender.CancelScheduledMessageAsync(sequenceNumber);
 498        }
 499
 500        /// <summary>
 501        /// Registers a <see cref="ServiceBusPlugin"/> to be used with this queue client.
 502        /// </summary>
 503        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 504        {
 0505            this.ThrowIfClosed();
 0506            this.InnerSender.RegisterPlugin(serviceBusPlugin);
 0507            this.InnerReceiver.RegisterPlugin(serviceBusPlugin);
 0508        }
 509
 510        /// <summary>
 511        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 512        /// </summary>
 513        /// <param name="serviceBusPluginName">The name <see cref="ServiceBusPlugin.Name"/> to be unregistered</param>
 514        public override void UnregisterPlugin(string serviceBusPluginName)
 515        {
 0516            this.ThrowIfClosed();
 0517            this.InnerSender.UnregisterPlugin(serviceBusPluginName);
 0518            this.InnerReceiver.UnregisterPlugin(serviceBusPluginName);
 0519        }
 520
 521        protected override async Task OnClosingAsync()
 522        {
 0523            if (this.innerSender != null)
 524            {
 0525                await this.innerSender.CloseAsync().ConfigureAwait(false);
 526            }
 527
 0528            if (this.innerReceiver != null)
 529            {
 0530                await this.innerReceiver.CloseAsync().ConfigureAwait(false);
 531            }
 532
 0533            this.sessionPumpHost?.Close();
 534
 0535            if (this.sessionClient != null)
 536            {
 0537                await this.sessionClient.CloseAsync().ConfigureAwait(false);
 538            }
 0539        }
 540    }
 541}