< Summary

Class: Microsoft.Azure.ServiceBus.MessageReceivePump
Assembly: Microsoft.Azure.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\MessageReceivePump.cs
Covered lines: 0
Uncovered lines: 155
Coverable lines: 155
Total lines: 298
Line coverage: 0% (0 of 155)
Covered branches: 0
Total branches: 58
Branch coverage: 0% (0 of 58)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 0% | 0%
StartPump() | - | 0% | 100%
ShouldRenewLock() | - | 0% | 0%
RaiseExceptionReceived(...) | - | 0% | 100%
MessagePumpTaskAsync() | - | 0% | 0%
MessageDispatchTaskInstrumented() | - | 0% | 0%
MessageDispatchTask() | - | 0% | 0%
CancelAutoRenewLock(...) | - | 0% | 100%
AbandonMessageIfNeededAsync() | - | 0% | 0%
CompleteMessageIfNeededAsync() | - | 0% | 0%
RenewMessageLockTask() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\MessageReceivePump.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Diagnostics;
 8    using System.Threading;
 9    using System.Threading.Tasks;
 10    using Core;
 11    using Primitives;
 12
 13    sealed class MessageReceivePump
 14    {
 15        readonly Func<Message, CancellationToken, Task> onMessageCallback;
 16        readonly string endpoint;
 17        readonly MessageHandlerOptions registerHandlerOptions;
 18        readonly IMessageReceiver messageReceiver;
 19        readonly CancellationToken pumpCancellationToken;
 20        readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
 21        readonly ServiceBusDiagnosticSource diagnosticSource;
 22
 023        public MessageReceivePump(IMessageReceiver messageReceiver,
 024            MessageHandlerOptions registerHandlerOptions,
 025            Func<Message, CancellationToken, Task> callback,
 026            Uri endpoint,
 027            CancellationToken pumpCancellationToken)
 28        {
 029            this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
 030            this.registerHandlerOptions = registerHandlerOptions;
 031            this.onMessageCallback = callback;
 032            this.endpoint = endpoint.Authority;
 033            this.pumpCancellationToken = pumpCancellationToken;
 034            this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
 035            this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
 036        }
 37
 38        public void StartPump()
 39        {
 040            TaskExtensionHelper.Schedule(() => this.MessagePumpTaskAsync());
 041        }
 42
 43        bool ShouldRenewLock()
 44        {
 045            return
 046                this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock &&
 047                this.registerHandlerOptions.AutoRenewLock;
 48        }
 49
 50        Task RaiseExceptionReceived(Exception e, string action)
 51        {
 052            var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.mes
 053            return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
 54        }
 55
 56        async Task MessagePumpTaskAsync()
 57        {
 058            while (!this.pumpCancellationToken.IsCancellationRequested)
 59            {
 60                try
 61                {
 062                    await this.maxConcurrentCallsSemaphoreSlim.WaitAsync(this.pumpCancellationToken).ConfigureAwait(fals
 63
 064                    TaskExtensionHelper.Schedule(async () =>
 065                    {
 066                        Message message = null;
 067                        try
 068                        {
 069                            message = await this.messageReceiver.ReceiveAsync(this.registerHandlerOptions.ReceiveTimeOut
 070                            if (message != null)
 071                            {
 072                                MessagingEventSource.Log.MessageReceiverPumpTaskStart(this.messageReceiver.ClientId, mes
 073
 074                                TaskExtensionHelper.Schedule(() =>
 075                                {
 076                                    if (ServiceBusDiagnosticSource.IsEnabled())
 077                                    {
 078                                        return this.MessageDispatchTaskInstrumented(message);
 079                                    }
 080                                    else
 081                                    {
 082                                        return this.MessageDispatchTask(message);
 083                                    }
 084                                });
 085                            }
 086                        }
 087                        catch (OperationCanceledException) when (pumpCancellationToken.IsCancellationRequested)
 088                        {
 089                            // Ignore as we are stopping the pump
 090                        }
 091                        catch (ObjectDisposedException) when (pumpCancellationToken.IsCancellationRequested)
 092                        {
 093                            // Ignore as we are stopping the pump
 094                        }
 095                        catch (Exception exception)
 096                        {
 097                            MessagingEventSource.Log.MessageReceivePumpTaskException(this.messageReceiver.ClientId, stri
 098                            await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Receive).Confi
 099                        }
 0100                        finally
 0101                        {
 0102                            // Either an exception or for some reason message was null, release semaphore and retry.
 0103                            if (message == null)
 0104                            {
 0105                                this.maxConcurrentCallsSemaphoreSlim.Release();
 0106                                MessagingEventSource.Log.MessageReceiverPumpTaskStop(this.messageReceiver.ClientId, this
 0107                            }
 0108                        }
 0109                    });
 0110                }
 0111                catch (OperationCanceledException) when (pumpCancellationToken.IsCancellationRequested)
 112                {
 113                    // Ignore as we are stopping the pump
 0114                }
 0115                catch (ObjectDisposedException) when (pumpCancellationToken.IsCancellationRequested)
 116                {
 117                    // Ignore as we are stopping the pump
 0118                }
 0119                catch (Exception exception)
 120                {
 0121                    MessagingEventSource.Log.MessageReceivePumpTaskException(this.messageReceiver.ClientId, string.Empty
 0122                    await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Receive).ConfigureAwai
 123                }
 124            }
 0125        }
 126
 127        async Task MessageDispatchTaskInstrumented(Message message)
 128        {
 0129            Activity activity = this.diagnosticSource.ProcessStart(message);
 0130            Task processTask = null;
 131            try
 132            {
 0133                processTask = MessageDispatchTask(message);
 0134                await processTask.ConfigureAwait(false);
 0135            }
 0136            catch (Exception e)
 137            {
 0138                this.diagnosticSource.ReportException(e);
 0139                throw;
 140            }
 141            finally
 142            {
 0143                this.diagnosticSource.ProcessStop(activity, message, processTask?.Status);
 144            }
 0145        }
 146
 147        async Task MessageDispatchTask(Message message)
 148        {
 0149            CancellationTokenSource renewLockCancellationTokenSource = null;
 0150            Timer autoRenewLockCancellationTimer = null;
 151
 0152            MessagingEventSource.Log.MessageReceiverPumpDispatchTaskStart(this.messageReceiver.ClientId, message);
 153
 0154            if (this.ShouldRenewLock())
 155            {
 0156                renewLockCancellationTokenSource = new CancellationTokenSource();
 0157                TaskExtensionHelper.Schedule(() => this.RenewMessageLockTask(message, renewLockCancellationTokenSource.T
 158
 159                // After a threshold time of renewal('AutoRenewTimeout'), create timer to cancel anymore renewals.
 0160                autoRenewLockCancellationTimer = new Timer(this.CancelAutoRenewLock, renewLockCancellationTokenSource, t
 161            }
 162
 163            try
 164            {
 0165                MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message);
 0166                await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false);
 167
 0168                MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message);
 0169            }
 0170            catch (Exception exception)
 171            {
 0172                MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message
 0173                await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwa
 174
 175                // Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
 0176                if (!(exception is MessageLockLostException))
 177                {
 0178                    await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
 179                }
 180
 0181                if (ServiceBusDiagnosticSource.IsEnabled())
 182                {
 0183                    this.diagnosticSource.ReportException(exception);
 184                }
 185                // AbandonMessageIfNeededAsync should take care of not throwing exception
 0186                this.maxConcurrentCallsSemaphoreSlim.Release();
 187
 0188                return;
 189            }
 190            finally
 191            {
 0192                renewLockCancellationTokenSource?.Cancel();
 0193                renewLockCancellationTokenSource?.Dispose();
 0194                autoRenewLockCancellationTimer?.Dispose();
 195            }
 196
 197            // If we've made it this far, user callback completed fine. Complete message and Release semaphore.
 0198            await this.CompleteMessageIfNeededAsync(message).ConfigureAwait(false);
 0199            this.maxConcurrentCallsSemaphoreSlim.Release();
 200
 0201            MessagingEventSource.Log.MessageReceiverPumpDispatchTaskStop(this.messageReceiver.ClientId, message, this.ma
 0202        }
 203
 204        void CancelAutoRenewLock(object state)
 205        {
 0206            var renewLockCancellationTokenSource = (CancellationTokenSource)state;
 207            try
 208            {
 0209                renewLockCancellationTokenSource.Cancel();
 0210            }
 0211            catch (ObjectDisposedException)
 212            {
 213                // Ignore this race.
 0214            }
 0215        }
 216
 217        async Task AbandonMessageIfNeededAsync(Message message)
 218        {
 219            try
 220            {
 0221                if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
 222                {
 0223                    await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
 224                }
 0225            }
 0226            catch (Exception exception)
 227            {
 0228                await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Abandon).ConfigureAwait(fa
 229            }
 0230        }
 231
 232        async Task CompleteMessageIfNeededAsync(Message message)
 233        {
 234            try
 235            {
 0236                if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock &&
 0237                    this.registerHandlerOptions.AutoComplete)
 238                {
 0239                    await this.messageReceiver.CompleteAsync(new[] { message.SystemProperties.LockToken }).ConfigureAwai
 240                }
 0241            }
 0242            catch (Exception exception)
 243            {
 0244                await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.Complete).ConfigureAwait(f
 245            }
 0246        }
 247
 248        async Task RenewMessageLockTask(Message message, CancellationToken renewLockCancellationToken)
 249        {
 0250            while (!this.pumpCancellationToken.IsCancellationRequested &&
 0251                   !renewLockCancellationToken.IsCancellationRequested)
 252            {
 253                try
 254                {
 0255                    var amount = MessagingUtilities.CalculateRenewAfterDuration(message.SystemProperties.LockedUntilUtc)
 0256                    MessagingEventSource.Log.MessageReceiverPumpRenewMessageStart(this.messageReceiver.ClientId, message
 257
 258                    // We're awaiting the task created by 'ContinueWith' to avoid awaiting the Delay task which may be c
 259                    // by the renewLockCancellationToken. This way we prevent a TaskCanceledException.
 0260                    var delayTask = await Task.Delay(amount, renewLockCancellationToken)
 0261                        .ContinueWith(t => t, TaskContinuationOptions.ExecuteSynchronously)
 0262                        .ConfigureAwait(false);
 0263                    if (delayTask.IsCanceled)
 264                    {
 0265                        break;
 266                    }
 267
 0268                    if (!this.pumpCancellationToken.IsCancellationRequested &&
 0269                        !renewLockCancellationToken.IsCancellationRequested)
 270                    {
 0271                        await this.messageReceiver.RenewLockAsync(message).ConfigureAwait(false);
 0272                        MessagingEventSource.Log.MessageReceiverPumpRenewMessageStop(this.messageReceiver.ClientId, mess
 273                    }
 274                    else
 275                    {
 0276                        break;
 277                    }
 0278                }
 0279                catch (Exception exception)
 280                {
 0281                    MessagingEventSource.Log.MessageReceiverPumpRenewMessageException(this.messageReceiver.ClientId, mes
 282
 283                    // ObjectDisposedException should only happen here because the CancellationToken was disposed at whi
 284                    // this renew exception is not relevant anymore. Lets not bother user with this exception.
 0285                    if (!(exception is ObjectDisposedException))
 286                    {
 0287                        await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.RenewLock).Configu
 288                    }
 289
 0290                    if (!MessagingUtilities.ShouldRetry(exception))
 291                    {
 292                        break;
 293                    }
 0294                }
 295            }
 0296        }
 297    }
 298}