< Summary

Class:Microsoft.Azure.EventHubs.ServiceFabricProcessor.ServiceFabricProcessor
Assembly:Microsoft.Azure.EventHubs.ServiceFabricProcessor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\ServiceFabricProcessor.cs
Covered lines:148
Uncovered lines:67
Coverable lines:215
Total lines:521
Line coverage:68.8% (148 of 215)
Covered branches:53
Total branches:86
Branch coverage:61.6% (53 of 86)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-80%56.25%
get_EventHubClientFactory()-100%100%
get_TestMode()-100%100%
get_MockMode()-100%100%
RunAsync()-83.33%50%
InnerRunAsync()-72.37%66.67%
RetryWrapper(...)-26.32%25%
get_MaxBatchSize()-100%100%
Microsoft-Azure-EventHubs-IPartitionReceiveHandler-ProcessEventsAsync()-73.68%83.33%
Microsoft.Azure.EventHubs.IPartitionReceiveHandler.ProcessErrorAsync(...)-0%0%
SafeProcessError(...)-0%100%
CheckpointStartup()-81.82%75%
GetServicePartitionId()-100%75%
MetricsHandler()-93.33%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\ServiceFabricProcessor.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
 3
 4namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Fabric;
 9    using System.Fabric.Description;
 10    using System.Fabric.Query;
 11    using System.Threading;
 12    using System.Threading.Tasks;
 13    using Microsoft.ServiceFabric.Data;
 14
 15    /// <summary>
 16    /// Base class that implements event processor functionality.
 17    /// </summary>
 18    public class ServiceFabricProcessor : IPartitionReceiveHandler
 19    {
 20        // Service Fabric objects initialized in constructor
 21        private readonly IReliableStateManager serviceStateManager;
 22        private readonly Uri serviceFabricServiceName;
 23        private readonly Guid serviceFabricPartitionId;
 24        private readonly IStatefulServicePartition servicePartition;
 25
 26        // ServiceFabricProcessor settings initialized in constructor
 27        private readonly IEventProcessor userEventProcessor;
 28        private readonly EventProcessorOptions options;
 29        private readonly ICheckpointMananger checkpointManager;
 30
 31        // Initialized during RunAsync startup
 1232        private int fabricPartitionOrdinal = -1;
 1233        private int servicePartitionCount = -1;
 34        private string hubPartitionId;
 35        private PartitionContext partitionContext;
 36        private string initialOffset;
 37        private CancellationTokenSource internalCanceller;
 38        private Exception internalFatalException;
 39        private CancellationToken linkedCancellationToken;
 40        private EventHubsConnectionStringBuilder ehConnectionString;
 41        private string consumerGroupName;
 42
 43        // Value managed by RunAsync
 44        private int running = 0;
 45
 46
 47        /// <summary>
 48        /// Constructor. Arguments break down into three groups: (1) Service Fabric objects so this library can access
 49        /// Service Fabric facilities, (2) Event Hub-related arguments which indicate what event hub to receive from and
 50        /// how to process the events, and (3) advanced, which right now consists only of the ability to replace the def
 51        /// reliable dictionary-based checkpoint manager with a user-provided implementation.
 52        /// </summary>
 53        /// <param name="serviceFabricServiceName">Service Fabric Uri found in StatefulServiceContext</param>
 54        /// <param name="serviceFabricPartitionId">Service Fabric partition id found in StatefulServiceContext</param>
 55        /// <param name="stateManager">Service Fabric-provided state manager, provides access to reliable dictionaries</
 56        /// <param name="partition">Service Fabric-provided partition information</param>
 57        /// <param name="userEventProcessor">User's event processor implementation</param>
 58        /// <param name="eventHubConnectionString">Connection string for user's event hub</param>
 59        /// <param name="eventHubConsumerGroup">Name of event hub consumer group to receive from</param>
 60        /// <param name="options">Optional: Options structure for ServiceFabricProcessor library</param>
 61        /// <param name="checkpointManager">Very advanced/optional: user-provided checkpoint manager implementation</par
 1262        public ServiceFabricProcessor(Uri serviceFabricServiceName, Guid serviceFabricPartitionId, IReliableStateManager
 1263            string eventHubConnectionString, string eventHubConsumerGroup,
 1264            EventProcessorOptions options = null, ICheckpointMananger checkpointManager = null)
 65        {
 1266            if (serviceFabricServiceName == null)
 67            {
 068                throw new ArgumentNullException("serviceFabricServiceName is null");
 69            }
 70            if (serviceFabricPartitionId == null)
 71            {
 72                throw new ArgumentNullException("serviceFabricPartitionId is null");
 73            }
 1274            if (stateManager == null)
 75            {
 076                throw new ArgumentNullException("stateManager is null");
 77            }
 1278            if (partition == null)
 79            {
 080                throw new ArgumentNullException("partition is null");
 81            }
 1282            if (userEventProcessor == null)
 83            {
 084                throw new ArgumentNullException("userEventProcessor is null");
 85            }
 1286            if (string.IsNullOrEmpty(eventHubConnectionString))
 87            {
 088                throw new ArgumentException("eventHubConnectionString is null or empty");
 89            }
 1290            if (string.IsNullOrEmpty(eventHubConsumerGroup))
 91            {
 092                throw new ArgumentException("eventHubConsumerGroup is null or empty");
 93            }
 94
 1295            this.serviceFabricServiceName = serviceFabricServiceName;
 1296            this.serviceFabricPartitionId = serviceFabricPartitionId;
 1297            this.serviceStateManager = stateManager;
 1298            this.servicePartition = partition;
 99
 12100            this.userEventProcessor = userEventProcessor;
 101
 12102            this.ehConnectionString = new EventHubsConnectionStringBuilder(eventHubConnectionString);
 12103            this.consumerGroupName = eventHubConsumerGroup;
 104
 12105            this.options = options ?? new EventProcessorOptions();
 12106            this.checkpointManager = checkpointManager ?? new ReliableDictionaryCheckpointMananger(this.serviceStateMana
 107
 12108            this.EventHubClientFactory = new EventHubWrappers.EventHubClientFactory();
 12109            this.TestMode = false;
 12110            this.MockMode = null;
 12111        }
 112
 113        /// <summary>
 114        /// For testing purposes. Do not change after calling RunAsync.
 115        /// </summary>
 36116        public EventHubWrappers.IEventHubClientFactory EventHubClientFactory { get; set; }
 117
 118        /// <summary>
 119        /// For testing purposes. Do not change after calling RunAsync.
 120        /// </summary>
 24121        public bool TestMode { get; set; }
 122
 123        /// <summary>
 124        /// For testing purposes. Do not change after calling RunAsync.
 125        /// </summary>
 36126        public IFabricPartitionLister MockMode { get; set; }
 127
 128        /// <summary>
 129        /// Starts processing of events.
 130        /// </summary>
 131        /// <param name="fabricCancellationToken">Cancellation token provided by Service Fabric, assumed to indicate ins
 132        /// <returns>Task that completes when event processing shuts down.</returns>
 133        public async Task RunAsync(CancellationToken fabricCancellationToken)
 134        {
 12135            if (Interlocked.Exchange(ref this.running, 1) == 1)
 136            {
 0137                EventProcessorEventSource.Current.Message("Already running");
 0138                throw new InvalidOperationException("EventProcessorService.RunAsync has already been called.");
 139            }
 140
 12141            this.internalCanceller = new CancellationTokenSource();
 12142            this.internalFatalException = null;
 143
 144            try
 145            {
 12146                using (CancellationTokenSource linkedCanceller = CancellationTokenSource.CreateLinkedTokenSource(fabricC
 147                {
 12148                    this.linkedCancellationToken = linkedCanceller.Token;
 149
 12150                    await InnerRunAsync().ConfigureAwait(false);
 151
 8152                    this.options.NotifyOnShutdown(null);
 8153                }
 8154            }
 4155            catch (Exception e)
 156            {
 157                // If InnerRunAsync throws, that is intended to be a fatal exception for this instance.
 158                // Catch it here just long enough to log and notify, then rethrow.
 159
 4160                EventProcessorEventSource.Current.Message("THROWING OUT: {0}", e);
 4161                if (e.InnerException != null)
 162                {
 0163                    EventProcessorEventSource.Current.Message("THROWING OUT INNER: {0}", e.InnerException);
 164                }
 4165                this.options.NotifyOnShutdown(e);
 4166                throw e;
 167            }
 8168        }
 169
 170        private async Task InnerRunAsync()
 171        {
 12172            EventHubWrappers.IEventHubClient ehclient = null;
 12173            EventHubWrappers.IPartitionReceiver receiver = null;
 12174            bool processorOpened = false;
 175
 176            try
 177            {
 178                //
 179                // Get Service Fabric partition information.
 180                //
 12181                await GetServicePartitionId(this.linkedCancellationToken).ConfigureAwait(false);
 182
 183                //
 184                // Create EventHubClient and check partition count.
 185                //
 12186                Exception lastException = null;
 12187                EventProcessorEventSource.Current.Message("Creating event hub client");
 36188                lastException = RetryWrapper(() => { ehclient = this.EventHubClientFactory.Create(this.ehConnectionStrin
 12189                if (ehclient == null)
 190                {
 0191                    EventProcessorEventSource.Current.Message("Out of retries event hub client");
 0192                    throw new Exception("Out of retries creating EventHubClient", lastException);
 193                }
 12194                EventProcessorEventSource.Current.Message("Event hub client OK");
 12195                EventProcessorEventSource.Current.Message("Getting event hub info");
 12196                EventHubRuntimeInformation ehInfo = null;
 197                // Lambda MUST be synchronous to work with RetryWrapper!
 36198                lastException = RetryWrapper(() => { ehInfo = ehclient.GetRuntimeInformationAsync().Result; });
 12199                if (ehInfo == null)
 200                {
 0201                    EventProcessorEventSource.Current.Message("Out of retries getting event hub info");
 0202                    throw new Exception("Out of retries getting event hub runtime info", lastException);
 203                }
 12204                if (this.TestMode)
 205                {
 0206                    if (this.servicePartitionCount > ehInfo.PartitionCount)
 207                    {
 0208                        EventProcessorEventSource.Current.Message("TestMode requires event hub partition count larger th
 0209                        throw new EventProcessorConfigurationException("TestMode requires event hub partition count larg
 210                    }
 0211                    else if (this.servicePartitionCount < ehInfo.PartitionCount)
 212                    {
 0213                        EventProcessorEventSource.Current.Message("TestMode: receiving from subset of event hub");
 214                    }
 215                }
 12216                else if (ehInfo.PartitionCount != this.servicePartitionCount)
 217                {
 4218                    EventProcessorEventSource.Current.Message($"Service partition count {this.servicePartitionCount} doe
 4219                    throw new EventProcessorConfigurationException($"Service partition count {this.servicePartitionCount
 220                }
 8221                this.hubPartitionId = ehInfo.PartitionIds[this.fabricPartitionOrdinal];
 222
 223                //
 224                // Generate a PartitionContext now that the required info is available.
 225                //
 8226                this.partitionContext = new PartitionContext(this.linkedCancellationToken, this.hubPartitionId, this.ehC
 227
 228                //
 229                // Start up checkpoint manager and get checkpoint, if any.
 230                //
 8231                await CheckpointStartup(this.linkedCancellationToken).ConfigureAwait(false);
 232
 233                //
 234                // If there was a checkpoint, the offset is in this.initialOffset, so convert it to an EventPosition.
 235                // If no checkpoint, get starting point from user-supplied provider.
 236                //
 8237                EventPosition initialPosition = null;
 8238                if (this.initialOffset != null)
 239                {
 2240                    EventProcessorEventSource.Current.Message($"Initial position from checkpoint, offset {this.initialOf
 2241                    initialPosition = EventPosition.FromOffset(this.initialOffset);
 242                }
 243                else
 244                {
 6245                    initialPosition = this.options.InitialPositionProvider(this.hubPartitionId);
 6246                    EventProcessorEventSource.Current.Message("Initial position from provider");
 247                }
 248
 249                //
 250                // Create receiver.
 251                //
 8252                EventProcessorEventSource.Current.Message("Creating receiver");
 16253                lastException = RetryWrapper(() => { receiver = ehclient.CreateEpochReceiver(this.consumerGroupName, thi
 24254                    Constants.FixedReceiverEpoch, this.options.ClientReceiverOptions); });
 8255                if (receiver == null)
 256                {
 0257                    EventProcessorEventSource.Current.Message("Out of retries creating receiver");
 0258                    throw new Exception("Out of retries creating event hub receiver", lastException);
 259                }
 8260                receiver.PrefetchCount = this.options.PrefetchCount;
 261
 262                //
 263                // Call Open on user's event processor instance.
 264                // If user's Open code fails, treat that as a fatal exception and let it throw out.
 265                //
 8266                EventProcessorEventSource.Current.Message("Creating event processor");
 8267                await this.userEventProcessor.OpenAsync(this.linkedCancellationToken, this.partitionContext).ConfigureAw
 8268                processorOpened = true;
 8269                EventProcessorEventSource.Current.Message("Event processor created and opened OK");
 270
 271                //
 272                // Start metrics reporting. This runs as a separate background thread.
 273                //
 8274                Thread t = new Thread(this.MetricsHandler);
 8275                t.Start();
 276
 277                //
 278                // Receive pump.
 279                //
 8280                EventProcessorEventSource.Current.Message("RunAsync setting handler and waiting");
 8281                this.MaxBatchSize = this.options.MaxBatchSize;
 8282                receiver.SetReceiveHandler(this, this.options.InvokeProcessorAfterReceiveTimeout);
 8283                this.linkedCancellationToken.WaitHandle.WaitOne();
 284
 8285                EventProcessorEventSource.Current.Message("RunAsync continuing, cleanup");
 8286            }
 287            finally
 288            {
 12289                if (processorOpened)
 290                {
 291                    try
 292                    {
 8293                        await this.userEventProcessor.CloseAsync(this.partitionContext, this.linkedCancellationToken.IsC
 8294                    }
 0295                    catch (Exception e)
 296                    {
 0297                        EventProcessorEventSource.Current.Message($"IEventProcessor.CloseAsync threw {e}, continuing cle
 0298                    }
 299                }
 12300                if (receiver != null)
 301                {
 302                    try
 303                    {
 8304                        receiver.SetReceiveHandler(null);
 8305                        await receiver.CloseAsync().ConfigureAwait(false);
 8306                    }
 0307                    catch (Exception e)
 308                    {
 0309                        EventProcessorEventSource.Current.Message($"Receiver close threw {e}, continuing cleanup");
 0310                    }
 311                }
 12312                if (ehclient != null)
 313                {
 314                    try
 315                    {
 12316                        await ehclient.CloseAsync().ConfigureAwait(false);
 12317                    }
 0318                    catch (Exception e)
 319                    {
 0320                        EventProcessorEventSource.Current.Message($"EventHubClient close threw {e}, continuing cleanup")
 0321                    }
 322                }
 12323                if (this.internalFatalException != null)
 324                {
 0325                    throw this.internalFatalException;
 326                }
 327            }
 8328        }
 329
 330        private EventHubsException RetryWrapper(Action action)
 331        {
 32332            EventHubsException lastException = null;
 333
 0334            for (int i = 0; i < Constants.RetryCount; i++)
 335            {
 32336                this.linkedCancellationToken.ThrowIfCancellationRequested();
 337                try
 338                {
 32339                    action.Invoke();
 32340                    break;
 341                }
 0342                catch (EventHubsException e)
 343                {
 0344                    if (!e.IsTransient)
 345                    {
 0346                        throw e;
 347                    }
 0348                    lastException = e;
 0349                }
 0350                catch (AggregateException ae)
 351                {
 0352                    if (ae.InnerException is EventHubsException)
 353                    {
 0354                        EventHubsException ehe = (EventHubsException)ae.InnerException;
 0355                        if (!ehe.IsTransient)
 356                        {
 0357                            throw ehe;
 358                        }
 0359                        lastException = ehe;
 360                    }
 361                    else
 362                    {
 0363                        throw ae;
 364                    }
 0365                }
 366            }
 367
 32368            return lastException;
 369        }
 370
 371        /// <summary>
 372        /// From IPartitionReceiveHandler
 373        /// </summary>
 16374        public int MaxBatchSize { get; set; }
 375
 376        async Task IPartitionReceiveHandler.ProcessEventsAsync(IEnumerable<EventData> events)
 377        {
 226378            IEnumerable<EventData> effectiveEvents = events ?? new List<EventData>(); // convert to empty list if events
 379
 226380            if (events != null)
 381            {
 382                // Save position of last event if we got a real list of events
 226383                IEnumerator<EventData> scanner = effectiveEvents.GetEnumerator();
 226384                EventData last = null;
 2486385                while (scanner.MoveNext())
 386                {
 2260387                    last = scanner.Current;
 388                }
 226389                if (last != null)
 390                {
 226391                    this.partitionContext.SetOffsetAndSequenceNumber(last);
 226392                    if (this.options.EnableReceiverRuntimeMetric)
 393                    {
 0394                        this.partitionContext.RuntimeInformation.Update(last);
 395                    }
 396                }
 397            }
 398
 399            try
 400            {
 226401                await this.userEventProcessor.ProcessEventsAsync(this.linkedCancellationToken, this.partitionContext, ef
 226402            }
 0403            catch (Exception e)
 404            {
 0405                EventProcessorEventSource.Current.Message($"Processing exception on {this.hubPartitionId}: {e}");
 0406                SafeProcessError(this.partitionContext, e);
 0407            }
 408
 4972409            foreach (EventData ev in effectiveEvents)
 410            {
 2260411                ev.Dispose();
 412            }
 226413        }
 414
 415        Task IPartitionReceiveHandler.ProcessErrorAsync(Exception error)
 416        {
 0417            EventProcessorEventSource.Current.Message($"RECEIVE EXCEPTION on {this.hubPartitionId}: {error}");
 0418            SafeProcessError(this.partitionContext, error);
 0419            if (error is EventHubsException)
 420            {
 0421                if (!(error as EventHubsException).IsTransient)
 422                {
 0423                    this.internalFatalException = error;
 0424                    this.internalCanceller.Cancel();
 425                }
 426                // else don't cancel on transient errors
 427            }
 428            else
 429            {
 430                // All other exceptions are assumed fatal.
 0431                this.internalFatalException = error;
 0432                this.internalCanceller.Cancel();
 433            }
 0434            return Task.CompletedTask;
 435        }
 436
 437        private void SafeProcessError(PartitionContext context, Exception error)
 438        {
 439            try
 440            {
 0441                this.userEventProcessor.ProcessErrorAsync(context, error).Wait();
 0442            }
 0443            catch (Exception e)
 444            {
 445                // The user's error notification method has thrown.
 446                // Recursively notifying could easily cause an infinite loop, until the stack runs out.
 447                // So do not notify, just log.
 0448                EventProcessorEventSource.Current.Message($"Error thrown by ProcessErrorASync: {e}");
 0449            }
 0450        }
 451
 452        private async Task CheckpointStartup(CancellationToken cancellationToken)
 453        {
 454            // Set up store and get checkpoint, if any.
 8455            await this.checkpointManager.CreateCheckpointStoreIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
 8456            Checkpoint checkpoint = await this.checkpointManager.CreateCheckpointIfNotExistsAsync(this.hubPartitionId, c
 8457            if (!checkpoint.Valid)
 458            {
 459                // Not actually any existing checkpoint.
 6460                this.initialOffset = null;
 6461                EventProcessorEventSource.Current.Message("No checkpoint");
 462            }
 2463            else if (checkpoint.Version == 1)
 464            {
 2465                this.initialOffset = checkpoint.Offset;
 2466                EventProcessorEventSource.Current.Message($"Checkpoint provides initial offset {this.initialOffset}");
 467            }
 468            else
 469            {
 470                // It's actually a later-version checkpoint but we don't know the details.
 471                // Access it via the V1 interface and hope it does something sensible.
 0472                this.initialOffset = checkpoint.Offset;
 0473                EventProcessorEventSource.Current.Message($"Unexpected checkpoint version {checkpoint.Version}, provided
 474            }
 8475        }
 476
 477        private async Task GetServicePartitionId(CancellationToken cancellationToken)
 478        {
 12479            if (this.fabricPartitionOrdinal == -1)
 480            {
 12481                IFabricPartitionLister lister = this.MockMode ?? new ServiceFabricPartitionLister();
 482
 12483                this.servicePartitionCount = await lister.GetServiceFabricPartitionCount(this.serviceFabricServiceName).
 484
 12485                this.fabricPartitionOrdinal = await lister.GetServiceFabricPartitionOrdinal(this.serviceFabricPartitionI
 486
 12487                EventProcessorEventSource.Current.Message($"Total partitions {this.servicePartitionCount}");
 12488                EventProcessorEventSource.Current.Message($"Partition ordinal {this.fabricPartitionOrdinal}");
 489                // TODO check that ordinal is not -1
 12490            }
 12491        }
 492
 493        private void MetricsHandler()
 494        {
 8495            EventProcessorEventSource.Current.Message("METRIC reporter starting");
 496
 16497            while (!this.linkedCancellationToken.IsCancellationRequested)
 498            {
 8499                Dictionary<string, int> userMetrics = this.userEventProcessor.GetLoadMetric(this.linkedCancellationToken
 500
 501                try
 502                {
 8503                    List<LoadMetric> reportableMetrics = new List<LoadMetric>();
 32504                    foreach (KeyValuePair<string, int> metric in userMetrics)
 505                    {
 8506                        EventProcessorEventSource.Current.Message($"METRIC {metric.Key} for partition {this.partitionCon
 8507                        reportableMetrics.Add(new LoadMetric(metric.Key, metric.Value));
 508                    }
 8509                    this.servicePartition.ReportLoad(reportableMetrics);
 8510                    Task.Delay(Constants.MetricReportingInterval, this.linkedCancellationToken).Wait(); // throws on can
 0511                }
 8512                catch (Exception e)
 513                {
 8514                    EventProcessorEventSource.Current.Message($"METRIC partition {this.partitionContext.PartitionId} exc
 8515                }
 516            }
 517
 8518            EventProcessorEventSource.Current.Message("METRIC reporter exiting");
 8519        }
 520    }
 521}