< Summary

Class: Microsoft.Azure.EventHubs.Processor.EventHubPartitionPump
Assembly: Microsoft.Azure.EventHubs.Processor
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\EventHubPartitionPump.cs
Covered lines: 0
Uncovered lines: 90
Coverable lines: 90
Total lines: 209
Line coverage: 0% (0 of 90)
Covered branches: 0
Total branches: 24
Branch coverage: 0% (0 of 24)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 0% | 100%
OnOpenAsync() | - | 0% | 0%
OpenClientsAsync() | - | 0% | 0%
CleanUpClientsAsync() | - | 0% | 0%
OnClosingAsync(...) | - | 0% | 100%
.ctor(...) | - | 0% | 100%
get_MaxBatchSize() | - | 0% | 100%
ProcessEventsAsync(...) | - | 0% | 100%
ProcessErrorAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\EventHubPartitionPump.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.EventHubs.Processor
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Threading.Tasks;
 9
 10    class EventHubPartitionPump : PartitionPump
 11    {
 12        EventHubClient eventHubClient;
 13        PartitionReceiver partitionReceiver;
 14        PartitionReceiveHandler partitionReceiveHandler;
 15
 16        public EventHubPartitionPump(EventProcessorHost host, Lease lease)
 017            : base(host, lease)
 18        {
 019        }
 20
 21        protected override async Task OnOpenAsync()
 22        {
 023            bool openedOK = false;
 024            int retryCount = 0;
 025            Exception lastException = null;
 26            do
 27            {
 28                try
 29                {
 030                    await OpenClientsAsync().ConfigureAwait(false);
 031                    openedOK = true;
 032                }
 033                catch (Exception e)
 34                {
 035                    lastException = e;
 036                    ProcessorEventSource.Log.PartitionPumpWarning(
 037                        this.Host.HostName, this.PartitionContext.PartitionId, "Failure creating client or receiver, ret
 38
 39                    // Don't retry if we already lost the lease.
 040                    if (e is ReceiverDisconnectedException)
 41                    {
 042                        break;
 43                    }
 44
 045                    retryCount++;
 046                }
 47            }
 048            while (!openedOK && (retryCount < 5));
 49
 050            if (!openedOK)
 51            {
 52                // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for exec
 53                // so we can report this error to it instead of the general error handler.
 054                await this.Processor.ProcessErrorAsync(this.PartitionContext, lastException).ConfigureAwait(false);
 055                this.PumpStatus = PartitionPumpStatus.OpenFailed;
 56            }
 57
 058            if (this.PumpStatus == PartitionPumpStatus.Opening)
 59            {
 060                this.partitionReceiveHandler = new PartitionReceiveHandler(this);
 61                // IEventProcessor.OnOpen is called from the base PartitionPump and must have returned in order for exec
 62                // meaning it is safe to set the handler and start calling IEventProcessor.OnEvents.
 63                // Set the status to running before setting the client handler, so the IEventProcessor.OnEvents can neve
 064                this.PumpStatus = PartitionPumpStatus.Running;
 065                this.partitionReceiver.SetReceiveHandler(
 066                    this.partitionReceiveHandler,
 067                    this.Host.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout);
 68            }
 69
 070            if (this.PumpStatus == PartitionPumpStatus.OpenFailed)
 71            {
 072                this.PumpStatus = PartitionPumpStatus.Closing;
 073                await this.CleanUpClientsAsync().ConfigureAwait(false);
 074                this.PumpStatus = PartitionPumpStatus.Closed;
 75            }
 076        }
 77
 78        async Task OpenClientsAsync() // throws EventHubsException, IOException, InterruptedException, ExecutionExceptio
 79        {
 80            // Create new clients
 081            EventPosition eventPosition = await this.PartitionContext.GetInitialOffsetAsync().ConfigureAwait(false);
 082            long epoch = this.Lease.Epoch;
 083            ProcessorEventSource.Log.PartitionPumpCreateClientsStart(this.Host.HostName, this.PartitionContext.Partition
 084                $"Offset:{eventPosition.Offset}, SequenceNumber:{eventPosition.SequenceNumber}, DateTime:{eventPosition.
 085            this.eventHubClient = this.Host.CreateEventHubClient();
 086            this.eventHubClient.WebProxy = this.Host.EventProcessorOptions.WebProxy;
 87
 088            var receiverOptions = new ReceiverOptions()
 089            {
 090                // Enable receiver metrics?
 091                EnableReceiverRuntimeMetric = this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric,
 092
 093                // Use host name as the identifier for debugging purpose
 094                // Shorten host name if name is longer than max allowed length.
 095                Identifier = this.Host.HostName.Length > ClientConstants.MaxReceiverIdentifierLength ?
 096                    this.Host.HostName.Substring(0, ClientConstants.MaxReceiverIdentifierLength) : this.Host.HostName
 097            };
 98
 99            // Create new receiver and set options
 0100            this.partitionReceiver = this.eventHubClient.CreateEpochReceiver(
 0101                this.PartitionContext.ConsumerGroupName,
 0102                this.PartitionContext.PartitionId,
 0103                eventPosition,
 0104                epoch,
 0105                receiverOptions);
 106
 0107            this.partitionReceiver.PrefetchCount = this.Host.EventProcessorOptions.PrefetchCount;
 108
 0109            ProcessorEventSource.Log.PartitionPumpCreateClientsStop(this.Host.HostName, this.PartitionContext.PartitionI
 0110        }
 111
 112        async Task CleanUpClientsAsync() // swallows all exceptions
 113        {
 0114            if (this.partitionReceiver != null)
 115            {
 116                // Taking the lock means that there is no ProcessEventsAsync call in progress.
 117                Task closeTask;
 0118                using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
 119                {
 120                    // Calling PartitionReceiver.CloseAsync will gracefully close the IPartitionReceiveHandler we have i
 0121                    ProcessorEventSource.Log.PartitionPumpInfo(this.Host.HostName, this.PartitionContext.PartitionId, "C
 0122                    closeTask = this.partitionReceiver.CloseAsync();
 0123                }
 124
 0125                await closeTask.ConfigureAwait(false);
 0126                this.partitionReceiver = null;
 127            }
 128
 0129            if (this.eventHubClient != null)
 130            {
 0131                ProcessorEventSource.Log.PartitionPumpInfo(this.Host.HostName, this.PartitionContext.PartitionId, "Closi
 0132                await this.eventHubClient.CloseAsync().ConfigureAwait(false);
 0133                this.eventHubClient = null;
 134            }
 0135        }
 136
 137        protected override Task OnClosingAsync(CloseReason reason)
 138        {
 139            // Close the EH clients. Errors are swallowed, nothing we could do about them anyway.
 0140            return CleanUpClientsAsync();
 141        }
 142
 143        class PartitionReceiveHandler : IPartitionReceiveHandler
 144        {
 145            readonly EventHubPartitionPump eventHubPartitionPump;
 146
 0147            public PartitionReceiveHandler(EventHubPartitionPump eventHubPartitionPump)
 148            {
 0149                this.eventHubPartitionPump = eventHubPartitionPump;
 0150                this.MaxBatchSize = eventHubPartitionPump.Host.EventProcessorOptions.MaxBatchSize;
 0151            }
 152
 0153            public int MaxBatchSize { get; set; }
 154
 155            public Task ProcessEventsAsync(IEnumerable<EventData> events)
 156            {
 157                // This method is called on the thread that the EH client uses to run the pump.
 158                // There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient,
 159                // using that thread to call OnEvents does no harm. Even if OnEvents is slow, the pump will
 160                // get control back each time OnEvents returns, and be able to receive a new batch of messages
 161                // with which to make the next OnEvents call. The pump gains nothing by running faster than OnEvents.
 0162                return this.eventHubPartitionPump.ProcessEventsAsync(events);
 163            }
 164
 165            public async Task ProcessErrorAsync(Exception error)
 166            {
 167                bool faultPump;
 168
 0169                if (error == null)
 170                {
 0171                    error = new Exception("No error info supplied by EventHub client");
 172                }
 173
 0174                if (error is ReceiverDisconnectedException)
 175                {
 176                    // Trace as warning since ReceiverDisconnectedException is part of lease stealing logic.
 0177                    ProcessorEventSource.Log.PartitionPumpWarning(
 0178                        this.eventHubPartitionPump.Host.HostName, this.eventHubPartitionPump.PartitionContext.PartitionI
 0179                        "EventHub client disconnected, probably another host took the partition", error.Message);
 180
 181                    // Shutdown the message pump when receiver is disconnected.
 0182                    faultPump = true;
 183                }
 184                else
 185                {
 0186                    ProcessorEventSource.Log.PartitionPumpError(
 0187                        this.eventHubPartitionPump.Host.HostName, this.eventHubPartitionPump.PartitionContext.PartitionI
 188
 189                    // No need to fault the pump, we expect receiver to recover on its own.
 0190                    faultPump = false;
 191                }
 192
 193                try
 194                {
 195                    // We would like to deliver all errors in the pump to error handler.
 0196                    await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false);
 0197                }
 198                finally
 199                {
 200                    // Fault pump only when needed.
 0201                    if (faultPump)
 202                    {
 0203                        this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored;
 204                    }
 205                }
 0206            }
 207        }
 208    }
 209}