< Summary

Class:Microsoft.Azure.ServiceBus.SessionClient
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SessionClient.cs
Covered lines:0
Uncovered lines:130
Coverable lines:130
Total lines:403
Line coverage:0% (0 of 130)
Covered branches:0
Total branches:42
Branch coverage:0% (0 of 42)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-0%100%
.ctor(...)-0%100%
.ctor(...)-0%0%
get_ReceiveMode()-0%100%
get_EntityPath()-0%100%
get_Path()-0%100%
get_OperationTimeout()-0%100%
set_OperationTimeout(...)-0%100%
get_ServiceBusConnection()-0%100%
get_EntityType()-0%100%
get_PrefetchCount()-0%100%
get_CbsTokenProvider()-0%100%
get_RegisteredPlugins()-0%100%
AcceptMessageSessionAsync()-0%100%
AcceptMessageSessionAsync(...)-0%100%
AcceptMessageSessionAsync(...)-0%100%
AcceptMessageSessionAsync()-0%0%
RegisterPlugin(...)-0%0%
UnregisterPlugin(...)-0%0%
OnClosingAsync()-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SessionClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Diagnostics;
 9    using System.Linq;
 10    using System.Threading.Tasks;
 11    using Amqp;
 12    using Azure.Amqp;
 13    using Core;
 14    using Primitives;
 15
 16    /// <summary>
 17    /// A session client can be used to accept session objects which can be used to interact with all messages with the 
 18    /// </summary>
 19    /// <remarks>
 20    /// You can accept any session or a given session (identified by <see cref="IMessageSession.SessionId"/> using a ses
 21    /// Once you accept a session, you can use it as a <see cref="MessageReceiver"/> which receives only messages having
 22    /// See <see cref="IMessageSession"/> for usage of session object.
 23    /// This uses AMQP protocol to communicate with the service.
 24    /// </remarks>
 25    /// <example>
 26    /// To create a new SessionClient
 27    /// <code>
 28    /// ISessionClient sessionClient = new SessionClient(
 29    ///     namespaceConnectionString,
 30    ///     queueName,
 31    ///     ReceiveMode.PeekLock);
 32    /// </code>
 33    ///
 34    /// To receive a session object for a given sessionId
 35    /// <code>
 36    /// IMessageSession session = await sessionClient.AcceptMessageSessionAsync(sessionId);
 37    /// </code>
 38    ///
 39    /// To receive any session
 40    /// <code>
 41    /// IMessageSession session = await sessionClient.AcceptMessageSessionAsync();
 42    /// </code>
 43    /// </example>
 44    /// <seealso cref="IMessageSession"/>
 45    public sealed class SessionClient : ClientEntity, ISessionClient
 46    {
 47        const int DefaultPrefetchCount = 0;
 48        readonly ServiceBusDiagnosticSource diagnosticSource;
 49
 50        /// <summary>
 51        /// Creates a new SessionClient from a <see cref="ServiceBusConnectionStringBuilder"/>
 52        /// </summary>
 53        /// <param name="connectionStringBuilder">The <see cref="ServiceBusConnectionStringBuilder"/> having entity leve
 54        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 55        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with ServiceBus
 56        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages the s
 57        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 58        /// <remarks>Creates a new connection to the entity, which is used for all the sessions objects accepted using t
 59        public SessionClient(
 60            ServiceBusConnectionStringBuilder connectionStringBuilder,
 61            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 62            RetryPolicy retryPolicy = null,
 63            int prefetchCount = DefaultPrefetchCount)
 064            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, receive
 65        {
 066        }
 67
 68        /// <summary>
 69        /// Creates a new SessionClient from a specified connection string and entity path.
 70        /// </summary>
 71        /// <param name="connectionString">Namespace connection string used to communicate with Service Bus. Must not co
 72        /// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for
 73        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 74        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with ServiceBus
 75        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages the s
 76        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 77        /// <remarks>Creates a new connection to the entity, which is used for all the sessions objects accepted using t
 78        public SessionClient(
 79            string connectionString,
 80            string entityPath,
 81            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 82            RetryPolicy retryPolicy = null,
 83            int prefetchCount = DefaultPrefetchCount)
 084            : this(nameof(SessionClient),
 085                  entityPath,
 086                  null,
 087                  receiveMode,
 088                  prefetchCount,
 089                  new ServiceBusConnection(connectionString),
 090                  null,
 091                  retryPolicy,
 092                  null)
 93        {
 094            if (string.IsNullOrWhiteSpace(connectionString))
 95            {
 096                throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
 97            }
 98
 099            this.OwnsConnection = true;
 0100        }
 101
 102        /// <summary>
 103        /// Creates a new SessionClient from a specified endpoint, entity path, and token provider.
 104        /// </summary>
 105        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 106        /// <param name="entityPath">Queue path.</param>
 107        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 108        /// <param name="transportType">Transport type.</param>
 109        /// <param name="receiveMode">Mode of receive of messages. Defaults to <see cref="ReceiveMode"/>.PeekLock.</para
 110        /// <param name="retryPolicy">Retry policy for queue operations. Defaults to <see cref="RetryPolicy.Default"/></
 111        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages this 
 112        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 113        /// <remarks>Creates a new connection to the entity, which is opened during the first operation.</remarks>
 114        public SessionClient(
 115            string endpoint,
 116            string entityPath,
 117            ITokenProvider tokenProvider,
 118            TransportType transportType = TransportType.Amqp,
 119            ReceiveMode receiveMode = ReceiveMode.PeekLock,
 120            RetryPolicy retryPolicy = null,
 121            int prefetchCount = DefaultPrefetchCount)
 0122            : this(nameof(SessionClient),
 0123                entityPath,
 0124                null,
 0125                receiveMode,
 0126                prefetchCount,
 0127                new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider},
 0128                null,
 0129                retryPolicy,
 0130                null)
 131        {
 0132            this.OwnsConnection = true;
 0133        }
 134
 135        /// <summary>
 136        /// Creates a new SessionClient on a given <see cref="ServiceBusConnection"/>
 137        /// </summary>
 138        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 139        /// <param name="entityPath">The path of the entity for this receiver. For Queues this will be the name, but for
 140        /// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults 
 141        /// <param name="retryPolicy">The <see cref="RetryPolicy"/> that will be used when communicating with ServiceBus
 142        /// <param name="prefetchCount">The <see cref="PrefetchCount"/> that specifies the upper limit of messages the s
 143        /// will actively receive regardless of whether a receive operation is pending. Defaults to 0.</param>
 144        public SessionClient(
 145            ServiceBusConnection serviceBusConnection,
 146            string entityPath,
 147            ReceiveMode receiveMode,
 148            RetryPolicy retryPolicy = null,
 149            int prefetchCount = DefaultPrefetchCount)
 0150            : this(nameof(SessionClient),
 0151                entityPath,
 0152                null,
 0153                receiveMode,
 0154                prefetchCount,
 0155                serviceBusConnection,
 0156                null,
 0157                retryPolicy,
 0158                null)
 159        {
 0160            this.OwnsConnection = false;
 0161        }
 162
 163        internal SessionClient(
 164            string clientTypeName,
 165            string entityPath,
 166            MessagingEntityType? entityType,
 167            ReceiveMode receiveMode,
 168            int prefetchCount,
 169            ServiceBusConnection serviceBusConnection,
 170            ICbsTokenProvider cbsTokenProvider,
 171            RetryPolicy retryPolicy,
 172            IList<ServiceBusPlugin> registeredPlugins)
 0173            : base(clientTypeName, entityPath, retryPolicy ?? RetryPolicy.Default)
 174        {
 0175            if (string.IsNullOrWhiteSpace(entityPath))
 176            {
 0177                throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
 178            }
 179
 0180            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 0181            this.EntityPath = entityPath;
 0182            this.EntityType = entityType;
 0183            this.ReceiveMode = receiveMode;
 0184            this.PrefetchCount = prefetchCount;
 0185            this.ServiceBusConnection.ThrowIfClosed();
 186
 0187            if (cbsTokenProvider != null)
 188            {
 0189                this.CbsTokenProvider = cbsTokenProvider;
 190            }
 0191            else if (this.ServiceBusConnection.TokenProvider != null)
 192            {
 0193                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 194            }
 195            else
 196            {
 0197                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 198            }
 199
 0200            this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
 201
 202            // Register plugins on the message session.
 0203            if (registeredPlugins != null)
 204            {
 0205                foreach (var serviceBusPlugin in registeredPlugins)
 206                {
 0207                    this.RegisterPlugin(serviceBusPlugin);
 208                }
 209            }
 0210        }
 211
 0212        ReceiveMode ReceiveMode { get; }
 213
 214        /// <summary>
 215        /// Gets the path of the entity. This is either the name of the queue, or the full path of the subscription.
 216        /// </summary>
 0217        public string EntityPath { get; }
 218
 219        /// <summary>
 220        /// Gets the path of the entity. This is either the name of the queue, or the full path of the subscription.
 221        /// </summary>
 0222        public override string Path => this.EntityPath;
 223
 224        /// <summary>
 225        /// Duration after which individual operations will timeout.
 226        /// </summary>
 227        public override TimeSpan OperationTimeout
 228        {
 0229            get => this.ServiceBusConnection.OperationTimeout;
 0230            set => this.ServiceBusConnection.OperationTimeout = value;
 231        }
 232
 233        /// <summary>
 234        /// Connection object to the service bus namespace.
 235        /// </summary>
 0236        public override ServiceBusConnection ServiceBusConnection { get; }
 237
 0238        MessagingEntityType? EntityType { get; }
 239
 0240        internal int PrefetchCount { get; set; }
 241
 0242        ICbsTokenProvider CbsTokenProvider { get; }
 243
 244        /// <summary>
 245        /// Gets a list of currently registered plugins.
 246        /// </summary>
 0247        public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();
 248
 249        /// <summary>
 250        /// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages fo
 251        /// </summary>
 252        /// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSes
 253        /// Individual sessions can further register additional plugins.</remarks>
 254        public Task<IMessageSession> AcceptMessageSessionAsync()
 255        {
 0256            return this.AcceptMessageSessionAsync(this.ServiceBusConnection.OperationTimeout);
 257        }
 258
 259        /// <summary>
 260        /// Gets a session object of any <see cref="IMessageSession.SessionId"/> that can be used to receive messages fo
 261        /// </summary>
 262        /// <param name="operationTimeout">Amount of time for which the call should wait to fetch the next session.</par
 263        /// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSes
 264        /// Individual sessions can further register additional plugins.</remarks>
 265        public Task<IMessageSession> AcceptMessageSessionAsync(TimeSpan operationTimeout)
 266        {
 0267            return this.AcceptMessageSessionAsync(null, operationTimeout);
 268        }
 269
 270        /// <summary>
 271        /// Gets a particular session object identified by <paramref name="sessionId"/> that can be used to receive mess
 272        /// </summary>
 273        /// <param name="sessionId">The sessionId present in all its messages.</param>
 274        /// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSes
 275        /// Individual sessions can further register additional plugins.</remarks>
 276        public Task<IMessageSession> AcceptMessageSessionAsync(string sessionId)
 277        {
 0278            return this.AcceptMessageSessionAsync(sessionId, this.ServiceBusConnection.OperationTimeout);
 279        }
 280
 281        /// <summary>
 282        /// Gets a particular session object identified by <paramref name="sessionId"/> that can be used to receive mess
 283        /// </summary>
 284        /// <param name="sessionId">The sessionId present in all its messages.</param>
 285        /// <param name="operationTimeout">Amount of time for which the call should wait to fetch the next session.</par
 286        /// <remarks>All plugins registered on <see cref="SessionClient"/> will be applied to each <see cref="MessageSes
 287        /// Individual sessions can further register additional plugins.</remarks>
 288        public async Task<IMessageSession> AcceptMessageSessionAsync(string sessionId, TimeSpan operationTimeout)
 289        {
 0290            this.ThrowIfClosed();
 291
 0292            MessagingEventSource.Log.AmqpSessionClientAcceptMessageSessionStart(
 0293                this.ClientId,
 0294                this.EntityPath,
 0295                this.ReceiveMode,
 0296                this.PrefetchCount,
 0297                sessionId);
 298
 0299            bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0300            Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.AcceptMessageSessionStart(sessionId) :
 0301            Task acceptMessageSessionTask = null;
 302
 0303            var session = new MessageSession(
 0304                this.EntityPath,
 0305                this.EntityType,
 0306                this.ReceiveMode,
 0307                this.ServiceBusConnection,
 0308                this.CbsTokenProvider,
 0309                this.RetryPolicy,
 0310                this.PrefetchCount,
 0311                sessionId,
 0312                true);
 313
 314            try
 315            {
 0316                acceptMessageSessionTask = this.RetryPolicy.RunOperation(
 0317                    () => session.GetSessionReceiverLinkAsync(operationTimeout),
 0318                    operationTimeout);
 0319                await acceptMessageSessionTask.ConfigureAwait(false);
 0320            }
 0321            catch (Exception exception)
 322            {
 0323                if (isDiagnosticSourceEnabled && !(exception is ServiceBusTimeoutException))
 324                {
 0325                    this.diagnosticSource.ReportException(exception);
 326                }
 327
 0328                MessagingEventSource.Log.AmqpSessionClientAcceptMessageSessionException(
 0329                    this.ClientId,
 0330                    this.EntityPath,
 0331                    exception);
 332
 0333                await session.CloseAsync().ConfigureAwait(false);
 0334                throw AmqpExceptionHelper.GetClientException(exception);
 335            }
 336            finally
 337            {
 0338                this.diagnosticSource.AcceptMessageSessionStop(activity, session.SessionId, acceptMessageSessionTask?.St
 339            }
 340
 0341            MessagingEventSource.Log.AmqpSessionClientAcceptMessageSessionStop(
 0342                this.ClientId,
 0343                this.EntityPath,
 0344                session.SessionIdInternal);
 345
 0346            session.UpdateClientId(ClientEntity.GenerateClientId(nameof(MessageSession), $"{this.EntityPath}_{session.Se
 347            // Register plugins on the message session.
 0348            foreach (var serviceBusPlugin in this.RegisteredPlugins)
 349            {
 0350                session.RegisterPlugin(serviceBusPlugin);
 351            }
 352
 0353            return session;
 0354        }
 355
 356        /// <summary>
 357        /// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
 358        /// </summary>
 359        /// <param name="serviceBusPlugin">The <see cref="ServiceBusPlugin"/> to register.</param>
 360        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 361        {
 0362            this.ThrowIfClosed();
 363
 0364            if (serviceBusPlugin == null)
 365            {
 0366                throw new ArgumentNullException(nameof(serviceBusPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUs
 367            }
 0368            if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPlugin.Name))
 369            {
 0370                throw new ArgumentException(nameof(serviceBusPlugin), Resources.PluginAlreadyRegistered.FormatForUser(na
 371            }
 0372            this.RegisteredPlugins.Add(serviceBusPlugin);
 0373        }
 374
 375        /// <summary>
 376        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 377        /// </summary>
 378        /// <param name="serviceBusPluginName">The <see cref="ServiceBusPlugin.Name"/> of the plugin to be unregistered.
 379        public override void UnregisterPlugin(string serviceBusPluginName)
 380        {
 0381            this.ThrowIfClosed();
 382
 0383            if (this.RegisteredPlugins == null)
 384            {
 0385                return;
 386            }
 0387            if (string.IsNullOrWhiteSpace(serviceBusPluginName))
 388            {
 0389                throw new ArgumentNullException(nameof(serviceBusPluginName), Resources.ArgumentNullOrWhiteSpace.FormatF
 390            }
 0391            if (this.RegisteredPlugins.Any(p => p.Name == serviceBusPluginName))
 392            {
 0393                var plugin = this.RegisteredPlugins.First(p => p.Name == serviceBusPluginName);
 0394                this.RegisteredPlugins.Remove(plugin);
 395            }
 0396        }
 397
 398        protected override Task OnClosingAsync()
 399        {
 0400            return Task.CompletedTask;
 401        }
 402    }
 403}