|  |  | 1 |  | // Copyright (c) Microsoft. All rights reserved. | 
|  |  | 2 |  | // Licensed under the MIT license. See LICENSE file in the project root for full license information. | 
|  |  | 3 |  |  | 
|  |  | 4 |  | namespace Microsoft.Azure.ServiceBus | 
|  |  | 5 |  | { | 
|  |  | 6 |  |     using System; | 
|  |  | 7 |  |     using System.Collections.Generic; | 
|  |  | 8 |  |     using System.Diagnostics; | 
|  |  | 9 |  |     using System.Threading; | 
|  |  | 10 |  |     using System.Threading.Tasks; | 
|  |  | 11 |  |     using Microsoft.Azure.Amqp; | 
|  |  | 12 |  |     using Microsoft.Azure.ServiceBus.Amqp; | 
|  |  | 13 |  |     using Microsoft.Azure.ServiceBus.Core; | 
|  |  | 14 |  |     using Microsoft.Azure.ServiceBus.Primitives; | 
|  |  | 15 |  |  | 
|  |  | 16 |  |     /// <summary> | 
|  |  | 17 |  |     /// SubscriptionClient can be used for all basic interactions with a Service Bus Subscription. | 
|  |  | 18 |  |     /// </summary> | 
|  |  | 19 |  |     /// <example> | 
|  |  | 20 |  |     /// Create a new SubscriptionClient | 
|  |  | 21 |  |     /// <code> | 
|  |  | 22 |  |     /// ISubscriptionClient subscriptionClient = new SubscriptionClient( | 
|  |  | 23 |  |     ///     namespaceConnectionString, | 
|  |  | 24 |  |     ///     topicName, | 
|  |  | 25 |  |     ///     subscriptionName, | 
|  |  | 26 |  |     ///     ReceiveMode.PeekLock, | 
|  |  | 27 |  |     ///     RetryExponential); | 
|  |  | 28 |  |     /// </code> | 
|  |  | 29 |  |     /// | 
|  |  | 30 |  |     /// Register a message handler which will be invoked every time a message is received. | 
|  |  | 31 |  |     /// <code> | 
|  |  | 32 |  |     /// subscriptionClient.RegisterMessageHandler( | 
|  |  | 33 |  |     ///        async (message, token) => | 
|  |  | 34 |  |     ///        { | 
|  |  | 35 |  |     ///            // Process the message | 
|  |  | 36 |  |     ///            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{ | 
|  |  | 37 |  |     /// | 
|  |  | 38 |  |     ///            // Complete the message so that it is not received again. | 
|  |  | 39 |  |     ///            // This can be done only if the subscriptionClient is opened in ReceiveMode.PeekLock mode. | 
|  |  | 40 |  |     ///            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken); | 
|  |  | 41 |  |     ///        }, | 
|  |  | 42 |  |     ///        async (exceptionEvent) => | 
|  |  | 43 |  |     ///        { | 
|  |  | 44 |  |     ///            // Process the exception | 
|  |  | 45 |  |     ///            Console.WriteLine("Exception = " + exceptionEvent.Exception); | 
|  |  | 46 |  |     ///            return Task.CompletedTask; | 
|  |  | 47 |  |     ///        }); | 
|  |  | 48 |  |     /// </code> | 
|  |  | 49 |  |     /// </example> | 
|  |  | 50 |  |     /// <remarks>It uses AMQP protocol for communicating with service bus. Use <see cref="MessageReceiver"/> for advance | 
|  |  | 51 |  |     public class SubscriptionClient : ClientEntity, ISubscriptionClient | 
|  |  | 52 |  |     { | 
|  |  | 53 |  |         int prefetchCount; | 
|  |  | 54 |  |         readonly object syncLock; | 
|  |  | 55 |  |         readonly ServiceBusDiagnosticSource diagnosticSource; | 
|  |  | 56 |  |  | 
|  |  | 57 |  |         IInnerSubscriptionClient innerSubscriptionClient; | 
|  |  | 58 |  |         SessionClient sessionClient; | 
|  |  | 59 |  |         SessionPumpHost sessionPumpHost; | 
|  |  | 60 |  |  | 
|  |  | 61 |  |         /// <summary> | 
|  |  | 62 |  |         /// Instantiates a new <see cref="SubscriptionClient"/> to perform operations on a subscription. | 
|  |  | 63 |  |         /// </summary> | 
|  |  | 64 |  |         /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace and t | 
|  |  | 65 |  |         /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para | 
|  |  | 66 |  |         /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau | 
|  |  | 67 |  |         /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</ | 
|  |  | 68 |  |         public SubscriptionClient(ServiceBusConnectionStringBuilder connectionStringBuilder, string subscriptionName, Re | 
|  | 0 | 69 |  |             : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, subscri | 
|  |  | 70 |  |         { | 
|  | 0 | 71 |  |         } | 
|  |  | 72 |  |  | 
|  |  | 73 |  |         /// <summary> | 
|  |  | 74 |  |         /// Instantiates a new <see cref="SubscriptionClient"/> to perform operations on a subscription. | 
|  |  | 75 |  |         /// </summary> | 
|  |  | 76 |  |         /// <param name="connectionString">Namespace connection string. Must not contain topic or subscription informati | 
|  |  | 77 |  |         /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para | 
|  |  | 78 |  |         /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau | 
|  |  | 79 |  |         /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</ | 
|  |  | 80 |  |         public SubscriptionClient(string connectionString, string topicPath, string subscriptionName, ReceiveMode receiv | 
|  | 0 | 81 |  |             : this(new ServiceBusConnection(connectionString), topicPath, subscriptionName, receiveMode, retryPolicy ??  | 
|  |  | 82 |  |         { | 
|  | 0 | 83 |  |             if (string.IsNullOrWhiteSpace(connectionString)) | 
|  |  | 84 |  |             { | 
|  | 0 | 85 |  |                 throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString); | 
|  |  | 86 |  |             } | 
|  |  | 87 |  |  | 
|  | 0 | 88 |  |             this.OwnsConnection = true; | 
|  | 0 | 89 |  |         } | 
|  |  | 90 |  |  | 
|  |  | 91 |  |         /// <summary> | 
|  |  | 92 |  |         /// Creates a new instance of the Subscription client using the specified endpoint, entity path, and token provi | 
|  |  | 93 |  |         /// </summary> | 
|  |  | 94 |  |         /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus. | 
|  |  | 95 |  |         /// <param name="topicPath">Topic path.</param> | 
|  |  | 96 |  |         /// <param name="subscriptionName">Subscription name.</param> | 
|  |  | 97 |  |         /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param> | 
|  |  | 98 |  |         /// <param name="transportType">Transport type.</param> | 
|  |  | 99 |  |         /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para | 
|  |  | 100 |  |         /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau | 
|  |  | 101 |  |         /// <remarks>Creates a new connection to the subscription, which is opened during the first receive operation.</ | 
|  |  | 102 |  |         public SubscriptionClient( | 
|  |  | 103 |  |             string endpoint, | 
|  |  | 104 |  |             string topicPath, | 
|  |  | 105 |  |             string subscriptionName, | 
|  |  | 106 |  |             ITokenProvider tokenProvider, | 
|  |  | 107 |  |             TransportType transportType = TransportType.Amqp, | 
|  |  | 108 |  |             ReceiveMode receiveMode = ReceiveMode.PeekLock, | 
|  |  | 109 |  |             RetryPolicy retryPolicy = null) | 
|  | 0 | 110 |  |             : this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, topic | 
|  |  | 111 |  |         { | 
|  | 0 | 112 |  |             this.OwnsConnection = true; | 
|  | 0 | 113 |  |         } | 
|  |  | 114 |  |  | 
|  |  | 115 |  |         /// <summary> | 
|  |  | 116 |  |         /// Creates a new instance of the Subscription client on a given <see cref="ServiceBusConnection"/> | 
|  |  | 117 |  |         /// </summary> | 
|  |  | 118 |  |         /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param> | 
|  |  | 119 |  |         /// <param name="topicPath">Topic path.</param> | 
|  |  | 120 |  |         /// <param name="subscriptionName">Subscription name.</param> | 
|  |  | 121 |  |         /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para | 
|  |  | 122 |  |         /// <param name="retryPolicy">Retry policy for subscription operations. Defaults to <see cref="RetryPolicy.Defau | 
|  |  | 123 |  |         public SubscriptionClient(ServiceBusConnection serviceBusConnection, string topicPath, string subscriptionName,  | 
|  | 0 | 124 |  |             : base(nameof(SubscriptionClient), $"{topicPath}/{subscriptionName}", retryPolicy) | 
|  |  | 125 |  |         { | 
|  | 0 | 126 |  |             if (string.IsNullOrWhiteSpace(topicPath)) | 
|  |  | 127 |  |             { | 
|  | 0 | 128 |  |                 throw Fx.Exception.ArgumentNullOrWhiteSpace(topicPath); | 
|  |  | 129 |  |             } | 
|  | 0 | 130 |  |             if (string.IsNullOrWhiteSpace(subscriptionName)) | 
|  |  | 131 |  |             { | 
|  | 0 | 132 |  |                 throw Fx.Exception.ArgumentNullOrWhiteSpace(subscriptionName); | 
|  |  | 133 |  |             } | 
|  |  | 134 |  |  | 
|  | 0 | 135 |  |             MessagingEventSource.Log.SubscriptionClientCreateStart(serviceBusConnection?.Endpoint.Authority, topicPath,  | 
|  |  | 136 |  |  | 
|  | 0 | 137 |  |             this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect | 
|  | 0 | 138 |  |             this.syncLock = new object(); | 
|  | 0 | 139 |  |             this.TopicPath = topicPath; | 
|  | 0 | 140 |  |             this.SubscriptionName = subscriptionName; | 
|  | 0 | 141 |  |             this.Path = EntityNameHelper.FormatSubscriptionPath(this.TopicPath, this.SubscriptionName); | 
|  | 0 | 142 |  |             this.ReceiveMode = receiveMode; | 
|  | 0 | 143 |  |             this.diagnosticSource = new ServiceBusDiagnosticSource(this.Path, serviceBusConnection.Endpoint); | 
|  | 0 | 144 |  |             this.OwnsConnection = false; | 
|  | 0 | 145 |  |             this.ServiceBusConnection.ThrowIfClosed(); | 
|  |  | 146 |  |  | 
|  | 0 | 147 |  |             if (this.ServiceBusConnection.TokenProvider != null) | 
|  |  | 148 |  |             { | 
|  | 0 | 149 |  |                 this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu | 
|  |  | 150 |  |             } | 
|  |  | 151 |  |             else | 
|  |  | 152 |  |             { | 
|  | 0 | 153 |  |                 throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider"); | 
|  |  | 154 |  |             } | 
|  |  | 155 |  |  | 
|  | 0 | 156 |  |             MessagingEventSource.Log.SubscriptionClientCreateStop(serviceBusConnection.Endpoint.Authority, topicPath, su | 
|  | 0 | 157 |  |         } | 
|  |  | 158 |  |  | 
|  |  | 159 |  |         /// <summary> | 
|  |  | 160 |  |         /// Gets the path of the corresponding topic. | 
|  |  | 161 |  |         /// </summary> | 
|  | 0 | 162 |  |         public string TopicPath { get; } | 
|  |  | 163 |  |  | 
|  |  | 164 |  |         /// <summary> | 
|  |  | 165 |  |         /// Gets the formatted path of the subscription client. | 
|  |  | 166 |  |         /// </summary> | 
|  |  | 167 |  |         /// <seealso cref="EntityNameHelper.FormatSubscriptionPath(string, string)"/> | 
|  | 0 | 168 |  |         public override string Path { get; } | 
|  |  | 169 |  |  | 
|  |  | 170 |  |         /// <summary> | 
|  |  | 171 |  |         /// Gets the name of the subscription. | 
|  |  | 172 |  |         /// </summary> | 
|  | 0 | 173 |  |         public string SubscriptionName { get; } | 
|  |  | 174 |  |  | 
|  |  | 175 |  |         /// <summary> | 
|  |  | 176 |  |         /// Gets the <see cref="ServiceBus.ReceiveMode"/> for the SubscriptionClient. | 
|  |  | 177 |  |         /// </summary> | 
|  | 0 | 178 |  |         public ReceiveMode ReceiveMode { get; } | 
|  |  | 179 |  |  | 
/// <summary>
/// Gets or sets the duration after which an individual operation times out.
/// </summary>
/// <remarks>
/// The value is not stored on this client; both accessors forward to
/// <see cref="ServiceBusConnection"/>.
/// </remarks>
public override TimeSpan OperationTimeout
{
    get
    {
        return this.ServiceBusConnection.OperationTimeout;
    }

    set
    {
        this.ServiceBusConnection.OperationTimeout = value;
    }
}
|  |  | 188 |  |  | 
|  |  | 189 |  |         /// <summary> | 
|  |  | 190 |  |         /// Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when a | 
|  |  | 191 |  |         /// Setting a non-zero value prefetches PrefetchCount number of messages. | 
|  |  | 192 |  |         /// Setting the value to zero turns prefetch off. | 
|  |  | 193 |  |         /// Defaults to 0. | 
|  |  | 194 |  |         /// </summary> | 
|  |  | 195 |  |         /// <remarks> | 
|  |  | 196 |  |         /// <para> | 
|  |  | 197 |  |         /// When Prefetch is enabled, the client will quietly acquire more messages, up to the PrefetchCount limit, than | 
|  |  | 198 |  |         /// immediately asks for. The message pump will therefore acquire a message for immediate consumption | 
|  |  | 199 |  |         /// that will be returned as soon as available, and the client will proceed to acquire further messages to fill  | 
|  |  | 200 |  |         /// </para> | 
|  |  | 201 |  |         /// <para> | 
|  |  | 202 |  |         /// While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately s | 
|  |  | 203 |  |         /// replenished in the background as space becomes available. If there are no messages available for delivery, th | 
|  |  | 204 |  |         /// buffer and then wait or block as expected. | 
|  |  | 205 |  |         /// </para> | 
|  |  | 206 |  |         /// <para>Updates to this value take effect on the next receive call to the service.</para> | 
|  |  | 207 |  |         /// </remarks> | 
|  |  | 208 |  |         public int PrefetchCount | 
|  |  | 209 |  |         { | 
|  | 0 | 210 |  |             get => this.prefetchCount; | 
|  |  | 211 |  |             set | 
|  |  | 212 |  |             { | 
|  | 0 | 213 |  |                 if (value < 0) | 
|  |  | 214 |  |                 { | 
|  | 0 | 215 |  |                     throw Fx.Exception.ArgumentOutOfRange(nameof(this.PrefetchCount), value, "Value cannot be less than  | 
|  |  | 216 |  |                 } | 
|  | 0 | 217 |  |                 this.prefetchCount = value; | 
|  | 0 | 218 |  |                 if (this.innerSubscriptionClient != null) | 
|  |  | 219 |  |                 { | 
|  | 0 | 220 |  |                     this.innerSubscriptionClient.PrefetchCount = value; | 
|  |  | 221 |  |                 } | 
|  | 0 | 222 |  |                 if (this.sessionClient != null) | 
|  |  | 223 |  |                 { | 
|  | 0 | 224 |  |                     this.sessionClient.PrefetchCount = value; | 
|  |  | 225 |  |                 } | 
|  | 0 | 226 |  |             } | 
|  |  | 227 |  |         } | 
|  |  | 228 |  |  | 
|  |  | 229 |  |         /// <summary> | 
|  |  | 230 |  |         /// Connection object to the service bus namespace. | 
|  |  | 231 |  |         /// </summary> | 
|  | 0 | 232 |  |         public override ServiceBusConnection ServiceBusConnection { get; } | 
|  |  | 233 |  |  | 
/// <summary>
/// Gets the inner AMQP subscription client, creating it on first access.
/// </summary>
/// <remarks>
/// The instance is built from this client's path, connection, retry policy, CBS token provider,
/// prefetch count and receive mode. Creation happens under <c>syncLock</c>, and the field is
/// checked again once the lock is held so that concurrent first callers share one instance.
/// </remarks>
internal IInnerSubscriptionClient InnerSubscriptionClient
{
    get
    {
        if (this.innerSubscriptionClient == null)
        {
            lock (this.syncLock)
            {
                // Re-check under the lock: another thread may have created the client while
                // this one was waiting. Without this check the second thread would overwrite
                // an instance that other callers may already be using.
                if (this.innerSubscriptionClient == null)
                {
                    this.innerSubscriptionClient = new AmqpSubscriptionClient(
                        this.Path,
                        this.ServiceBusConnection,
                        this.RetryPolicy,
                        this.CbsTokenProvider,
                        this.PrefetchCount,
                        this.ReceiveMode);
                }
            }
        }

        return this.innerSubscriptionClient;
    }
}
|  |  | 255 |  |  | 
/// <summary>
/// Gets the <see cref="SessionClient"/> for this subscription, creating it on first access.
/// </summary>
/// <remarks>
/// The field is checked once without the lock and once more while holding <c>syncLock</c>,
/// so the instance is constructed at most once.
/// </remarks>
internal SessionClient SessionClient
{
    get
    {
        // Already created: return it without taking the lock.
        if (this.sessionClient != null)
        {
            return this.sessionClient;
        }

        lock (this.syncLock)
        {
            // Another thread may have created it while this one waited for the lock.
            if (this.sessionClient == null)
            {
                this.sessionClient = new SessionClient(
                    this.ClientId,
                    this.Path,
                    MessagingEntityType.Subscriber,
                    this.ReceiveMode,
                    this.PrefetchCount,
                    this.ServiceBusConnection,
                    this.CbsTokenProvider,
                    this.RetryPolicy,
                    this.RegisteredPlugins);
            }

            return this.sessionClient;
        }
    }
}
|  |  | 283 |  |  | 
/// <summary>
/// Gets the <see cref="SessionPumpHost"/> for this subscription, creating it on first access.
/// </summary>
/// <remarks>
/// The host is built from the client id, the receive mode, the <see cref="SessionClient"/>
/// property and the connection endpoint. The field is checked again under <c>syncLock</c>
/// before construction.
/// </remarks>
internal SessionPumpHost SessionPumpHost
{
    get
    {
        // Already created: return it without taking the lock.
        if (this.sessionPumpHost != null)
        {
            return this.sessionPumpHost;
        }

        lock (this.syncLock)
        {
            // Another thread may have created it while this one waited for the lock.
            if (this.sessionPumpHost == null)
            {
                this.sessionPumpHost = new SessionPumpHost(
                    this.ClientId,
                    this.ReceiveMode,
                    this.SessionClient,
                    this.ServiceBusConnection.Endpoint);
            }

            return this.sessionPumpHost;
        }
    }
}
|  |  | 306 |  |  | 
|  | 0 | 307 |  |         ICbsTokenProvider CbsTokenProvider { get; } | 
|  |  | 308 |  |  | 
/// <summary>
/// Completes the <see cref="Message"/> identified by the given lock token, which deletes it from the subscription.
/// </summary>
/// <param name="lockToken">Lock token of the message to complete.</param>
/// <returns>The task returned by the inner receiver's complete operation.</returns>
/// <remarks>
/// The lock token is exposed through <see cref="Message.SystemPropertiesCollection.LockToken"/> and is
/// only available when <see cref="ReceiveMode"/> is <see cref="ServiceBus.ReceiveMode.PeekLock"/>.
/// Only messages that were received by this client can be completed.
/// </remarks>
public Task CompleteAsync(string lockToken)
{
    // Check for a closed client before the inner client is touched.
    this.ThrowIfClosed();

    var receiver = this.InnerSubscriptionClient.InnerReceiver;
    return receiver.CompleteAsync(lockToken);
}
|  |  | 323 |  |  | 
|  |  | 324 |  |         /// <summary> | 
|  |  | 325 |  |         /// Abandons a <see cref="Message"/> using a lock token. This will make the message available again for processi | 
|  |  | 326 |  |         /// </summary> | 
|  |  | 327 |  |         /// <param name="lockToken">The lock token of the corresponding message to abandon.</param> | 
|  |  | 328 |  |         /// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</para | 
|  |  | 329 |  |         /// <remarks>A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>, | 
|  |  | 330 |  |         /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>. | 
|  |  | 331 |  |         /// Abandoning a message will increase the delivery count on the message. | 
|  |  | 332 |  |         /// This operation can only be performed on messages that were received by this client. | 
|  |  | 333 |  |         /// </remarks> | 
public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
{
    // Check for a closed client before the inner client is touched.
    this.ThrowIfClosed();

    // The work is delegated to the receiver owned by the inner subscription client.
    var receiver = this.InnerSubscriptionClient.InnerReceiver;
    return receiver.AbandonAsync(lockToken, propertiesToModify);
}
|  |  | 339 |  |  | 
|  |  | 340 |  |         /// <summary> | 
|  |  | 341 |  |         /// Moves a message to the deadletter sub-queue. | 
|  |  | 342 |  |         /// </summary> | 
|  |  | 343 |  |         /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param> | 
|  |  | 344 |  |         /// <remarks> | 
|  |  | 345 |  |         /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>, | 
|  |  | 346 |  |         /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>. | 
|  |  | 347 |  |         /// In order to receive a message from the deadletter sub-queue, you will need a new <see cref="IMessageReceiver | 
|  |  | 348 |  |         /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this. | 
|  |  | 349 |  |         /// This operation can only be performed on messages that were received by this client. | 
|  |  | 350 |  |         /// </remarks> | 
public Task DeadLetterAsync(string lockToken)
{
    // Check for a closed client before the inner client is touched.
    this.ThrowIfClosed();

    // The work is delegated to the receiver owned by the inner subscription client.
    var receiver = this.InnerSubscriptionClient.InnerReceiver;
    return receiver.DeadLetterAsync(lockToken);
}
|  |  | 356 |  |  | 
|  |  | 357 |  |         /// <summary> | 
|  |  | 358 |  |         /// Moves a message to the deadletter sub-queue. | 
|  |  | 359 |  |         /// </summary> | 
|  |  | 360 |  |         /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param> | 
|  |  | 361 |  |         /// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param> | 
|  |  | 362 |  |         /// <remarks> | 
|  |  | 363 |  |         /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>, | 
|  |  | 364 |  |         /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>. | 
|  |  | 365 |  |         /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, | 
|  |  | 366 |  |         /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this. | 
|  |  | 367 |  |         /// This operation can only be performed on messages that were received by this receiver. | 
|  |  | 368 |  |         /// </remarks> | 
public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify)
{
    // Check for a closed client before the inner client is touched.
    this.ThrowIfClosed();

    // The work is delegated to the receiver owned by the inner subscription client.
    var receiver = this.InnerSubscriptionClient.InnerReceiver;
    return receiver.DeadLetterAsync(lockToken, propertiesToModify);
}
|  |  | 374 |  |  | 
|  |  | 375 |  |         /// <summary> | 
|  |  | 376 |  |         /// Moves a message to the deadletter sub-queue. | 
|  |  | 377 |  |         /// </summary> | 
|  |  | 378 |  |         /// <param name="lockToken">The lock token of the corresponding message to deadletter.</param> | 
|  |  | 379 |  |         /// <param name="deadLetterReason">The reason for deadlettering the message.</param> | 
|  |  | 380 |  |         /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param> | 
|  |  | 381 |  |         /// <remarks> | 
|  |  | 382 |  |         /// A lock token can be found in <see cref="Message.SystemPropertiesCollection.LockToken"/>, | 
|  |  | 383 |  |         /// only when <see cref="ReceiveMode"/> is set to <see cref="ServiceBus.ReceiveMode.PeekLock"/>. | 
|  |  | 384 |  |         /// In order to receive a message from the deadletter queue, you will need a new <see cref="IMessageReceiver"/>, | 
|  |  | 385 |  |         /// You can use <see cref="EntityNameHelper.FormatDeadLetterPath(string)"/> to help with this. | 
|  |  | 386 |  |         /// This operation can only be performed on messages that were received by this receiver. | 
|  |  | 387 |  |         /// </remarks> | 
|  |  | 388 |  |         public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) | 
|  |  | 389 |  |         { | 
|  | 0 | 390 |  |             this.ThrowIfClosed(); | 
|  | 0 | 391 |  |             return this.InnerSubscriptionClient.InnerReceiver.DeadLetterAsync(lockToken, deadLetterReason, deadLetterErr | 
|  |  | 392 |  |         } | 
|  |  | 393 |  |  | 
|  |  | 394 |  |         /// <summary> | 
|  |  | 395 |  |         /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv | 
|  |  | 396 |  |         /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is | 
|  |  | 397 |  |         /// </summary> | 
|  |  | 398 |  |         /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param | 
|  |  | 399 |  |         /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions. | 
|  |  | 400 |  |         /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param> | 
|  |  | 401 |  |         /// <remarks>Enable prefetch to speed up the receive rate. | 
|  |  | 402 |  |         /// Use <see cref="RegisterMessageHandler(Func{Message,CancellationToken,Task}, MessageHandlerOptions)"/> to con | 
|  |  | 403 |  |         public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventAr | 
|  |  | 404 |  |         { | 
|  | 0 | 405 |  |             this.ThrowIfClosed(); | 
|  | 0 | 406 |  |             this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, exceptionReceivedHandler); | 
|  | 0 | 407 |  |         } | 
|  |  | 408 |  |  | 
|  |  | 409 |  |         /// <summary> | 
|  |  | 410 |  |         /// Receive messages continuously from the entity. Registers a message handler and begins a new thread to receiv | 
|  |  | 411 |  |         /// This handler(<see cref="Func{Message, CancellationToken, Task}"/>) is awaited on every time a new message is | 
|  |  | 412 |  |         /// </summary> | 
|  |  | 413 |  |         /// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param | 
|  |  | 414 |  |         /// <param name="messageHandlerOptions">The <see cref="MessageHandlerOptions"/> options used to configure the se | 
|  |  | 415 |  |         /// <remarks>Enable prefetch to speed up the receive rate.</remarks> | 
|  |  | 416 |  |         public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions message | 
|  |  | 417 |  |         { | 
|  | 0 | 418 |  |             this.ThrowIfClosed(); | 
|  | 0 | 419 |  |             this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); | 
|  | 0 | 420 |  |         } | 
|  |  | 421 |  |  | 
|  |  | 422 |  |         /// <summary> | 
|  |  | 423 |  |         /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to | 
|  |  | 424 |  |         /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time | 
|  |  | 425 |  |         /// </summary> | 
|  |  | 426 |  |         /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes | 
|  |  | 427 |  |         /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon | 
|  |  | 428 |  |         /// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions. | 
|  |  | 429 |  |         /// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param> | 
|  |  | 430 |  |         /// <remarks>  Enable prefetch to speed up the receive rate. | 
|  |  | 431 |  |         /// Use <see cref="RegisterSessionHandler(Func{IMessageSession,Message,CancellationToken,Task}, SessionHandlerOp | 
|  |  | 432 |  |         public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, Func<Excepti | 
|  |  | 433 |  |         { | 
|  | 0 | 434 |  |             var sessionHandlerOptions = new SessionHandlerOptions(exceptionReceivedHandler); | 
|  | 0 | 435 |  |             this.RegisterSessionHandler(handler, sessionHandlerOptions); | 
|  | 0 | 436 |  |         } | 
|  |  | 437 |  |  | 
|  |  | 438 |  |         /// <summary> | 
|  |  | 439 |  |         /// Receive session messages continuously from the subscription. Registers a message handler and begins a new thread to | 
|  |  | 440 |  |         /// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time | 
|  |  | 441 |  |         /// </summary> | 
|  |  | 442 |  |         /// <param name="handler">A <see cref="Func{IMessageSession, Message, CancellationToken, Task}"/> that processes | 
|  |  | 443 |  |         /// <see cref="IMessageSession"/> contains the session information, and must be used to perform Complete/Abandon | 
|  |  | 444 |  |         /// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param> | 
|  |  | 445 |  |         /// <remarks>Enable prefetch to speed up the receive rate. Registration is delegated to the session pump host's OnSessionHandler.</remarks> | 
|  |  | 446 |  |         public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandl | 
|  |  | 447 |  |         { | 
|  | 0 | 448 |  |             this.ThrowIfClosed(); | 
|  | 0 | 449 |  |             this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions); | 
|  | 0 | 450 |  |         } | 
|  |  | 451 |  |  | 
|  |  | 452 |  |         /// <summary>Adds a rule to the current subscription to filter which messages from the topic reach the subscription.</summary> | 
|  |  | 453 |  |         /// <param name="ruleName">The name of the rule to add; it becomes the name of the <see cref="RuleDescription"/> that is created.</param> | 
|  |  | 454 |  |         /// <param name="filter">The filter expression against which messages will be matched.</param> | 
|  |  | 455 |  |         /// <returns>A task instance that represents the asynchronous add rule operation.</returns> | 
|  |  | 456 |  |         /// <remarks> | 
|  |  | 457 |  |         /// You can add rules to the subscription that decide which messages from the topic should reach the subscripti | 
|  |  | 458 |  |         /// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added  | 
|  |  | 459 |  |         /// You can add multiple rules with distinct names to the same subscription. | 
|  |  | 460 |  |         /// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the messa | 
|  |  | 461 |  |         /// Max allowed length of rule name is 50 chars. | 
|  |  | 462 |  |         /// This overload wraps the name and filter in a <see cref="RuleDescription"/> and forwards to <see cref="AddRuleAsync(RuleDescription)"/>. | 
|  |  | 463 |  |         /// </remarks> | 
|  |  | 464 |  |         public Task AddRuleAsync(string ruleName, Filter filter) | 
|  |  | 465 |  |         { | 
|  | 0 | 466 |  |             return this.AddRuleAsync(new RuleDescription(name: ruleName, filter: filter)); | 
|  |  | 467 |  |         } | 
|  |  | 468 |  |  | 
|  |  | 469 |  |         /// <summary> | 
|  |  | 470 |  |         /// Adds a rule to the current subscription to filter which messages from the topic reach the subscription. | 
|  |  | 471 |  |         /// </summary> | 
|  |  | 472 |  |         /// <param name="description">The rule description that provides the rule to add. Must not be null; its name is checked with EntityNameHelper.CheckValidRuleName before the rule is sent.</param> | 
|  |  | 473 |  |         /// <returns>A task instance that represents the asynchronous add rule operation.</returns> | 
|  |  | 474 |  |         /// <remarks> | 
|  |  | 475 |  |         /// You can add rules to the subscription that decide which messages from the topic should reach the subscripti | 
|  |  | 476 |  |         /// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added  | 
|  |  | 477 |  |         /// You can add multiple rules with distinct names to the same subscription. | 
|  |  | 478 |  |         /// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the messa | 
|  |  | 479 |  |         /// </remarks> | 
|  |  | 480 |  |         public async Task AddRuleAsync(RuleDescription description) | 
|  |  | 481 |  |         { | 
|  | 0 | 482 |  |             this.ThrowIfClosed(); | 
|  |  | 483 |  |  | 
|  | 0 | 484 |  |             if (description == null) | 
|  |  | 485 |  |             { | 
|  | 0 | 486 |  |                 throw Fx.Exception.ArgumentNull(nameof(description)); | 
|  |  | 487 |  |             } | 
|  |  | 488 |  |  | 
|  | 0 | 489 |  |             EntityNameHelper.CheckValidRuleName(description.Name); | 
|  | 0 | 490 |  |             MessagingEventSource.Log.AddRuleStart(this.ClientId, description.Name); | 
|  |  | 491 |  |  | 
|  | 0 | 492 |  |             bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); | 
|  | 0 | 493 |  |             Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.AddRuleStart(description) : null; | 
|  | 0 | 494 |  |             Task addRuleTask = null; | 
|  |  | 495 |  |             /* addRuleTask is declared outside the try so the finally block can pass its status (or null, if no task was obtained) to AddRuleStop. */ | 
|  |  | 496 |  |             try | 
|  |  | 497 |  |             { | 
|  | 0 | 498 |  |                 addRuleTask = this.InnerSubscriptionClient.OnAddRuleAsync(description); | 
|  | 0 | 499 |  |                 await addRuleTask.ConfigureAwait(false); | 
|  | 0 | 500 |  |             } | 
|  | 0 | 501 |  |             catch (Exception exception) | 
|  |  | 502 |  |             { | 
|  | 0 | 503 |  |                 if (isDiagnosticSourceEnabled) | 
|  |  | 504 |  |                 { | 
|  | 0 | 505 |  |                     this.diagnosticSource.ReportException(exception); | 
|  |  | 506 |  |                 } | 
|  |  | 507 |  |  | 
|  | 0 | 508 |  |                 MessagingEventSource.Log.AddRuleException(this.ClientId, exception); | 
|  | 0 | 509 |  |                 throw; | 
|  |  | 510 |  |             } | 
|  |  | 511 |  |             finally | 
|  |  | 512 |  |             { | 
|  | 0 | 513 |  |                 this.diagnosticSource.AddRuleStop(activity, description, addRuleTask?.Status); | 
|  |  | 514 |  |             } | 
|  |  | 515 |  |             /* Reached only on success; the catch block above rethrows. */ | 
|  | 0 | 516 |  |             MessagingEventSource.Log.AddRuleStop(this.ClientId); | 
|  | 0 | 517 |  |         } | 
|  |  | 518 |  |  | 
|  |  | 519 |  |         /// <summary>Removes the rule on the subscription identified by <paramref name="ruleName" />.</summary> | 
|  |  | 520 |  |         /// <param name="ruleName">The name of the rule to remove. Must not be null, empty, or consist only of white space.</param> | 
|  |  | 521 |  |         /// <returns>A task instance that represents the asynchronous remove rule operation.</returns> | 
|  |  | 522 |  |         /// <remarks>A failure of the remove operation is reported to the diagnostic source (when it is enabled) and to MessagingEventSource, and is then rethrown.</remarks> | 
|  |  | 523 |  |         public async Task RemoveRuleAsync(string ruleName) | 
|  |  | 524 |  |         { | 
|  | 0 | 525 |  |             this.ThrowIfClosed(); | 
|  |  | 526 |  |  | 
|  | 0 | 527 |  |             if (string.IsNullOrWhiteSpace(ruleName)) | 
|  |  | 528 |  |             { | 
|  | 0 | 529 |  |                 throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(ruleName)); | 
|  |  | 530 |  |             } | 
|  |  | 531 |  |  | 
|  | 0 | 532 |  |             MessagingEventSource.Log.RemoveRuleStart(this.ClientId, ruleName); | 
|  | 0 | 533 |  |             bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); | 
|  | 0 | 534 |  |             Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.RemoveRuleStart(ruleName) : null; | 
|  | 0 | 535 |  |             Task removeRuleTask = null; | 
|  |  | 536 |  |             /* removeRuleTask is declared outside the try so the finally block can pass its status (or null, if no task was obtained) to RemoveRuleStop. */ | 
|  |  | 537 |  |             try | 
|  |  | 538 |  |             { | 
|  | 0 | 539 |  |                 removeRuleTask = this.InnerSubscriptionClient.OnRemoveRuleAsync(ruleName); | 
|  | 0 | 540 |  |                 await removeRuleTask.ConfigureAwait(false); | 
|  | 0 | 541 |  |             } | 
|  | 0 | 542 |  |             catch (Exception exception) | 
|  |  | 543 |  |             { | 
|  | 0 | 544 |  |                 if (isDiagnosticSourceEnabled) | 
|  |  | 545 |  |                 { | 
|  | 0 | 546 |  |                     this.diagnosticSource.ReportException(exception); | 
|  |  | 547 |  |                 } | 
|  |  | 548 |  |  | 
|  | 0 | 549 |  |                 MessagingEventSource.Log.RemoveRuleException(this.ClientId, exception); | 
|  |  | 550 |  |  | 
|  | 0 | 551 |  |                 throw; | 
|  |  | 552 |  |             } | 
|  |  | 553 |  |             finally | 
|  |  | 554 |  |             { | 
|  | 0 | 555 |  |                 this.diagnosticSource.RemoveRuleStop(activity, ruleName, removeRuleTask?.Status); | 
|  |  | 556 |  |             } | 
|  |  | 557 |  |             /* Reached only on success; the catch block above rethrows. */ | 
|  | 0 | 558 |  |             MessagingEventSource.Log.RemoveRuleStop(this.ClientId); | 
|  | 0 | 559 |  |         } | 
|  |  | 560 |  |  | 
|  |  | 561 |  |         /// <summary>Get all rules associated with the subscription.</summary> | 
|  |  | 562 |  |         /// <returns>A task whose result is the collection of rules returned by the inner subscription client.</returns> | 
|  |  | 563 |  |         /// <remarks>All rules are requested in a single call, using a skip of 0 and a top of <see cref="int.MaxValue"/>.</remarks> | 
|  |  | 564 |  |         public async Task<IEnumerable<RuleDescription>> GetRulesAsync() | 
|  |  | 565 |  |         { | 
|  | 0 | 566 |  |             this.ThrowIfClosed(); | 
|  |  | 567 |  |  | 
|  | 0 | 568 |  |             MessagingEventSource.Log.GetRulesStart(this.ClientId); | 
|  | 0 | 569 |  |             bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); | 
|  | 0 | 570 |  |             Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.GetRulesStart() : null; | 
|  | 0 | 571 |  |             Task<IList<RuleDescription>> getRulesTask = null; | 
|  |  | 572 |  |  | 
|  | 0 | 573 |  |             var skip = 0; | 
|  | 0 | 574 |  |             var top = int.MaxValue; | 
|  | 0 | 575 |  |             IList<RuleDescription> rules = null; | 
|  |  | 576 |  |             /* rules and getRulesTask keep their initial null until assigned in the try block; the finally block passes whatever was set to GetRulesStop. */ | 
|  |  | 577 |  |             try | 
|  |  | 578 |  |             { | 
|  | 0 | 579 |  |                 getRulesTask = this.InnerSubscriptionClient.OnGetRulesAsync(top, skip); | 
|  | 0 | 580 |  |                 rules = await getRulesTask.ConfigureAwait(false); | 
|  | 0 | 581 |  |             } | 
|  | 0 | 582 |  |             catch (Exception exception) | 
|  |  | 583 |  |             { | 
|  | 0 | 584 |  |                 if (isDiagnosticSourceEnabled) | 
|  |  | 585 |  |                 { | 
|  | 0 | 586 |  |                     this.diagnosticSource.ReportException(exception); | 
|  |  | 587 |  |                 } | 
|  |  | 588 |  |  | 
|  | 0 | 589 |  |                 MessagingEventSource.Log.GetRulesException(this.ClientId, exception); | 
|  |  | 590 |  |  | 
|  | 0 | 591 |  |                 throw; | 
|  |  | 592 |  |             } | 
|  |  | 593 |  |             finally | 
|  |  | 594 |  |             { | 
|  | 0 | 595 |  |                 this.diagnosticSource.GetRulesStop(activity, rules, getRulesTask?.Status); | 
|  |  | 596 |  |             } | 
|  |  | 597 |  |             /* Reached only on success; the catch block above rethrows. */ | 
|  | 0 | 598 |  |             MessagingEventSource.Log.GetRulesStop(this.ClientId); | 
|  | 0 | 599 |  |             return rules; | 
|  | 0 | 600 |  |         } | 
|  |  | 601 |  |  | 
|  |  | 602 |  |         /// <summary> | 
|  |  | 603 |  |         /// Gets a list of currently registered plugins for this SubscriptionClient. | 
|  |  | 604 |  |         /// </summary> | 
|  | 0 | 605 |  |         public override IList<ServiceBusPlugin> RegisteredPlugins => this.InnerSubscriptionClient.InnerReceiver.Register | 
|  |  | 606 |  |  | 
|  |  | 607 |  |         /// <summary> | 
|  |  | 608 |  |         /// Registers a <see cref="ServiceBusPlugin"/> to be used for receiving messages from Service Bus. | 
|  |  | 609 |  |         /// </summary> | 
|  |  | 610 |  |         /// <param name="serviceBusPlugin">The <see cref="ServiceBusPlugin"/> to register; it is passed to the inner receiver's RegisterPlugin.</param> | 
|  |  | 611 |  |         public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin) | 
|  |  | 612 |  |         { | 
|  | 0 | 613 |  |             this.ThrowIfClosed(); | 
|  | 0 | 614 |  |             this.InnerSubscriptionClient.InnerReceiver.RegisterPlugin(serviceBusPlugin); | 
|  | 0 | 615 |  |         } | 
|  |  | 616 |  |  | 
|  |  | 617 |  |         /// <summary> | 
|  |  | 618 |  |         /// Unregisters a <see cref="ServiceBusPlugin"/>. | 
|  |  | 619 |  |         /// </summary> | 
|  |  | 620 |  |         /// <param name="serviceBusPluginName">The <see cref="ServiceBusPlugin.Name"/> of the plugin to be unregistered; it is passed to the inner receiver's UnregisterPlugin.</param> | 
|  |  | 621 |  |         public override void UnregisterPlugin(string serviceBusPluginName) | 
|  |  | 622 |  |         { | 
|  | 0 | 623 |  |             this.ThrowIfClosed(); | 
|  | 0 | 624 |  |             this.InnerSubscriptionClient.InnerReceiver.UnregisterPlugin(serviceBusPluginName); | 
|  | 0 | 625 |  |         } | 
|  |  | 626 |  |         /** <summary>Closes, in order, the inner subscription client, the session pump host and the session client; each one is skipped when its backing field is null.</summary> */ | 
|  |  | 627 |  |         protected override async Task OnClosingAsync() | 
|  |  | 628 |  |         { | 
|  | 0 | 629 |  |             if (this.innerSubscriptionClient != null) | 
|  |  | 630 |  |             { | 
|  | 0 | 631 |  |                 await this.innerSubscriptionClient.CloseAsync().ConfigureAwait(false); | 
|  |  | 632 |  |             } | 
|  |  | 633 |  |             /* NOTE(review): the backing fields are read here instead of the InnerSubscriptionClient / SessionPumpHost properties used elsewhere in this class, presumably so that closing never creates them; confirm against the property definitions. */ | 
|  | 0 | 634 |  |             this.sessionPumpHost?.Close(); | 
|  |  | 635 |  |  | 
|  | 0 | 636 |  |             if (this.sessionClient != null) | 
|  |  | 637 |  |             { | 
|  | 0 | 638 |  |                 await this.sessionClient.CloseAsync().ConfigureAwait(false); | 
|  |  | 639 |  |             } | 
|  | 0 | 640 |  |         } | 
|  |  | 641 |  |     } | 
|  |  | 642 |  | } |