< Summary

Class:Microsoft.Azure.ServiceBus.SessionReceivePump
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SessionReceivePump.cs
Covered lines:0
Uncovered lines:152
Coverable lines:152
Total lines:341
Line coverage:0% (0 of 152)
Covered branches:0
Total branches:96
Branch coverage:0% (0 of 96)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%0%
get_ReceiveMode()-0%100%
StartPump()-0%0%
CancelAutoRenewLock(...)-0%100%
ShouldRenewSessionLock()-0%0%
RaiseExceptionReceived(...)-0%100%
CompleteMessageIfNeededAsync()-0%0%
AbandonMessageIfNeededAsync()-0%0%
SessionPumpTaskAsync()-0%0%
MessagePumpTaskAsync()-0%0%
CloseSessionIfNeededAsync()-0%0%
RenewSessionLockTaskAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\SessionReceivePump.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Diagnostics;
 8    using System.Threading;
 9    using System.Threading.Tasks;
 10    using Primitives;
 11
 12    sealed class SessionReceivePump
 13    {
 14        readonly string clientId;
 15        readonly ISessionClient client;
 16        readonly Func<IMessageSession, Message, CancellationToken, Task> userOnSessionCallback;
 17        readonly SessionHandlerOptions sessionHandlerOptions;
 18        readonly string endpoint;
 19        readonly string entityPath;
 20        readonly CancellationToken pumpCancellationToken;
 21        readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim;
 22        readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim;
 23        private readonly ServiceBusDiagnosticSource diagnosticSource;
 24
 025        public SessionReceivePump(string clientId,
 026            ISessionClient client,
 027            ReceiveMode receiveMode,
 028            SessionHandlerOptions sessionHandlerOptions,
 029            Func<IMessageSession, Message, CancellationToken, Task> callback,
 030            Uri endpoint,
 031            CancellationToken token)
 32        {
 033            this.client = client ?? throw new ArgumentException(nameof(client));
 034            this.clientId = clientId;
 035            this.ReceiveMode = receiveMode;
 036            this.sessionHandlerOptions = sessionHandlerOptions;
 037            this.userOnSessionCallback = callback;
 038            this.endpoint = endpoint.Authority;
 039            this.entityPath = client.EntityPath;
 040            this.pumpCancellationToken = token;
 041            this.maxConcurrentSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentSessions
 042            this.maxPendingAcceptSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentAccep
 043            this.diagnosticSource = new ServiceBusDiagnosticSource(client.EntityPath, endpoint);
 044        }
 45
 046        ReceiveMode ReceiveMode { get; }
 47
 48        public void StartPump()
 49        {
 50            // Schedule Tasks for doing PendingAcceptSession calls
 051            for (var i = 0; i < this.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls; i++)
 52            {
 053                TaskExtensionHelper.Schedule(this.SessionPumpTaskAsync);
 54            }
 055        }
 56
 57        static void CancelAutoRenewLock(object state)
 58        {
 059            var renewCancellationTokenSource = (CancellationTokenSource)state;
 60
 61            try
 62            {
 063                renewCancellationTokenSource.Cancel();
 064            }
 065            catch (ObjectDisposedException)
 66            {
 67                // Ignore this race.
 068            }
 069        }
 70
 71        bool ShouldRenewSessionLock()
 72        {
 073            return
 074                this.ReceiveMode == ReceiveMode.PeekLock &&
 075                this.sessionHandlerOptions.AutoRenewLock;
 76        }
 77
 78        Task RaiseExceptionReceived(Exception e, string action)
 79        {
 080            var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId);
 081            return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs);
 82        }
 83
 84        async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message)
 85        {
 86            try
 87            {
 088                if (this.ReceiveMode == ReceiveMode.PeekLock &&
 089                    this.sessionHandlerOptions.AutoComplete)
 90                {
 091                    await session.CompleteAsync(new[] { message.SystemProperties.LockToken }).ConfigureAwait(false);
 92                }
 093            }
 094            catch (Exception exception)
 95            {
 096                await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Complete).ConfigureAwait(f
 97            }
 098        }
 99
 100        async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
 101        {
 102            try
 103            {
 0104                if (session.ReceiveMode == ReceiveMode.PeekLock)
 105                {
 0106                    await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
 107                }
 0108            }
 0109            catch (Exception exception)
 110            {
 0111                await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Abandon).ConfigureAwait(fa
 112            }
 0113        }
 114
 115        async Task SessionPumpTaskAsync()
 116        {
 0117            while (!this.pumpCancellationToken.IsCancellationRequested)
 118            {
 0119                var concurrentSessionSemaphoreAcquired = false;
 120                try
 121                {
 0122                    await this.maxConcurrentSessionsSemaphoreSlim.WaitAsync(this.pumpCancellationToken).ConfigureAwait(f
 0123                    concurrentSessionSemaphoreAcquired = true;
 124
 0125                    await this.maxPendingAcceptSessionsSemaphoreSlim.WaitAsync(this.pumpCancellationToken).ConfigureAwai
 0126                    var session = await this.client.AcceptMessageSessionAsync().ConfigureAwait(false);
 0127                    if (session == null)
 128                    {
 0129                        await Task.Delay(Constants.NoMessageBackoffTimeSpan, this.pumpCancellationToken).ConfigureAwait(
 0130                        continue;
 131                    }
 132
 133                    // `session` needs to be copied to another local variable before passing to Schedule
 134                    // because of the way variables are captured. (Refer 'Captured variables')
 0135                    var messageSession = session;
 0136                    TaskExtensionHelper.Schedule(() => this.MessagePumpTaskAsync(messageSession));
 0137                }
 0138                catch (Exception exception)
 139                {
 0140                    MessagingEventSource.Log.SessionReceivePumpSessionReceiveException(this.clientId, exception);
 141
 0142                    if (concurrentSessionSemaphoreAcquired)
 143                    {
 0144                        this.maxConcurrentSessionsSemaphoreSlim.Release();
 145                    }
 146
 0147                    if (exception is ServiceBusTimeoutException)
 148                    {
 0149                        await Task.Delay(Constants.NoMessageBackoffTimeSpan, this.pumpCancellationToken).ConfigureAwait(
 150                    }
 151                    else
 152                    {
 0153                        if (!(exception is ObjectDisposedException && this.pumpCancellationToken.IsCancellationRequested
 154                        {
 0155                            await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.AcceptMessageS
 156                        }
 0157                        if (!MessagingUtilities.ShouldRetry(exception))
 158                        {
 0159                            await Task.Delay(Constants.NoMessageBackoffTimeSpan, this.pumpCancellationToken).ConfigureAw
 160                        }
 161                    }
 0162                }
 163                finally
 164                {
 0165                    this.maxPendingAcceptSessionsSemaphoreSlim.Release();
 166                }
 167            }
 0168        }
 169
 170        async Task MessagePumpTaskAsync(IMessageSession session)
 171        {
 0172            if (session == null)
 173            {
 0174                return;
 175            }
 176
 0177            var renewLockCancellationTokenSource = new CancellationTokenSource();
 0178            if (this.ShouldRenewSessionLock())
 179            {
 0180                TaskExtensionHelper.Schedule(() => this.RenewSessionLockTaskAsync(session, renewLockCancellationTokenSou
 181            }
 182
 0183            var autoRenewLockCancellationTimer = new Timer(
 0184                CancelAutoRenewLock,
 0185                renewLockCancellationTokenSource,
 0186                Timeout.Infinite,
 0187                Timeout.Infinite);
 188
 189            try
 190            {
 0191                while (!this.pumpCancellationToken.IsCancellationRequested && !session.IsClosedOrClosing)
 192                {
 193                    Message message;
 194                    try
 195                    {
 0196                        message = await session.ReceiveAsync(this.sessionHandlerOptions.MessageWaitTimeout).ConfigureAwa
 0197                    }
 0198                    catch (Exception exception)
 199                    {
 0200                        MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, excep
 0201                        if (exception is ServiceBusTimeoutException)
 202                        {
 203                            // Timeout Exceptions are pretty common. Not alerting the User on this.
 204                            continue;
 205                        }
 206
 0207                        if (!(exception is ObjectDisposedException && this.pumpCancellationToken.IsCancellationRequested
 208                        {
 0209                            await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Receive).Confi
 210                        }
 0211                        break;
 212                    }
 213
 0214                    if (message == null)
 215                    {
 0216                        MessagingEventSource.Log.SessionReceivePumpSessionEmpty(this.clientId, session.SessionId);
 0217                        break;
 218                    }
 219
 0220                    bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
 0221                    Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.ProcessSessionStart(session, m
 0222                    Task processTask = null;
 223
 224                    try
 225                    {
 226                        // Set the timer
 0227                        autoRenewLockCancellationTimer.Change(this.sessionHandlerOptions.MaxAutoRenewDuration,
 0228                            TimeSpan.FromMilliseconds(-1));
 0229                        var callbackExceptionOccurred = false;
 230                        try
 231                        {
 0232                            processTask = this.userOnSessionCallback(session, message, this.pumpCancellationToken);
 0233                            await processTask.ConfigureAwait(false);
 0234                        }
 0235                        catch (Exception exception)
 236                        {
 0237                            if (isDiagnosticSourceEnabled)
 238                            {
 0239                                this.diagnosticSource.ReportException(exception);
 240                            }
 241
 0242                            MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, e
 0243                            await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).
 0244                            callbackExceptionOccurred = true;
 0245                            if (!(exception is MessageLockLostException || exception is SessionLockLostException))
 246                            {
 0247                                await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
 248                            }
 0249                        }
 250                        finally
 251                        {
 0252                            autoRenewLockCancellationTimer.Change(Timeout.Infinite, Timeout.Infinite);
 253                        }
 254
 0255                        if (!callbackExceptionOccurred)
 256                        {
 0257                            await this.CompleteMessageIfNeededAsync(session, message).ConfigureAwait(false);
 258                        }
 0259                        else if (session.IsClosedOrClosing)
 260                        {
 261                            // If User closed the session as part of the callback, break out of the loop
 0262                            break;
 263                        }
 0264                    }
 265                    finally
 266                    {
 0267                        this.diagnosticSource.ProcessSessionStop(activity, session, message, processTask?.Status);
 268                    }
 0269                }
 270            }
 271            finally
 272            {
 0273                renewLockCancellationTokenSource.Cancel();
 0274                renewLockCancellationTokenSource.Dispose();
 0275                autoRenewLockCancellationTimer.Dispose();
 276
 0277                await this.CloseSessionIfNeededAsync(session).ConfigureAwait(false);
 0278                this.maxConcurrentSessionsSemaphoreSlim.Release();
 279            }
 0280        }
 281
 282        async Task CloseSessionIfNeededAsync(IMessageSession session)
 283        {
 0284            if (!session.IsClosedOrClosing)
 285            {
 286                try
 287                {
 0288                    await session.CloseAsync().ConfigureAwait(false);
 0289                    MessagingEventSource.Log.SessionReceivePumpSessionClosed(this.clientId, session.SessionId);
 0290                }
 0291                catch (Exception exception)
 292                {
 0293                    MessagingEventSource.Log.SessionReceivePumpSessionCloseException(this.clientId, session.SessionId, e
 0294                    await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.CloseMessageSession).C
 295                }
 296            }
 0297        }
 298
 299        async Task RenewSessionLockTaskAsync(IMessageSession session, CancellationToken renewLockCancellationToken)
 300        {
 0301            while (!this.pumpCancellationToken.IsCancellationRequested &&
 0302                   !renewLockCancellationToken.IsCancellationRequested)
 303            {
 304                try
 305                {
 0306                    var amount = MessagingUtilities.CalculateRenewAfterDuration(session.LockedUntilUtc);
 307
 0308                    MessagingEventSource.Log.SessionReceivePumpSessionRenewLockStart(this.clientId, session.SessionId, a
 0309                    await Task.Delay(amount, renewLockCancellationToken).ConfigureAwait(false);
 310
 0311                    if (!this.pumpCancellationToken.IsCancellationRequested &&
 0312                        !renewLockCancellationToken.IsCancellationRequested)
 313                    {
 0314                        await session.RenewSessionLockAsync().ConfigureAwait(false);
 0315                        MessagingEventSource.Log.SessionReceivePumpSessionRenewLockStop(this.clientId, session.SessionId
 316                    }
 317                    else
 318                    {
 0319                        break;
 320                    }
 0321                }
 0322                catch (Exception exception)
 323                {
 0324                    MessagingEventSource.Log.SessionReceivePumpSessionRenewLockException(this.clientId, session.SessionI
 325
 326                    // TaskCanceled is expected here as renewTasks will be cancelled after the Complete call is made.
 327                    // ObjectDisposedException should only happen here because the CancellationToken was disposed at whi
 328                    // this renew exception is not relevant anymore. Lets not bother user with this exception.
 0329                    if (!(exception is TaskCanceledException) && !(exception is ObjectDisposedException))
 330                    {
 0331                        await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.RenewLock).Configu
 332                    }
 0333                    if (!MessagingUtilities.ShouldRetry(exception))
 334                    {
 335                        break;
 336                    }
 0337                }
 338            }
 0339        }
 340    }
 341}