< Summary

Class:Microsoft.Azure.ServiceBus.SubscriptionClient
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SubscriptionClient.cs
Covered lines:0
Uncovered lines:176
Coverable lines:176
Total lines:642
Line coverage:0% (0 of 176)
Covered branches:0
Total branches:62
Branch coverage:0% (0 of 62)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-0%100%
.ctor(...)-0%0%
get_TopicPath()-0%100%
get_Path()-0%100%
get_SubscriptionName()-0%100%
get_ReceiveMode()-0%100%
get_OperationTimeout()-0%100%
set_OperationTimeout(...)-0%100%
get_PrefetchCount()-0%100%
set_PrefetchCount(...)-0%0%
get_ServiceBusConnection()-0%100%
get_InnerSubscriptionClient()-0%0%
get_SessionClient()-0%0%
get_SessionPumpHost()-0%0%
get_CbsTokenProvider()-0%100%
CompleteAsync(...)-0%100%
AbandonAsync(...)-0%100%
DeadLetterAsync(...)-0%100%
DeadLetterAsync(...)-0%100%
DeadLetterAsync(...)-0%100%
RegisterMessageHandler(...)-0%100%
RegisterMessageHandler(...)-0%100%
RegisterSessionHandler(...)-0%100%
RegisterSessionHandler(...)-0%100%
AddRuleAsync(...)-0%100%
AddRuleAsync()-0%0%
RemoveRuleAsync()-0%0%
GetRulesAsync()-0%0%
get_RegisteredPlugins()-0%100%
RegisterPlugin(...)-0%100%
UnregisterPlugin(...)-0%100%
OnClosingAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SubscriptionClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Diagnostics;
 9    using System.Threading;
 10    using System.Threading.Tasks;
 11    using Microsoft.Azure.Amqp;
 12    using Microsoft.Azure.ServiceBus.Amqp;
 13    using Microsoft.Azure.ServiceBus.Core;
 14    using Microsoft.Azure.ServiceBus.Primitives;
 15
 16    /// <summary>
 17    /// SubscriptionClient can be used for all basic interactions with a Service Bus Subscription.
 18    /// </summary>
 19    /// <example>
 20    /// Create a new SubscriptionClient
 21    /// <code>
 22    /// ISubscriptionClient subscriptionClient = new SubscriptionClient(
 23    ///     namespaceConnectionString,
 24    ///     topicName,
 25    ///     subscriptionName,
 26    ///     ReceiveMode.PeekLock,
 27    ///     RetryExponential);
 28    /// </code>
 29    ///
 30    /// Register a message handler which will be invoked every time a message is received.
 31    /// <code>
 32    /// subscriptionClient.RegisterMessageHandler(
 33    ///        async (message, token) =&gt;
 34    ///        {
 35    ///            // Process the message
 36    ///            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{
 37    ///
 38    ///            // Complete the message so that it is not received again.
 39    ///            // This can be done only if the subscriptionClient is opened in ReceiveMode.PeekLock mode.
 40    ///            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
 41    ///        },
 42    ///        async (exceptionEvent) =&gt;
 43    ///        {
 44    ///            // Process the exception
 45    ///            Console.WriteLine("Exception = " + exceptionEvent.Exception);
 46    ///            return Task.CompletedTask;
 47    ///        });
 48    /// </code>
 49    /// </example>
 50    /// <remarks>It uses AMQP protocol for communicating with service bus. Use <see cref="MessageReceiver"/> for advance
 51    public class SubscriptionClient : ClientEntity, ISubscriptionClient
 52    {
 53        int prefetchCount;
 54        readonly object syncLock;
 55        readonly ServiceBusDiagnosticSource diagnosticSource;
 56
 57        IInnerSubscriptionClient innerSubscriptionClient;
 58        SessionClient sessionClient;
 59        SessionPumpHost sessionPumpHost;
 60
 61        /// <summary>
 62        /// Instantiates a new <see cref="SubscriptionClient"/> to perform operations on a subscription.
 63        /// </summary>
 64        /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace and t
 65        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 66        /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau
 67        /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</
 68        public SubscriptionClient(ServiceBusConnectionStringBuilder connectionStringBuilder, string subscriptionName, Re
 069            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, subscri
 70        {
 071        }
 72
 73        /// <summary>
 74        /// Instantiates a new <see cref="SubscriptionClient"/> to perform operations on a subscription.
 75        /// </summary>
 76        /// <param name="connectionString">Namespace connection string. Must not contain topic or subscription informati
 77        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 78        /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau
 79        /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</
 80        public SubscriptionClient(string connectionString, string topicPath, string subscriptionName, ReceiveMode receiv
 081            : this(new ServiceBusConnection(connectionString), topicPath, subscriptionName, receiveMode, retryPolicy ?? 
 82        {
 083            if (string.IsNullOrWhiteSpace(connectionString))
 84            {
 085                throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
 86            }
 87
 088            this.OwnsConnection = true;
 089        }
 90
 91        /// <summary>
 92        /// Creates a new instance of the Subscription client using the specified endpoint, entity path, and token provi
 93        /// </summary>
 94        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 95        /// <param name="topicPath">Topic path.</param>
 96        /// <param name="subscriptionName">Subscription name.</param>
 97        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 98        /// <param name="transportType">Transport type.</param>
 99        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 100        /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau
 101        /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</
 102        public SubscriptionClient(
 103            string endpoint,
 104            string topicPath,
 105            string subscriptionName,
 106            ITokenProvider tokenProvider,
 107            TransportType transportType = TransportType.Amqp,
 108            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 109            RetryPolicy retryPolicy = null)
 0110            : this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, topic
 111        {
 0112            this.OwnsConnection = true;
 0113        }
 114
 115        /// <summary>
 116        /// Creates a new instance of the Subscription client on a given <see cref="ServiceBusConnection"/>
 117        /// </summary>
 118        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 119        /// <param name="topicPath">Topic path.</param>
 120        /// <param name="subscriptionName">Subscription name.</param>
 121        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 122        /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau
 123        public SubscriptionClient(ServiceBusConnection serviceBusConnection, string topicPath, string subscriptionName, 
 0124            : base(nameof(SubscriptionClient), $"{topicPath}/{subscriptionName}", retryPolicy)
 125        {
 0126            if (string.IsNullOrWhiteSpace(topicPath))
 127            {
 0128                throw Fx.Exception.ArgumentNullOrWhiteSpace(topicPath);
 129            }
 0130            if (string.IsNullOrWhiteSpace(subscriptionName))
 131            {
 0132                throw Fx.Exception.ArgumentNullOrWhiteSpace(subscriptionName);
 133            }
 134
 0135            MessagingEventSource.Log.SubscriptionClientCreateStart(serviceBusConnection?.Endpoint.Authority, topicPath, 
 136
 0137            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 0138            this.syncLock = new object();
 0139            this.TopicPath = topicPath;
 0140            this.SubscriptionName = subscriptionName;
 0141            this.Path = EntityNameHelper.FormatSubscriptionPath(this.TopicPath, this.SubscriptionName);
 0142            this.ReceiveMode = receiveMode;
 0143            this.diagnosticSource = new ServiceBusDiagnosticSource(this.Path, serviceBusConnection.Endpoint);
 0144            this.OwnsConnection = false;
 0145            this.ServiceBusConnection.ThrowIfClosed();
 146
 0147            if (this.ServiceBusConnection.TokenProvider != null)
 148            {
 0149                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 150            }
 151            else
 152            {
 0153                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 154            }
 155
 0156            MessagingEventSource.Log.SubscriptionClientCreateStop(serviceBusConnection.Endpoint.Authority, topicPath, su
 0157        }
 158
 159        /// <summary>
 160        /// Gets the path of the corresponding topic.
 161        /// </summary>
 0162        public string TopicPath { get; }
 163
 164        /// <summary>
 165        /// Gets the formatted path of the subscription client.
 166        /// </summary>
 167        /// <seealso cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/>
 0168        public override string Path { get; }
 169
 170        /// <summary>
 171        /// Gets the name of the subscription.
 172        /// </summary>
 0173        public string SubscriptionName { get; }
 174
 175        /// <summary>
 176        /// Gets the <see cref="ServiceBus.ReceiveMode"/> for the SubscriptionClient.
 177        /// </summary>
 0178        public ReceiveMode ReceiveMode { get; }
 179
 180        /// <summary>
 181        /// Duration after which individual operations will timeout.
 182        /// </summary>
 183        public override TimeSpan OperationTimeout
 184        {
 0185            get => this.ServiceBusConnection.OperationTimeout;
 0186            set => this.ServiceBusConnection.OperationTimeout = value;
 187        }
 188
 189        /// <summary>
 190        /// Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when a
 191        /// Setting a non-zero value prefetches PrefetchCount number of messages.
 192        /// Setting the value to zero turns prefetch off.
 193        /// Defaults to 0.
 194        /// </summary>
 195        /// <remarks>
 196        /// <para>
 197        /// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than
 198        /// immediately asks for. The message pump will therefore acquire a message for immediate consumption
 199        /// that will be returned as soon as available, and the client will proceed to acquire further messages to fill 
 200        /// </para>
 201        /// <para>
 202        /// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately s
 203        /// replenished in the background as space becomes available.If there are no messages available for delivery, th
 204        /// buffer and then wait or block as expected.
 205        /// </para>
 206        /// <para>Updates to this value take effect on the next receive call to the service.</para>
 207        /// </remarks>
 208        public int PrefetchCount
 209        {
 0210            get => this.prefetchCount;
 211            set
 212            {
 0213                if (value < 0)
 214                {
 0215                    throw Fx.Exception.ArgumentOutOfRange(nameof(this.PrefetchCount), value, "Value cannot be less than 
 216                }
 0217                this.prefetchCount = value;
 0218                if (this.innerSubscriptionClient != null)
 219                {
 0220                    this.innerSubscriptionClient.PrefetchCount = value;
 221                }
 0222                if (this.sessionClient != null)
 223                {
 0224                    this.sessionClient.PrefetchCount = value;
 225                }
 0226            }
 227        }
 228
 229        /// <summary>
 230        /// Connection object to the service bus namespace.
 231        /// </summary>
 0232        public override ServiceBusConnection ServiceBusConnection { get; }
 233
 234        internal IInnerSubscriptionClient InnerSubscriptionClient
 235        {
 236            get
 237            {
 0238                if (this.innerSubscriptionClient == null)
 239                {
 0240                    lock (this.syncLock)
 241                    {
 0242                        this.innerSubscriptionClient = new AmqpSubscriptionClient(
 0243                            this.Path,
 0244                            this.ServiceBusConnection,
 0245                            this.RetryPolicy,
 0246                            this.CbsTokenProvider,
 0247                            this.PrefetchCount,
 0248                            this.ReceiveMode);
 0249                    }
 250                }
 251
 0252                return this.innerSubscriptionClient;
 253            }
 254        }
 255
 256        internal SessionClient SessionClient
 257        {
 258            get
 259            {
 0260                if (this.sessionClient == null)
 261                {
 0262                    lock (this.syncLock)
 263                    {
 0264                        if (this.sessionClient == null)
 265                        {
 0266                            this.sessionClient = new SessionClient(
 0267                                this.ClientId,
 0268                                this.Path,
 0269                                MessagingEntityType.Subscriber,
 0270                                this.ReceiveMode,
 0271                                this.PrefetchCount,
 0272                                this.ServiceBusConnection,
 0273                                this.CbsTokenProvider,
 0274                                this.RetryPolicy,
 0275                                this.RegisteredPlugins);
 276                        }
 0277                    }
 278                }
 279
 0280                return this.sessionClient;
 281            }
 282        }
 283
 284        internal SessionPumpHost SessionPumpHost
 285        {
 286            get
 287            {
 0288                if (this.sessionPumpHost == null)
 289                {
 0290                    lock (this.syncLock)
 291                    {
 0292                        if (this.sessionPumpHost == null)
 293                        {
 0294                            this.sessionPumpHost = new SessionPumpHost(
 0295                                this.ClientId,
 0296                                this.ReceiveMode,
 0297                                this.SessionClient,
 0298                                this.ServiceBusConnection.Endpoint);
 299                        }
 0300                    }
 301                }
 302
 0303                return this.sessionPumpHost;
 304            }
 305        }
 306
 0307        ICbsTokenProvider CbsTokenProvider { get; }
 308
 309        /// <summary>
 310        /// Completes a <see cref="Message"/> using its lock token. This will delete the message from the subscription.
 311        /// </summary>
 312        /// <param name="lockToken">The lock token of the corresponding message to complete.</param>
 313        /// <remarks>
 314        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 315        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 316        /// This operation can only be performed on messages that were received by this client.
 317        /// </remarks>
 318        public Task CompleteAsync(string lockToken)
 319        {
 0320            this.ThrowIfClosed();
 0321            return this.InnerSubscriptionClient.InnerReceiver.CompleteAsync(lockToken);
 322        }
 323
 324        /// <summary>
 325        /// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processi
 326        /// </summary>
 327        /// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
 328        /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para
 329        /// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 330        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 331        /// Abandoning a message will increase the delivery count on the message.
 332        /// This operation can only be performed on messages that were received by this client.
 333        /// </remarks>
 334        public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
 335        {
 0336            this.ThrowIfClosed();
 0337            return this.InnerSubscriptionClient.InnerReceiver.AbandonAsync(lockToken, propertiesToModify);
 338        }
 339
 340        /// <summary>
 341        /// Moves a message to the deadletter sub-queue.
 342        /// </summary>
 343        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 344        /// <remarks>
 345        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 346        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 347        /// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver
 348        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 349        /// This operation can only be performed on messages that were received by this client.
 350        /// </remarks>
 351        public Task DeadLetterAsync(string lockToken)
 352        {
 0353            this.ThrowIfClosed();
 0354            return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken);
 355        }
 356
 357        /// <summary>
 358        /// Moves a message to the deadletter sub-queue.
 359        /// </summary>
 360        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 361        /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
 362        /// <remarks>
 363        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 364        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 365        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 366        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 367        /// This operation can only be performed on messages that were received by this receiver.
 368        /// </remarks>
 369        public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify)
 370        {
 0371            this.ThrowIfClosed();
 0372            return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken, propertiesToModify);
 373        }
 374
 375        /// <summary>
 376        /// Moves a message to the deadletter sub-queue.
 377        /// </summary>
 378        /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param>
 379        /// <param name="deadLetterReason">The reason for deadlettering the message.</param>
 380        /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param>
 381        /// <remarks>
 382        /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>,
 383        /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
 384        /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>,
 385        /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this.
 386        /// This operation can only be performed on messages that were received by this receiver.
 387        /// </remarks>
 388        public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
 389        {
 0390            this.ThrowIfClosed();
 0391            return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErr
 392        }
 393
 394        /// <summary>
 395        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 396        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 397        /// </summary>
 398        /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param
 399        /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
 400        /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
 401        /// <remarks>Enable prefetch to speed up the receive rate.
 402        /// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to con
 403        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventAr
 404        {
 0405            this.ThrowIfClosed();
 0406            this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, exceptionReceivedHandler);
 0407        }
 408
 409        /// <summary>
 410        /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv
 411        /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is
 412        /// </summary>
 413        /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param
 414        /// <param name="messageHandlerOptions">The <see cref="MessageHandlerOptions"/> options used to configure the se
 415        /// <remarks>Enable prefetch to speed up the receive rate.</remarks>
 416        public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions message
 417        {
 0418            this.ThrowIfClosed();
 0419            this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
 0420        }
 421
 422        /// <summary>
 423        /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to
 424        /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time
 425        /// </summary>
 426        /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes
 427        /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon
 428        /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
 429        /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
 430        /// <remarks>  Enable prefetch to speed up the receive rate.
 431        /// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOp
 432        public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<Excepti
 433        {
 0434            var sessionHandlerOptions = new SessionHandlerOptions(exceptionReceivedHandler);
 0435            this.RegisterSessionHandler(handler, sessionHandlerOptions);
 0436        }
 437
 438        /// <summary>
 439        /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to
 440        /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time
 441        /// </summary>
 442        /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes
 443        /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon
 444        /// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
 445        /// <remarks>Enable prefetch to speed up the receive rate. </remarks>
 446        public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandl
 447        {
 0448            this.ThrowIfClosed();
 0449            this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
 0450        }
 451
 452        /// <summary>
 453        /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription.
 454        /// </summary>
 455        /// <param name="filter">The filter expression against which messages will be matched.</param>
 456        /// <returns>A task instance that represents the asynchronous add rule operation.</returns>
 457        /// <remarks>
 458        /// You can add rules to the subscription that decides which messages from the topic should reach the subscripti
 459        /// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added 
 460        /// You can add multiple rules with distinct names to the same subscription.
 461        /// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the messa
 462        /// Max allowed length of rule name is 50 chars.
 463        /// </remarks>
 464        public Task AddRuleAsync(string ruleName, Filter filter)
 465        {
 0466            return this.AddRuleAsync(new RuleDescription(name: ruleName, filter: filter));
 467        }
 468
 469        /// <summary>
 470        /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription.
 471        /// </summary>
 472        /// <param name="description">The rule description that provides the rule to add.</param>
 473        /// <returns>A task instance that represents the asynchronous add rule operation.</returns>
 474        /// <remarks>
 475        /// You can add rules to the subscription that decides which messages from the topic should reach the subscripti
 476        /// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added 
 477        /// You can add multiple rules with distinct names to the same subscription.
 478        /// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the messa
 479        /// </remarks>
 480        public async Task AddRuleAsync(RuleDescription description)
 481        {
 0482            this.ThrowIfClosed();
 483
 0484            if (description == null)
 485            {
 0486                throw Fx.Exception.ArgumentNull(nameof(description));
 487            }
 488
 0489            EntityNameHelper.CheckValidRuleName(description.Name);
 0490            MessagingEventSource.Log.AddRuleStart(this.ClientId, description.Name);
 491
 0492            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0493            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.AddRuleStart(description) : null;
 0494            Task addRuleTask = null;
 495
 496            try
 497            {
 0498                addRuleTask = this.InnerSubscriptionClient.OnAddRuleAsync(description);
 0499                await addRuleTask.ConfigureAwait(false);
 0500            }
 0501            catch (Exception exception)
 502            {
 0503                if (isDiagnosticSourceEnabled)
 504                {
 0505                    this.diagnosticSource.ReportException(exception);
 506                }
 507
 0508                MessagingEventSource.Log.AddRuleException(this.ClientId, exception);
 0509                throw;
 510            }
 511            finally
 512            {
 0513                this.diagnosticSource.AddRuleStop(activity, description, addRuleTask?.Status);
 514            }
 515
 0516            MessagingEventSource.Log.AddRuleStop(this.ClientId);
 0517        }
 518
 519        /// <summary>
 520        /// Removes the rule on the subscription identified by <paramref name="ruleName" />.
 521        /// </summary>
 522        /// <returns>A task instance that represents the asynchronous remove rule operation.</returns>
 523        public async Task RemoveRuleAsync(string ruleName)
 524        {
 0525            this.ThrowIfClosed();
 526
 0527            if (string.IsNullOrWhiteSpace(ruleName))
 528            {
 0529                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(ruleName));
 530            }
 531
 0532            MessagingEventSource.Log.RemoveRuleStart(this.ClientId, ruleName);
 0533            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0534            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.RemoveRuleStart(ruleName) : null;
 0535            Task removeRuleTask = null;
 536
 537            try
 538            {
 0539                removeRuleTask = this.InnerSubscriptionClient.OnRemoveRuleAsync(ruleName);
 0540                await removeRuleTask.ConfigureAwait(false);
 0541            }
 0542            catch (Exception exception)
 543            {
 0544                if (isDiagnosticSourceEnabled)
 545                {
 0546                    this.diagnosticSource.ReportException(exception);
 547                }
 548
 0549                MessagingEventSource.Log.RemoveRuleException(this.ClientId, exception);
 550
 0551                throw;
 552            }
 553            finally
 554            {
 0555                this.diagnosticSource.RemoveRuleStop(activity, ruleName, removeRuleTask?.Status);
 556            }
 557
 0558            MessagingEventSource.Log.RemoveRuleStop(this.ClientId);
 0559        }
 560
 561        /// <summary>
 562        /// Get all rules associated with the subscription.
 563        /// </summary>
 564        public async Task<IEnumerable<RuleDescription>> GetRulesAsync()
 565        {
 0566            this.ThrowIfClosed();
 567
 0568            MessagingEventSource.Log.GetRulesStart(this.ClientId);
 0569            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0570            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.GetRulesStart() : null;
 0571            Task<IList<RuleDescription>> getRulesTask = null;
 572
 0573            var skip = 0;
 0574            var top = int.MaxValue;
 0575            IList<RuleDescription> rules = null;
 576
 577            try
 578            {
 0579                getRulesTask = this.InnerSubscriptionClient.OnGetRulesAsync(top, skip);
 0580                rules = await getRulesTask.ConfigureAwait(false);
 0581            }
 0582            catch (Exception exception)
 583            {
 0584                if (isDiagnosticSourceEnabled)
 585                {
 0586                    this.diagnosticSource.ReportException(exception);
 587                }
 588
 0589                MessagingEventSource.Log.GetRulesException(this.ClientId, exception);
 590
 0591                throw;
 592            }
 593            finally
 594            {
 0595                this.diagnosticSource.GetRulesStop(activity, rules, getRulesTask?.Status);
 596            }
 597
 0598            MessagingEventSource.Log.GetRulesStop(this.ClientId);
 0599            return rules;
 0600        }
 601
 602        /// <summary>
 603        /// Gets a list of currently registered plugins for this SubscriptionClient.
 604        /// </summary>
 0605        public override IList<ServiceBusPlugin> RegisteredPlugins => this.InnerSubscriptionClient.InnerReceiver.Register
 606
 607        /// <summary>
 608        /// Registers a <see cref="ServiceBusPlugin"/> to be used for receiving messages from Service Bus.
 609        /// </summary>
 610        /// <param name="serviceBusPlugin">The <see cref="ServiceBusPlugin"/> to register</param>
 611        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 612        {
 0613            this.ThrowIfClosed();
 0614            this.InnerSubscriptionClient.InnerReceiver.RegisterPlugin(serviceBusPlugin);
 0615        }
 616
 617        /// <summary>
 618        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 619        /// </summary>
 620        /// <param name="serviceBusPluginName">The name <see cref="ServiceBusPlugin.Name"/> to be unregistered</param>
 621        public override void UnregisterPlugin(string serviceBusPluginName)
 622        {
 0623            this.ThrowIfClosed();
 0624            this.InnerSubscriptionClient.InnerReceiver.UnregisterPlugin(serviceBusPluginName);
 0625        }
 626
 627        protected override async Task OnClosingAsync()
 628        {
 0629            if (this.innerSubscriptionClient != null)
 630            {
 0631                await this.innerSubscriptionClient.CloseAsync().ConfigureAwait(false);
 632            }
 633
 0634            this.sessionPumpHost?.Close();
 635
 0636            if (this.sessionClient != null)
 637            {
 0638                await this.sessionClient.CloseAsync().ConfigureAwait(false);
 639            }
 0640        }
 641    }
 642}