< Summary

Class:Microsoft.Azure.EventHubs.Processor.PartitionPump
Assembly:Microsoft.Azure.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionPump.cs
Covered lines:0
Uncovered lines:89
Coverable lines:89
Total lines:197
Line coverage:0% (0 of 89)
Covered branches:0
Total branches:32
Branch coverage:0% (0 of 32)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%100%
get_Host()-0%100%
get_Lease()-0%100%
get_Processor()-0%100%
get_PartitionContext()-0%100%
get_ProcessingAsyncLock()-0%100%
SetLeaseToken(...)-0%100%
OpenAsync()-0%0%
get_PumpStatus()-0%100%
get_IsClosing()-0%0%
CloseAsync()-0%0%
ProcessEventsAsync()-0%0%
ProcessErrorAsync(...)-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionPump.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.EventHubs.Processor
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Linq;
 9    using System.Threading;
 10    using System.Threading.Tasks;
 11
 12    abstract class PartitionPump
 13    {
 14        CancellationTokenSource cancellationTokenSource;
 15
 016        protected PartitionPump(EventProcessorHost host, Lease lease)
 17        {
 018            this.Host = host;
 019            this.Lease = lease;
 020            this.ProcessingAsyncLock = new AsyncLock();
 021            this.PumpStatus = PartitionPumpStatus.Uninitialized;
 022        }
 23
 024        protected EventProcessorHost Host { get; }
 25
 026        protected internal Lease Lease { get; }
 27
 028        protected IEventProcessor Processor { get; private set; }
 29
 030        protected PartitionContext PartitionContext { get; private set; }
 31
 032        protected AsyncLock ProcessingAsyncLock { get; }
 33
 34        internal void SetLeaseToken(string newToken)
 35        {
 036            this.PartitionContext.Lease.Token = newToken;
 037        }
 38
 39        public async Task OpenAsync()
 40        {
 041            this.PumpStatus = PartitionPumpStatus.Opening;
 42
 043            this.cancellationTokenSource = new CancellationTokenSource();
 44
 045            this.PartitionContext = new PartitionContext(
 046                this.Host,
 047                this.Lease.PartitionId,
 048                this.Host.EventHubPath,
 049                this.Host.ConsumerGroupName,
 050                this.cancellationTokenSource.Token);
 051            this.PartitionContext.Lease = this.Lease;
 52
 053            if (this.PumpStatus == PartitionPumpStatus.Opening)
 54            {
 055                string action = EventProcessorHostActionStrings.CreatingEventProcessor;
 56                try
 57                {
 058                    this.Processor = this.Host.ProcessorFactory.CreateEventProcessor(this.PartitionContext);
 059                    action = EventProcessorHostActionStrings.OpeningEventProcessor;
 060                    ProcessorEventSource.Log.PartitionPumpInvokeProcessorOpenStart(this.Host.HostName, this.PartitionCon
 061                    await this.Processor.OpenAsync(this.PartitionContext).ConfigureAwait(false);
 062                    ProcessorEventSource.Log.PartitionPumpInvokeProcessorOpenStop(this.Host.HostName, this.PartitionCont
 063                }
 064                catch (Exception e)
 65                {
 66                    // If the processor won't create or open, only thing we can do here is pass the buck.
 67                    // Null it out so we don't try to operate on it further.
 068                    ProcessorEventSource.Log.PartitionPumpError(this.Host.HostName, this.PartitionContext.PartitionId, "
 069                    this.Processor = null;
 070                    this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.Partitio
 071                    this.PumpStatus = PartitionPumpStatus.OpenFailed;
 072                }
 073            }
 74
 075            if (this.PumpStatus == PartitionPumpStatus.Opening)
 76            {
 077                await this.OnOpenAsync().ConfigureAwait(false);
 78            }
 079        }
 80
 81        protected abstract Task OnOpenAsync();
 82
 083        protected internal PartitionPumpStatus PumpStatus { get; protected set; }
 84
 85        internal bool IsClosing
 86        {
 87            get
 88            {
 089                return (this.PumpStatus == PartitionPumpStatus.Closing || this.PumpStatus == PartitionPumpStatus.Closed)
 90            }
 91        }
 92
 93        public async Task CloseAsync(CloseReason reason)
 94        {
 095            ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reas
 096            this.PumpStatus = PartitionPumpStatus.Closing;
 97
 98            // Release lease as the first thing since closing receiver can take up to operation timeout.
 99            // This helps other available hosts discover lease available sooner.
 0100            if (reason != CloseReason.LeaseLost)
 101            {
 102                // Since this pump is dead, release the lease.
 103                try
 104                {
 0105                    await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
 0106                }
 0107                catch (Exception e)
 108                {
 109                    // Log and ignore any failure since expired lease will be picked by another host.
 0110                    this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.Partitio
 0111                }
 112            }
 113
 114            try
 115            {
 0116                this.cancellationTokenSource.Cancel();
 117
 0118                await this.OnClosingAsync(reason).ConfigureAwait(false);
 119
 0120                if (this.Processor != null)
 121                {
 0122                    using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
 123                    {
 124                        // When we take the lock, any existing ProcessEventsAsync call has finished.
 125                        // Because the client has been closed, there will not be any more
 126                        // calls to onEvents in the future. Therefore we can safely call CloseAsync.
 0127                        ProcessorEventSource.Log.PartitionPumpInvokeProcessorCloseStart(this.Host.HostName, this.Partiti
 0128                        await this.Processor.CloseAsync(this.PartitionContext, reason).ConfigureAwait(false);
 0129                        ProcessorEventSource.Log.PartitionPumpInvokeProcessorCloseStop(this.Host.HostName, this.Partitio
 0130                    }
 131                }
 0132            }
 0133            catch (Exception e)
 134            {
 0135                ProcessorEventSource.Log.PartitionPumpCloseError(this.Host.HostName, this.PartitionContext.PartitionId, 
 136                // If closing the processor has failed, the state of the processor is suspect.
 137                // Report the failure to the general error handler instead.
 0138                this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId,
 0139            }
 140
 0141            this.PumpStatus = PartitionPumpStatus.Closed;
 0142            ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId);
 0143        }
 144
 145        protected abstract Task OnClosingAsync(CloseReason reason);
 146
 147        protected async Task ProcessEventsAsync(IEnumerable<EventData> events)
 148        {
 0149            if (events == null)
 150            {
 0151                events = Enumerable.Empty<EventData>();
 152            }
 153
 154            // Synchronize to serialize calls to the processor.
 155            // The handler is not installed until after OpenAsync returns, so ProcessEventsAsync cannot conflict with Op
 156            // There could be a conflict between ProcessEventsAsync and CloseAsync, however. All calls to CloseAsync are
 157            // protected by synchronizing too.
 0158            using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
 159            {
 0160                ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.HostName,
 0161                    this.PartitionContext.PartitionId, events?.Count() ?? 0);
 162                try
 163                {
 0164                    EventData last = events?.LastOrDefault();
 0165                    if (last != null)
 166                    {
 0167                        ProcessorEventSource.Log.PartitionPumpInfo(
 0168                            this.Host.HostName,
 0169                            this.PartitionContext.PartitionId,
 0170                            "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "
 0171                        this.PartitionContext.SetOffsetAndSequenceNumber(last);
 0172                        if (this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric)
 173                        {
 0174                            this.PartitionContext.RuntimeInformation.Update(last);
 175                        }
 176                    }
 177
 0178                    await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false);
 0179                }
 0180                catch (Exception e)
 181                {
 0182                    ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsError(this.Host.HostName, this.PartitionC
 0183                    await this.ProcessErrorAsync(e).ConfigureAwait(false);
 184                }
 185                finally
 186                {
 0187                    ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStop(this.Host.HostName, this.PartitionCo
 188                }
 0189            }
 0190        }
 191
 192        protected Task ProcessErrorAsync(Exception error)
 193        {
 0194            return this.Processor.ProcessErrorAsync(this.PartitionContext, error);
 195        }
 196    }
 197}