< Summary

Class:Microsoft.Azure.ServiceBus.MessageSession
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\MessageSession.cs
Covered lines:0
Uncovered lines:86
Coverable lines:86
Total lines:254
Line coverage:0% (0 of 86)
Covered branches:0
Total branches:30
Branch coverage:0% (0 of 30)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%100%
get_LockedUntilUtc()-0%100%
set_LockedUntilUtc(...)-0%100%
get_SessionId()-0%100%
GetStateAsync()-0%0%
SetStateAsync(...)-0%0%
RenewSessionLockAsync()-0%0%
OnMessageHandler(...)-0%100%
OnRenewLockAsync(...)-0%100%
OnGetStateAsync()-0%0%
OnSetStateAsync()-0%0%
OnRenewSessionLockAsync()-0%0%
ThrowIfClosed()-0%0%
OnGetStateInstrumentedAsync()-0%0%
OnSetStateInstrumentedAsync()-0%0%
OnRenewSessionLockInstrumentedAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\MessageSession.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Diagnostics;
 8    using System.Threading;
 9    using System.Threading.Tasks;
 10    using Amqp;
 11    using Azure.Amqp;
 12    using Core;
 13    using Primitives;
 14
 15    internal class MessageSession : MessageReceiver, IMessageSession
 16    {
 17        private readonly ServiceBusDiagnosticSource diagnosticSource;
 18
 19        public MessageSession(
 20            string entityPath,
 21            MessagingEntityType? entityType,
 22            ReceiveMode receiveMode,
 23            ServiceBusConnection serviceBusConnection,
 24            ICbsTokenProvider cbsTokenProvider,
 25            RetryPolicy retryPolicy,
 26            int prefetchCount = Constants.DefaultClientPrefetchCount,
 27            string sessionId = null,
 28            bool isSessionReceiver = false)
 029            : base(entityPath, entityType, receiveMode, serviceBusConnection, cbsTokenProvider, retryPolicy, prefetchCou
 30        {
 031            this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
 032        }
 33
 34        /// <summary>
 35        /// Gets the time that the session identified by <see cref="SessionId"/> is locked until for this client.
 36        /// </summary>
 37        public DateTime LockedUntilUtc
 38        {
 039            get => this.LockedUntilUtcInternal;
 040            internal set => this.LockedUntilUtcInternal = value;
 41        }
 42
 43        /// <summary>
 44        /// Gets the SessionId.
 45        /// </summary>
 046        public string SessionId => this.SessionIdInternal;
 47
 48        public Task<byte[]> GetStateAsync()
 49        {
 050            this.ThrowIfClosed();
 051            return ServiceBusDiagnosticSource.IsEnabled() ? this.OnGetStateInstrumentedAsync() : this.OnGetStateAsync();
 52        }
 53
 54        public Task SetStateAsync(byte[] sessionState)
 55        {
 056            this.ThrowIfClosed();
 057            return ServiceBusDiagnosticSource.IsEnabled() ? this.OnSetStateInstrumentedAsync(sessionState) : this.OnSetS
 58        }
 59
 60        public Task RenewSessionLockAsync()
 61        {
 062            this.ThrowIfClosed();
 063            return ServiceBusDiagnosticSource.IsEnabled() ? this.OnRenewSessionLockInstrumentedAsync() : this.OnRenewSes
 64        }
 65
 66        protected override void OnMessageHandler(MessageHandlerOptions registerHandlerOptions, Func<Message, Cancellatio
 67        {
 068            throw new InvalidOperationException($"{nameof(RegisterMessageHandler)} is not supported for Sessions.");
 69        }
 70
 71        protected override Task<DateTime> OnRenewLockAsync(string lockToken)
 72        {
 073            throw new InvalidOperationException($"{nameof(RenewLockAsync)} is not supported for Session. Use {nameof(Ren
 74        }
 75
 76        protected async Task<byte[]> OnGetStateAsync()
 77        {
 78            try
 79            {
 080                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.GetSessionState
 81
 082                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 83                {
 084                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 85                }
 86
 087                amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 88
 089                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 90
 091                byte[] sessionState = null;
 092                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 93                {
 094                    if (amqpResponseMessage.Map[ManagementConstants.Properties.SessionState] != null)
 95                    {
 096                        sessionState = amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.S
 97                    }
 98                }
 99                else
 100                {
 0101                    throw amqpResponseMessage.ToMessagingContractException();
 102                }
 103
 0104                return sessionState;
 105            }
 106            catch (Exception exception)
 107            {
 0108                throw AmqpExceptionHelper.GetClientException(exception);
 109            }
 0110        }
 111
 112        protected async Task OnSetStateAsync(byte[] sessionState)
 113        {
 114            try
 115            {
 0116                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionState
 117
 0118                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 119                {
 0120                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 121                }
 122
 0123                amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 124
 0125                if (sessionState != null)
 126                {
 0127                    var value = new ArraySegment<byte>(sessionState);
 0128                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = value;
 129                }
 130                else
 131                {
 0132                    amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = null;
 133                }
 134
 0135                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 0136                if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
 137                {
 0138                    throw amqpResponseMessage.ToMessagingContractException();
 139                }
 0140            }
 141            catch (Exception exception)
 142            {
 0143                throw AmqpExceptionHelper.GetClientException(exception);
 144            }
 0145        }
 146
 147        protected async Task OnRenewSessionLockAsync()
 148        {
 149            try
 150            {
 0151                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.RenewSessionLoc
 152
 0153                if (this.ReceiveLinkManager.TryGetOpenedObject(out var receiveLink))
 154                {
 0155                    amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkN
 156                }
 157
 0158                amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = this.SessionIdInternal;
 159
 0160                var amqpResponseMessage = await this.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(fals
 161
 0162                if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
 163                {
 0164                    this.LockedUntilUtcInternal = amqpResponseMessage.GetValue<DateTime>(ManagementConstants.Properties.
 165                }
 166                else
 167                {
 0168                    throw amqpResponseMessage.ToMessagingContractException();
 169                }
 0170            }
 171            catch (Exception exception)
 172            {
 0173                throw AmqpExceptionHelper.GetClientException(exception);
 174            }
 0175        }
 176
 177        /// <summary>
 178        /// Throw an OperationCanceledException if the object is Closing.
 179        /// </summary>
 180        protected override void ThrowIfClosed()
 181        {
 0182            if (this.IsClosedOrClosing)
 183            {
 0184                throw new ObjectDisposedException($"MessageSession with Id '{this.ClientId}' has already been closed. Pl
 185            }
 0186        }
 187
 188        private async Task<byte[]> OnGetStateInstrumentedAsync()
 189        {
 0190            Activity activity = this.diagnosticSource.GetSessionStateStart(this.SessionId);
 0191            Task<byte[]> getStateTask = null;
 0192            byte[] state = null;
 193
 194            try
 195            {
 0196                getStateTask = this.OnGetStateAsync();
 0197                state = await getStateTask.ConfigureAwait(false);
 0198                return state;
 199            }
 0200            catch (Exception ex)
 201            {
 0202                this.diagnosticSource.ReportException(ex);
 0203                throw;
 204            }
 205            finally
 206            {
 0207                this.diagnosticSource.GetSessionStateStop(activity, this.SessionId, state, getStateTask?.Status);
 208            }
 0209        }
 210
 211        private async Task OnSetStateInstrumentedAsync(byte[] sessionState)
 212        {
 0213            Activity activity = this.diagnosticSource.SetSessionStateStart(this.SessionId, sessionState);
 0214            Task setStateTask = null;
 215
 216            try
 217            {
 0218                setStateTask = this.OnSetStateAsync(sessionState);
 0219                await setStateTask.ConfigureAwait(false);
 0220            }
 0221            catch (Exception ex)
 222            {
 0223                this.diagnosticSource.ReportException(ex);
 0224                throw;
 225            }
 226            finally
 227            {
 0228                this.diagnosticSource.SetSessionStateStop(activity, sessionState, this.SessionId, setStateTask?.Status);
 229            }
 0230        }
 231
 232        private async Task OnRenewSessionLockInstrumentedAsync()
 233        {
 0234            Activity activity = this.diagnosticSource.RenewSessionLockStart(this.SessionId);
 0235            Task renewTask = null;
 236
 237            try
 238            {
 0239                renewTask = this.OnRenewSessionLockAsync();
 0240                await renewTask.ConfigureAwait(false);
 0241            }
 0242            catch (Exception ex)
 243            {
 0244                this.diagnosticSource.ReportException(ex);
 0245                throw;
 246            }
 247            finally
 248            {
 0249                this.diagnosticSource.RenewSessionLockStop(activity, this.SessionId, renewTask?.Status);
 250            }
 0251        }
 252
 253    }
 254}