< Summary

Class:Azure.Messaging.ServiceBus.ServiceBusSessionReceiver
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Receiver\ServiceBusSessionReceiver.cs
Covered lines:51
Uncovered lines:14
Coverable lines:65
Total lines:202
Line coverage:78.4% (51 of 65)
Covered branches:2
Total branches:4
Branch coverage:50% (2 of 4)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_SessionId()-100%100%
get_SessionLockedUntil()-0%100%
CreateSessionReceiverAsync()-0%100%
.ctor(...)-100%50%
.ctor()-100%100%
GetSessionStateAsync()-100%100%
SetSessionStateAsync()-100%100%
RenewSessionLockAsync()-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Receiver\ServiceBusSessionReceiver.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Threading;
 6using System.Threading.Tasks;
 7using Azure.Messaging.ServiceBus.Core;
 8using Azure.Core;
 9using Azure.Messaging.ServiceBus.Diagnostics;
 10using Azure.Core.Pipeline;
 11using Azure.Messaging.ServiceBus.Plugins;
 12using System.Collections.Generic;
 13
 14namespace Azure.Messaging.ServiceBus
 15{
 16    /// <summary>
 17    /// The <see cref="ServiceBusSessionReceiver" /> is responsible for receiving <see cref="ServiceBusReceivedMessage" 
 18    ///  and settling messages from session-enabled Queues and Subscriptions. It is constructed by calling
 19    ///  <see cref="ServiceBusClient.CreateSessionReceiverAsync(string, string, ServiceBusSessionReceiverOptions, Cancel
 20    /// </summary>
 21    public class ServiceBusSessionReceiver : ServiceBusReceiver
 22    {
 23        /// <summary>
 24        /// The Session Id associated with the receiver.
 25        /// </summary>
 2426        public string SessionId => InnerReceiver.SessionId;
 27
 28        /// <summary>
 29        /// Gets the <see cref="DateTimeOffset"/> that the receiver's session is locked until.
 30        /// </summary>
 031        public DateTimeOffset SessionLockedUntil => InnerReceiver.SessionLockedUntil;
 32
 33        /// <summary>
 34        /// Creates a session receiver which can be used to interact with all messages with the same sessionId.
 35        /// </summary>
 36        ///
 37        /// <param name="entityPath">The name of the specific queue to associate the receiver with.</param>
 38        /// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with th
 39        /// <param name="plugins">The set of plugins to apply to incoming messages.</param>
 40        /// <param name="options">A set of options to apply when configuring the receiver.</param>
 41        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 42        ///
 43        ///<returns>Returns a new instance of the <see cref="ServiceBusSessionReceiver"/> class.</returns>
 44        internal static async Task<ServiceBusSessionReceiver> CreateSessionReceiverAsync(
 45            string entityPath,
 46            ServiceBusConnection connection,
 47            IList<ServiceBusPlugin> plugins,
 48            ServiceBusSessionReceiverOptions options = default,
 49            CancellationToken cancellationToken = default)
 50        {
 051            var receiver = new ServiceBusSessionReceiver(
 052                connection: connection,
 053                entityPath: entityPath,
 054                plugins: plugins,
 055                options: options);
 56            try
 57            {
 058                await receiver.OpenLinkAsync(cancellationToken).ConfigureAwait(false);
 059            }
 060            catch (Exception ex)
 61            {
 062                receiver.Logger.ClientCreateException(typeof(ServiceBusSessionReceiver), receiver.FullyQualifiedNamespac
 063                throw;
 64            }
 065            receiver.Logger.ClientCreateComplete(typeof(ServiceBusSessionReceiver), receiver.Identifier);
 066            return receiver;
 067        }
 68
 69        /// <summary>
 70        /// Initializes a new instance of the <see cref="ServiceBusReceiver"/> class.
 71        /// </summary>
 72        ///
 73        /// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with th
 74        /// <param name="entityPath"></param>
 75        /// <param name="plugins">The set of plugins to apply to incoming messages.</param>
 76        /// <param name="options">A set of options to apply when configuring the consumer.</param>
 77        internal ServiceBusSessionReceiver(
 78            ServiceBusConnection connection,
 79            string entityPath,
 80            IList<ServiceBusPlugin> plugins,
 81            ServiceBusSessionReceiverOptions options) :
 1482            base(connection, entityPath, true, plugins, options?.ToReceiverOptions(), options?.SessionId)
 83        {
 1484        }
 85
 86        /// <summary>
 87        /// Initializes a new instance of the <see cref="ServiceBusReceiver"/> class for mocking.
 88        /// </summary>
 89        ///
 890        protected ServiceBusSessionReceiver() : base() { }
 91
 92        /// <summary>
 93        /// Gets the session state.
 94        /// </summary>
 95        ///
 96        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 97        ///
 98        /// <returns>The session state as byte array.</returns>
 99        public virtual async Task<byte[]> GetSessionStateAsync(CancellationToken cancellationToken = default)
 100        {
 4101            Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver));
 4102            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 4103            Logger.GetSessionStateStart(Identifier, SessionId);
 4104            using DiagnosticScope scope = ScopeFactory.CreateScope(
 4105                DiagnosticProperty.GetSessionStateActivityName,
 4106                sessionId: SessionId);
 4107            scope.Start();
 108
 4109            byte[] sessionState = null;
 110
 111            try
 112            {
 4113                sessionState = await InnerReceiver.GetStateAsync(cancellationToken).ConfigureAwait(false);
 2114            }
 2115            catch (Exception exception)
 116            {
 2117                Logger.GetSessionStateException(Identifier, exception.ToString());
 2118                scope.Failed(exception);
 2119                throw;
 120            }
 121
 2122            Logger.GetSessionStateComplete(Identifier);
 2123            return sessionState;
 2124        }
 125
 126        /// <summary>
 127        /// Set a custom state on the session which can be later retrieved using <see cref="GetSessionStateAsync"/>
 128        /// </summary>
 129        ///
 130        /// <param name="sessionState">A byte array of session state</param>
 131        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 132        ///
 133        /// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
 134        ///
 135        /// <returns>A task to be resolved on when the operation has completed.</returns>
 136        public virtual async Task SetSessionStateAsync(
 137            byte[] sessionState,
 138            CancellationToken cancellationToken = default)
 139        {
 4140            Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver));
 4141            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 4142            Logger.SetSessionStateStart(Identifier, SessionId);
 4143            using DiagnosticScope scope = ScopeFactory.CreateScope(
 4144                DiagnosticProperty.SetSessionStateActivityName,
 4145                sessionId: SessionId);
 4146            scope.Start();
 147
 148            try
 149            {
 4150                await InnerReceiver.SetStateAsync(sessionState, cancellationToken).ConfigureAwait(false);
 2151            }
 2152            catch (Exception exception)
 153            {
 2154                Logger.SetSessionStateException(Identifier, exception.ToString());
 2155                scope.Failed(exception);
 2156                throw;
 157            }
 158
 2159            Logger.SetSessionStateComplete(Identifier);
 2160        }
 161
 162        /// <summary>
 163        /// Renews the lock on the session specified by the <see cref="SessionId"/>. The lock will be renewed based on t
 164        /// </summary>
 165        ///
 166        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 167        ///
 168        /// <remarks>
 169        /// <para>
 170        /// When you get session receiver, the session is locked for this receiver by the service for a duration as spec
 171        /// If processing of the session requires longer than this duration, the session-lock needs to be renewed.
 172        /// For each renewal, it resets the time the session is locked by the LockDuration set on the Entity.
 173        /// </para>
 174        /// <para>
 175        /// Renewal of session renews all the messages in the session as well. Each individual message need not be renew
 176        /// </para>
 177        /// </remarks>
 178        public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
 179        {
 4180            Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver));
 4181            cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 4182            Logger.RenewSessionLockStart(Identifier, SessionId);
 4183            using DiagnosticScope scope = ScopeFactory.CreateScope(
 4184                DiagnosticProperty.RenewSessionLockActivityName,
 4185                sessionId: SessionId);
 4186            scope.Start();
 187
 188            try
 189            {
 4190                await InnerReceiver.RenewSessionLockAsync(cancellationToken).ConfigureAwait(false);
 2191            }
 2192            catch (Exception exception)
 193            {
 2194                Logger.RenewSessionLockException(Identifier, exception.ToString());
 2195                scope.Failed(exception);
 2196                throw;
 197            }
 198
 2199            Logger.RenewSessionLockComplete(Identifier);
 2200        }
 201    }
 202}