|   |  | 1 |  | // Copyright (c) Microsoft. All rights reserved. | 
|   |  | 2 |  | // Licensed under the MIT license. See LICENSE file in the project root for full license information. | 
|   |  | 3 |  |  | 
|   |  | 4 |  | namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor | 
|   |  | 5 |  | { | 
|   |  | 6 |  |     using System; | 
|   |  | 7 |  |     using System.Collections.Generic; | 
|   |  | 8 |  |     using System.Fabric; | 
|   |  | 9 |  |     using System.Fabric.Description; | 
|   |  | 10 |  |     using System.Fabric.Query; | 
|   |  | 11 |  |     using System.Threading; | 
|   |  | 12 |  |     using System.Threading.Tasks; | 
|   |  | 13 |  |     using Microsoft.ServiceFabric.Data; | 
|   |  | 14 |  |  | 
|   |  | 15 |  |     /// <summary> | 
|   |  | 16 |  |     /// Base class that implements event processor functionality. | 
|   |  | 17 |  |     /// </summary> | 
|   |  | 18 |  |     public class ServiceFabricProcessor : IPartitionReceiveHandler | 
|   |  | 19 |  |     { | 
|   |  | 20 |  |         // Service Fabric objects supplied by the caller and captured in the constructor | 
|   |  | 21 |  |         private readonly IReliableStateManager serviceStateManager; | 
|   |  | 22 |  |         private readonly Uri serviceFabricServiceName; | 
|   |  | 23 |  |         private readonly Guid serviceFabricPartitionId; | 
|   |  | 24 |  |         private readonly IStatefulServicePartition servicePartition; | 
|   |  | 25 |  |  | 
|   |  | 26 |  |         // ServiceFabricProcessor settings set in constructor; null options/checkpointManager get defaults | 
|   |  | 27 |  |         private readonly IEventProcessor userEventProcessor; | 
|   |  | 28 |  |         private readonly EventProcessorOptions options; | 
|   |  | 29 |  |         private readonly ICheckpointMananger checkpointManager; | 
|   |  | 30 |  |  | 
|   |  | 31 |  |         // Initialized during RunAsync startup; the two counters stay -1 until GetServicePartitionId sets them | 
|   | 12 | 32 |  |         private int fabricPartitionOrdinal = -1; | 
|   | 12 | 33 |  |         private int servicePartitionCount = -1; | 
|   |  | 34 |  |         private string hubPartitionId; | 
|   |  | 35 |  |         private PartitionContext partitionContext; | 
|   |  | 36 |  |         private string initialOffset; | 
|   |  | 37 |  |         private CancellationTokenSource internalCanceller; | 
|   |  | 38 |  |         private Exception internalFatalException; | 
|   |  | 39 |  |         private CancellationToken linkedCancellationToken; | 
|   |  | 40 |  |         private EventHubsConnectionStringBuilder ehConnectionString; | 
|   |  | 41 |  |         private string consumerGroupName; | 
|   |  | 42 |  |  | 
|   |  | 43 |  |         // Set to 1 by RunAsync (Interlocked.Exchange) so that a second RunAsync call is rejected | 
|   |  | 44 |  |         private int running = 0; | 
|   |  | 45 |  |  | 
|   |  | 46 |  |  | 
|   |  | 47 |  |         /// <summary> | 
|   |  | 48 |  |         /// Constructor. Arguments break down into three groups: (1) Service Fabric objects so this library can access | 
|   |  | 49 |  |         /// Service Fabric facilities, (2) Event Hub-related arguments which indicate what event hub to receive from and | 
|   |  | 50 |  |         /// how to process the events, and (3) advanced, which right now consists only of the ability to replace the def | 
|   |  | 51 |  |         /// reliable dictionary-based checkpoint manager with a user-provided implementation. | 
|   |  | 52 |  |         /// </summary> | 
|   |  | 53 |  |         /// <param name="serviceFabricServiceName">Service Fabric Uri found in StatefulServiceContext</param> | 
|   |  | 54 |  |         /// <param name="serviceFabricPartitionId">Service Fabric partition id found in StatefulServiceContext</param> | 
|   |  | 55 |  |         /// <param name="stateManager">Service Fabric-provided state manager, provides access to reliable dictionaries</ | 
|   |  | 56 |  |         /// <param name="partition">Service Fabric-provided partition information</param> | 
|   |  | 57 |  |         /// <param name="userEventProcessor">User's event processor implementation</param> | 
|   |  | 58 |  |         /// <param name="eventHubConnectionString">Connection string for user's event hub</param> | 
|   |  | 59 |  |         /// <param name="eventHubConsumerGroup">Name of event hub consumer group to receive from</param> | 
|   |  | 60 |  |         /// <param name="options">Optional: Options structure for ServiceFabricProcessor library</param> | 
|   |  | 61 |  |         /// <param name="checkpointManager">Very advanced/optional: user-provided checkpoint manager implementation</par | 
|   | 12 | 62 |  |         public ServiceFabricProcessor(Uri serviceFabricServiceName, Guid serviceFabricPartitionId, IReliableStateManager | 
|   | 12 | 63 |  |             string eventHubConnectionString, string eventHubConsumerGroup, | 
|   | 12 | 64 |  |             EventProcessorOptions options = null, ICheckpointMananger checkpointManager = null) | 
|   |  | 65 |  |         { | 
|   | 12 | 66 |  |             if (serviceFabricServiceName == null) | 
|   |  | 67 |  |             { | 
|   | 0 | 68 |  |                 throw new ArgumentNullException(nameof(serviceFabricServiceName)); | 
|   |  | 69 |  |             } | 
|   |  | 70 |  |             if (serviceFabricPartitionId == null) // NOTE: Guid is a struct, so this check can never be true | 
|   |  | 71 |  |             { | 
|   |  | 72 |  |                 throw new ArgumentNullException(nameof(serviceFabricPartitionId)); | 
|   |  | 73 |  |             } | 
|   | 12 | 74 |  |             if (stateManager == null) | 
|   |  | 75 |  |             { | 
|   | 0 | 76 |  |                 throw new ArgumentNullException(nameof(stateManager)); | 
|   |  | 77 |  |             } | 
|   | 12 | 78 |  |             if (partition == null) | 
|   |  | 79 |  |             { | 
|   | 0 | 80 |  |                 throw new ArgumentNullException(nameof(partition)); | 
|   |  | 81 |  |             } | 
|   | 12 | 82 |  |             if (userEventProcessor == null) | 
|   |  | 83 |  |             { | 
|   | 0 | 84 |  |                 throw new ArgumentNullException(nameof(userEventProcessor)); | 
|   |  | 85 |  |             } | 
|   | 12 | 86 |  |             if (string.IsNullOrEmpty(eventHubConnectionString)) | 
|   |  | 87 |  |             { | 
|   | 0 | 88 |  |                 throw new ArgumentException("eventHubConnectionString is null or empty"); | 
|   |  | 89 |  |             } | 
|   | 12 | 90 |  |             if (string.IsNullOrEmpty(eventHubConsumerGroup)) | 
|   |  | 91 |  |             { | 
|   | 0 | 92 |  |                 throw new ArgumentException("eventHubConsumerGroup is null or empty"); | 
|   |  | 93 |  |             } | 
|   |  | 94 |  |             // Validation passed; capture the Service Fabric objects. | 
|   | 12 | 95 |  |             this.serviceFabricServiceName = serviceFabricServiceName; | 
|   | 12 | 96 |  |             this.serviceFabricPartitionId = serviceFabricPartitionId; | 
|   | 12 | 97 |  |             this.serviceStateManager = stateManager; | 
|   | 12 | 98 |  |             this.servicePartition = partition; | 
|   |  | 99 |  |  | 
|   | 12 | 100 |  |             this.userEventProcessor = userEventProcessor; | 
|   |  | 101 |  |  | 
|   | 12 | 102 |  |             this.ehConnectionString = new EventHubsConnectionStringBuilder(eventHubConnectionString); | 
|   | 12 | 103 |  |             this.consumerGroupName = eventHubConsumerGroup; | 
|   |  | 104 |  |  | 
|   | 12 | 105 |  |             this.options = options ?? new EventProcessorOptions(); | 
|   | 12 | 106 |  |             this.checkpointManager = checkpointManager ?? new ReliableDictionaryCheckpointMananger(this.serviceStateMana | 
|   |  | 107 |  |             // Test hooks start with production defaults; tests may override them before RunAsync. | 
|   | 12 | 108 |  |             this.EventHubClientFactory = new EventHubWrappers.EventHubClientFactory(); | 
|   | 12 | 109 |  |             this.TestMode = false; | 
|   | 12 | 110 |  |             this.MockMode = null; | 
|   | 12 | 111 |  |         } | 
|   |  | 112 |  |  | 
|   |  | 113 |  |         /// <summary> | 
|   |  | 114 |  |         /// Factory RunAsync uses to create the event hub client. For testing; do not change after calling RunAsync. | 
|   |  | 115 |  |         /// </summary> | 
|   | 36 | 116 |  |         public EventHubWrappers.IEventHubClientFactory EventHubClientFactory { get; set; } | 
|   |  | 117 |  |  | 
|   |  | 118 |  |         /// <summary> | 
|   |  | 119 |  |         /// Lets the event hub have more partitions than the service. For testing; do not change after RunAsync. | 
|   |  | 120 |  |         /// </summary> | 
|   | 24 | 121 |  |         public bool TestMode { get; set; } | 
|   |  | 122 |  |  | 
|   |  | 123 |  |         /// <summary> | 
|   |  | 124 |  |         /// When not null, used in place of ServiceFabricPartitionLister. For testing; do not change after RunAsync. | 
|   |  | 125 |  |         /// </summary> | 
|   | 36 | 126 |  |         public IFabricPartitionLister MockMode { get; set; } | 
|   |  | 127 |  |  | 
|   |  | 128 |  |         /// <summary> | 
|   |  | 129 |  |         /// Starts processing of events. Callable once per instance; a second call throws InvalidOperationException. | 
|   |  | 130 |  |         /// </summary> | 
|   |  | 131 |  |         /// <param name="fabricCancellationToken">Cancellation token provided by Service Fabric, assumed to indicate ins | 
|   |  | 132 |  |         /// <returns>Task that completes when event processing shuts down.</returns> | 
|   |  | 133 |  |         public async Task RunAsync(CancellationToken fabricCancellationToken) | 
|   |  | 134 |  |         { | 
|   | 12 | 135 |  |             if (Interlocked.Exchange(ref this.running, 1) == 1) | 
|   |  | 136 |  |             { | 
|   | 0 | 137 |  |                 EventProcessorEventSource.Current.Message("Already running"); | 
|   | 0 | 138 |  |                 throw new InvalidOperationException("EventProcessorService.RunAsync has already been called."); | 
|   |  | 139 |  |             } | 
|   |  | 140 |  |  | 
|   | 12 | 141 |  |             this.internalCanceller = new CancellationTokenSource(); | 
|   | 12 | 142 |  |             this.internalFatalException = null; | 
|   |  | 143 |  |  | 
|   |  | 144 |  |             try | 
|   |  | 145 |  |             { | 
|   | 12 | 146 |  |                 using (CancellationTokenSource linkedCanceller = CancellationTokenSource.CreateLinkedTokenSource(fabricC | 
|   |  | 147 |  |                 { | 
|   | 12 | 148 |  |                     this.linkedCancellationToken = linkedCanceller.Token; | 
|   |  | 149 |  |  | 
|   | 12 | 150 |  |                     await InnerRunAsync().ConfigureAwait(false); | 
|   |  | 151 |  |  | 
|   | 8 | 152 |  |                     this.options.NotifyOnShutdown(null); | 
|   | 8 | 153 |  |                 } | 
|   | 8 | 154 |  |             } | 
|   | 4 | 155 |  |             catch (Exception e) | 
|   |  | 156 |  |             { | 
|   |  | 157 |  |                 // If InnerRunAsync throws, that is intended to be a fatal exception for this instance. | 
|   |  | 158 |  |                 // Catch it here just long enough to log and notify, then rethrow with the original stack trace. | 
|   |  | 159 |  |  | 
|   | 4 | 160 |  |                 EventProcessorEventSource.Current.Message("THROWING OUT: {0}", e); | 
|   | 4 | 161 |  |                 if (e.InnerException != null) | 
|   |  | 162 |  |                 { | 
|   | 0 | 163 |  |                     EventProcessorEventSource.Current.Message("THROWING OUT INNER: {0}", e.InnerException); | 
|   |  | 164 |  |                 } | 
|   | 4 | 165 |  |                 this.options.NotifyOnShutdown(e); | 
|   | 4 | 166 |  |                 throw; | 
|   |  | 167 |  |             } | 
|   | 8 | 168 |  |         } | 
|   |  | 169 |  |  | 
|   |  | 170 |  |         private async Task InnerRunAsync() | 
|   |  | 171 |  |         { | 
|   | 12 | 172 |  |             EventHubWrappers.IEventHubClient ehclient = null; | 
|   | 12 | 173 |  |             EventHubWrappers.IPartitionReceiver receiver = null; | 
|   | 12 | 174 |  |             bool processorOpened = false; | 
|   |  | 175 |  |  | 
|   |  | 176 |  |             try | 
|   |  | 177 |  |             { | 
|   |  | 178 |  |                 // | 
|   |  | 179 |  |                 // Get the Service Fabric partition count and this partition's ordinal. | 
|   |  | 180 |  |                 // | 
|   | 12 | 181 |  |                 await GetServicePartitionId(this.linkedCancellationToken).ConfigureAwait(false); | 
|   |  | 182 |  |  | 
|   |  | 183 |  |                 // | 
|   |  | 184 |  |                 // Create EventHubClient and check partition count. | 
|   |  | 185 |  |                 // | 
|   | 12 | 186 |  |                 Exception lastException = null; | 
|   | 12 | 187 |  |                 EventProcessorEventSource.Current.Message("Creating event hub client"); | 
|   | 36 | 188 |  |                 lastException = RetryWrapper(() => { ehclient = this.EventHubClientFactory.Create(this.ehConnectionStrin | 
|   | 12 | 189 |  |                 if (ehclient == null) | 
|   |  | 190 |  |                 { | 
|   | 0 | 191 |  |                     EventProcessorEventSource.Current.Message("Out of retries event hub client"); | 
|   | 0 | 192 |  |                     throw new Exception("Out of retries creating EventHubClient", lastException); | 
|   |  | 193 |  |                 } | 
|   | 12 | 194 |  |                 EventProcessorEventSource.Current.Message("Event hub client OK"); | 
|   | 12 | 195 |  |                 EventProcessorEventSource.Current.Message("Getting event hub info"); | 
|   | 12 | 196 |  |                 EventHubRuntimeInformation ehInfo = null; | 
|   |  | 197 |  |                 // Lambda MUST be synchronous: RetryWrapper takes an Action and invokes it without awaiting. | 
|   | 36 | 198 |  |                 lastException = RetryWrapper(() => { ehInfo = ehclient.GetRuntimeInformationAsync().Result; }); | 
|   | 12 | 199 |  |                 if (ehInfo == null) | 
|   |  | 200 |  |                 { | 
|   | 0 | 201 |  |                     EventProcessorEventSource.Current.Message("Out of retries getting event hub info"); | 
|   | 0 | 202 |  |                     throw new Exception("Out of retries getting event hub runtime info", lastException); | 
|   |  | 203 |  |                 } | 
|   | 12 | 204 |  |                 if (this.TestMode) | 
|   |  | 205 |  |                 { | 
|   | 0 | 206 |  |                     if (this.servicePartitionCount > ehInfo.PartitionCount) | 
|   |  | 207 |  |                     { | 
|   | 0 | 208 |  |                         EventProcessorEventSource.Current.Message("TestMode requires event hub partition count larger th | 
|   | 0 | 209 |  |                         throw new EventProcessorConfigurationException("TestMode requires event hub partition count larg | 
|   |  | 210 |  |                     } | 
|   | 0 | 211 |  |                     else if (this.servicePartitionCount < ehInfo.PartitionCount) | 
|   |  | 212 |  |                     { | 
|   | 0 | 213 |  |                         EventProcessorEventSource.Current.Message("TestMode: receiving from subset of event hub"); | 
|   |  | 214 |  |                     } | 
|   |  | 215 |  |                 } | 
|   | 12 | 216 |  |                 else if (ehInfo.PartitionCount != this.servicePartitionCount) | 
|   |  | 217 |  |                 { | 
|   | 4 | 218 |  |                     EventProcessorEventSource.Current.Message($"Service partition count {this.servicePartitionCount} doe | 
|   | 4 | 219 |  |                     throw new EventProcessorConfigurationException($"Service partition count {this.servicePartitionCount | 
|   |  | 220 |  |                 } | 
|   | 8 | 221 |  |                 this.hubPartitionId = ehInfo.PartitionIds[this.fabricPartitionOrdinal]; | 
|   |  | 222 |  |  | 
|   |  | 223 |  |                 // | 
|   |  | 224 |  |                 // Generate a PartitionContext now that the required info is available. | 
|   |  | 225 |  |                 // | 
|   | 8 | 226 |  |                 this.partitionContext = new PartitionContext(this.linkedCancellationToken, this.hubPartitionId, this.ehC | 
|   |  | 227 |  |  | 
|   |  | 228 |  |                 // | 
|   |  | 229 |  |                 // Start up checkpoint manager and get checkpoint, if any. | 
|   |  | 230 |  |                 // | 
|   | 8 | 231 |  |                 await CheckpointStartup(this.linkedCancellationToken).ConfigureAwait(false); | 
|   |  | 232 |  |  | 
|   |  | 233 |  |                 // | 
|   |  | 234 |  |                 // If there was a checkpoint, the offset is in this.initialOffset, so convert it to an EventPosition. | 
|   |  | 235 |  |                 // If no checkpoint, get starting point from user-supplied provider. | 
|   |  | 236 |  |                 // | 
|   | 8 | 237 |  |                 EventPosition initialPosition = null; | 
|   | 8 | 238 |  |                 if (this.initialOffset != null) | 
|   |  | 239 |  |                 { | 
|   | 2 | 240 |  |                     EventProcessorEventSource.Current.Message($"Initial position from checkpoint, offset {this.initialOf | 
|   | 2 | 241 |  |                     initialPosition = EventPosition.FromOffset(this.initialOffset); | 
|   |  | 242 |  |                 } | 
|   |  | 243 |  |                 else | 
|   |  | 244 |  |                 { | 
|   | 6 | 245 |  |                     initialPosition = this.options.InitialPositionProvider(this.hubPartitionId); | 
|   | 6 | 246 |  |                     EventProcessorEventSource.Current.Message("Initial position from provider"); | 
|   |  | 247 |  |                 } | 
|   |  | 248 |  |  | 
|   |  | 249 |  |                 // | 
|   |  | 250 |  |                 // Create receiver. | 
|   |  | 251 |  |                 // | 
|   | 8 | 252 |  |                 EventProcessorEventSource.Current.Message("Creating receiver"); | 
|   | 16 | 253 |  |                 lastException = RetryWrapper(() => { receiver = ehclient.CreateEpochReceiver(this.consumerGroupName, thi | 
|   | 24 | 254 |  |                     Constants.FixedReceiverEpoch, this.options.ClientReceiverOptions); }); | 
|   | 8 | 255 |  |                 if (receiver == null) | 
|   |  | 256 |  |                 { | 
|   | 0 | 257 |  |                     EventProcessorEventSource.Current.Message("Out of retries creating receiver"); | 
|   | 0 | 258 |  |                     throw new Exception("Out of retries creating event hub receiver", lastException); | 
|   |  | 259 |  |                 } | 
|   | 8 | 260 |  |                 receiver.PrefetchCount = this.options.PrefetchCount; | 
|   |  | 261 |  |  | 
|   |  | 262 |  |                 // | 
|   |  | 263 |  |                 // Call Open on user's event processor instance. | 
|   |  | 264 |  |                 // If user's Open code fails, treat that as a fatal exception and let it throw out. | 
|   |  | 265 |  |                 // | 
|   | 8 | 266 |  |                 EventProcessorEventSource.Current.Message("Creating event processor"); | 
|   | 8 | 267 |  |                 await this.userEventProcessor.OpenAsync(this.linkedCancellationToken, this.partitionContext).ConfigureAw | 
|   | 8 | 268 |  |                 processorOpened = true; | 
|   | 8 | 269 |  |                 EventProcessorEventSource.Current.Message("Event processor created and opened OK"); | 
|   |  | 270 |  |  | 
|   |  | 271 |  |                 // | 
|   |  | 272 |  |                 // Start metrics reporting. This runs as a separate background thread. | 
|   |  | 273 |  |                 // | 
|   | 8 | 274 |  |                 Thread t = new Thread(this.MetricsHandler); | 
|   | 8 | 275 |  |                 t.Start(); | 
|   |  | 276 |  |  | 
|   |  | 277 |  |                 // | 
|   |  | 278 |  |                 // Receive pump. Events arrive through the handler set below; WaitOne then blocks until cancellation. | 
|   |  | 279 |  |                 // | 
|   | 8 | 280 |  |                 EventProcessorEventSource.Current.Message("RunAsync setting handler and waiting"); | 
|   | 8 | 281 |  |                 this.MaxBatchSize = this.options.MaxBatchSize; | 
|   | 8 | 282 |  |                 receiver.SetReceiveHandler(this, this.options.InvokeProcessorAfterReceiveTimeout); | 
|   | 8 | 283 |  |                 this.linkedCancellationToken.WaitHandle.WaitOne(); | 
|   |  | 284 |  |  | 
|   | 8 | 285 |  |                 EventProcessorEventSource.Current.Message("RunAsync continuing, cleanup"); | 
|   | 8 | 286 |  |             } | 
|   |  | 287 |  |             finally | 
|   |  | 288 |  |             { | 
|   | 12 | 289 |  |                 if (processorOpened) | 
|   |  | 290 |  |                 { | 
|   |  | 291 |  |                     try | 
|   |  | 292 |  |                     { | 
|   | 8 | 293 |  |                         await this.userEventProcessor.CloseAsync(this.partitionContext, this.linkedCancellationToken.IsC | 
|   | 8 | 294 |  |                     } | 
|   | 0 | 295 |  |                     catch (Exception e) | 
|   |  | 296 |  |                     { | 
|   | 0 | 297 |  |                         EventProcessorEventSource.Current.Message($"IEventProcessor.CloseAsync threw {e}, continuing cle | 
|   | 0 | 298 |  |                     } | 
|   |  | 299 |  |                 } | 
|   | 12 | 300 |  |                 if (receiver != null) | 
|   |  | 301 |  |                 { | 
|   |  | 302 |  |                     try | 
|   |  | 303 |  |                     { | 
|   | 8 | 304 |  |                         receiver.SetReceiveHandler(null); | 
|   | 8 | 305 |  |                         await receiver.CloseAsync().ConfigureAwait(false); | 
|   | 8 | 306 |  |                     } | 
|   | 0 | 307 |  |                     catch (Exception e) | 
|   |  | 308 |  |                     { | 
|   | 0 | 309 |  |                         EventProcessorEventSource.Current.Message($"Receiver close threw {e}, continuing cleanup"); | 
|   | 0 | 310 |  |                     } | 
|   |  | 311 |  |                 } | 
|   | 12 | 312 |  |                 if (ehclient != null) | 
|   |  | 313 |  |                 { | 
|   |  | 314 |  |                     try | 
|   |  | 315 |  |                     { | 
|   | 12 | 316 |  |                         await ehclient.CloseAsync().ConfigureAwait(false); | 
|   | 12 | 317 |  |                     } | 
|   | 0 | 318 |  |                     catch (Exception e) | 
|   |  | 319 |  |                     { | 
|   | 0 | 320 |  |                         EventProcessorEventSource.Current.Message($"EventHubClient close threw {e}, continuing cleanup") | 
|   | 0 | 321 |  |                     } | 
|   |  | 322 |  |                 } | 
|   | 12 | 323 |  |                 if (this.internalFatalException != null) | 
|   |  | 324 |  |                 { | 
|   | 0 | 325 |  |                     throw this.internalFatalException; | 
|   |  | 326 |  |                 } | 
|   |  | 327 |  |             } | 
|   | 8 | 328 |  |         } | 
|   |  | 329 |  |  | 
|   |  | 330 |  |         private EventHubsException RetryWrapper(Action action) | 
|   |  | 331 |  |         { | 
|   | 32 | 332 |  |             EventHubsException lastException = null; | 
|   |  | 333 |  |             // Up to Constants.RetryCount attempts; only transient EventHubsExceptions are retried. | 
|   | 0 | 334 |  |             for (int i = 0; i < Constants.RetryCount; i++) | 
|   |  | 335 |  |             { | 
|   | 32 | 336 |  |                 this.linkedCancellationToken.ThrowIfCancellationRequested(); | 
|   |  | 337 |  |                 try | 
|   |  | 338 |  |                 { | 
|   | 32 | 339 |  |                     action.Invoke(); | 
|   | 32 | 340 |  |                     break; | 
|   |  | 341 |  |                 } | 
|   | 0 | 342 |  |                 catch (EventHubsException e) | 
|   |  | 343 |  |                 { | 
|   | 0 | 344 |  |                     if (!e.IsTransient) | 
|   |  | 345 |  |                     { | 
|   | 0 | 346 |  |                         throw; | 
|   |  | 347 |  |                     } | 
|   | 0 | 348 |  |                     lastException = e; | 
|   | 0 | 349 |  |                 } | 
|   | 0 | 350 |  |                 catch (AggregateException ae) | 
|   |  | 351 |  |                 { | 
|   | 0 | 352 |  |                     if (ae.InnerException is EventHubsException) | 
|   |  | 353 |  |                     { | 
|   | 0 | 354 |  |                         EventHubsException ehe = (EventHubsException)ae.InnerException; | 
|   | 0 | 355 |  |                         if (!ehe.IsTransient) | 
|   |  | 356 |  |                         { | 
|   | 0 | 357 |  |                             System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ehe).Throw(); | 
|   |  | 358 |  |                         } | 
|   | 0 | 359 |  |                         lastException = ehe; | 
|   |  | 360 |  |                     } | 
|   |  | 361 |  |                     else | 
|   |  | 362 |  |                     { | 
|   | 0 | 363 |  |                         throw; | 
|   |  | 364 |  |                     } | 
|   | 0 | 365 |  |                 } | 
|   |  | 366 |  |             } | 
|   |  | 367 |  |             // Most recent transient failure, or null if none occurred; callers check their own result for success. | 
|   | 32 | 368 |  |             return lastException; | 
|   |  | 369 |  |         } | 
|   |  | 370 |  |  | 
|   |  | 371 |  |         /// <summary> | 
|   |  | 372 |  |         /// From IPartitionReceiveHandler. Set from options.MaxBatchSize before the receive handler is installed. | 
|   |  | 373 |  |         /// </summary> | 
|   | 16 | 374 |  |         public int MaxBatchSize { get; set; } | 
|   |  | 375 |  |  | 
|   |  | 376 |  |         async Task IPartitionReceiveHandler.ProcessEventsAsync(IEnumerable<EventData> events) | 
|   |  | 377 |  |         { | 
|   | 226 | 378 |  |             IEnumerable<EventData> effectiveEvents = events ?? new List<EventData>(); // convert to empty list if events | 
|   |  | 379 |  |  | 
|   | 226 | 380 |  |             if (events != null) | 
|   |  | 381 |  |             { | 
|   |  | 382 |  |                 // Record the position of the last event in a non-null batch. NOTE(review): scanner is never disposed. | 
|   | 226 | 383 |  |                 IEnumerator<EventData> scanner = effectiveEvents.GetEnumerator(); | 
|   | 226 | 384 |  |                 EventData last = null; | 
|   | 2486 | 385 |  |                 while (scanner.MoveNext()) | 
|   |  | 386 |  |                 { | 
|   | 2260 | 387 |  |                     last = scanner.Current; | 
|   |  | 388 |  |                 } | 
|   | 226 | 389 |  |                 if (last != null) | 
|   |  | 390 |  |                 { | 
|   | 226 | 391 |  |                     this.partitionContext.SetOffsetAndSequenceNumber(last); | 
|   | 226 | 392 |  |                     if (this.options.EnableReceiverRuntimeMetric) | 
|   |  | 393 |  |                     { | 
|   | 0 | 394 |  |                         this.partitionContext.RuntimeInformation.Update(last); | 
|   |  | 395 |  |                     } | 
|   |  | 396 |  |                 } | 
|   |  | 397 |  |             } | 
|   |  | 398 |  |  | 
|   |  | 399 |  |             try | 
|   |  | 400 |  |             { | 
|   | 226 | 401 |  |                 await this.userEventProcessor.ProcessEventsAsync(this.linkedCancellationToken, this.partitionContext, ef | 
|   | 226 | 402 |  |             } | 
|   | 0 | 403 |  |             catch (Exception e) | 
|   |  | 404 |  |             { | 
|   | 0 | 405 |  |                 EventProcessorEventSource.Current.Message($"Processing exception on {this.hubPartitionId}: {e}"); | 
|   | 0 | 406 |  |                 SafeProcessError(this.partitionContext, e); | 
|   | 0 | 407 |  |             } | 
|   |  | 408 |  |  | 
|   | 4972 | 409 |  |             foreach (EventData ev in effectiveEvents) | 
|   |  | 410 |  |             { | 
|   | 2260 | 411 |  |                 ev.Dispose(); | 
|   |  | 412 |  |             } | 
|   | 226 | 413 |  |         } | 
|   |  | 414 |  |  | 
|   |  | 415 |  |         Task IPartitionReceiveHandler.ProcessErrorAsync(Exception error) | 
|   |  | 416 |  |         { | 
|   | 0 | 417 |  |             EventProcessorEventSource.Current.Message($"RECEIVE EXCEPTION on {this.hubPartitionId}: {error}"); | 
|   | 0 | 418 |  |             SafeProcessError(this.partitionContext, error); | 
|   | 0 | 419 |  |             if (error is EventHubsException) | 
|   |  | 420 |  |             { | 
|   | 0 | 421 |  |                 if (!(error as EventHubsException).IsTransient) | 
|   |  | 422 |  |                 { | 
|   | 0 | 423 |  |                     this.internalFatalException = error; | 
|   | 0 | 424 |  |                     this.internalCanceller.Cancel(); | 
|   |  | 425 |  |                 } | 
|   |  | 426 |  |                 // else don't cancel on transient errors; they were only reported to the user's processor above | 
|   |  | 427 |  |             } | 
|   |  | 428 |  |             else | 
|   |  | 429 |  |             { | 
|   |  | 430 |  |                 // All other exceptions are assumed fatal: record it (InnerRunAsync cleanup rethrows it) and cancel. | 
|   | 0 | 431 |  |                 this.internalFatalException = error; | 
|   | 0 | 432 |  |                 this.internalCanceller.Cancel(); | 
|   |  | 433 |  |             } | 
|   | 0 | 434 |  |             return Task.CompletedTask; | 
|   |  | 435 |  |         } | 
|   |  | 436 |  |  | 
|   |  | 437 |  |         private void SafeProcessError(PartitionContext context, Exception error) | 
|   |  | 438 |  |         { | 
|   |  | 439 |  |             try | 
|   |  | 440 |  |             { | 
|   | 0 | 441 |  |                 this.userEventProcessor.ProcessErrorAsync(context, error).Wait(); | 
|   | 0 | 442 |  |             } | 
|   | 0 | 443 |  |             catch (Exception e) | 
|   |  | 444 |  |             { | 
|   |  | 445 |  |                 // The user's error notification method has thrown, or the task it returned faulted during Wait(). | 
|   |  | 446 |  |                 // Recursively notifying could easily cause an infinite loop, until the stack runs out. | 
|   |  | 447 |  |                 // So do not notify, just log. | 
|   | 0 | 448 |  |                 EventProcessorEventSource.Current.Message($"Error thrown by ProcessErrorASync: {e}"); | 
|   | 0 | 449 |  |             } | 
|   | 0 | 450 |  |         } | 
|   |  | 451 |  |  | 
|   |  | 452 |  |         private async Task CheckpointStartup(CancellationToken cancellationToken) | 
|   |  | 453 |  |         { | 
|   |  | 454 |  |             // Set up store and get checkpoint, if any. Result lands in this.initialOffset (null when there is none). | 
|   | 8 | 455 |  |             await this.checkpointManager.CreateCheckpointStoreIfNotExistsAsync(cancellationToken).ConfigureAwait(false); | 
|   | 8 | 456 |  |             Checkpoint checkpoint = await this.checkpointManager.CreateCheckpointIfNotExistsAsync(this.hubPartitionId, c | 
|   | 8 | 457 |  |             if (!checkpoint.Valid) | 
|   |  | 458 |  |             { | 
|   |  | 459 |  |                 // Not actually any existing checkpoint. | 
|   | 6 | 460 |  |                 this.initialOffset = null; | 
|   | 6 | 461 |  |                 EventProcessorEventSource.Current.Message("No checkpoint"); | 
|   |  | 462 |  |             } | 
|   | 2 | 463 |  |             else if (checkpoint.Version == 1) | 
|   |  | 464 |  |             { | 
|   | 2 | 465 |  |                 this.initialOffset = checkpoint.Offset; | 
|   | 2 | 466 |  |                 EventProcessorEventSource.Current.Message($"Checkpoint provides initial offset {this.initialOffset}"); | 
|   |  | 467 |  |             } | 
|   |  | 468 |  |             else | 
|   |  | 469 |  |             { | 
|   |  | 470 |  |                 // It's actually a later-version checkpoint but we don't know the details. | 
|   |  | 471 |  |                 // Access it via the V1 interface and hope it does something sensible. | 
|   | 0 | 472 |  |                 this.initialOffset = checkpoint.Offset; | 
|   | 0 | 473 |  |                 EventProcessorEventSource.Current.Message($"Unexpected checkpoint version {checkpoint.Version}, provided | 
|   |  | 474 |  |             } | 
|   | 8 | 475 |  |         } | 
|   |  | 476 |  |  | 
|   |  | 477 |  |         private async Task GetServicePartitionId(CancellationToken cancellationToken) | 
|   |  | 478 |  |         { | 
|   | 12 | 479 |  |             if (this.fabricPartitionOrdinal == -1) | 
|   |  | 480 |  |             { | 
|   | 12 | 481 |  |                 IFabricPartitionLister lister = this.MockMode ?? new ServiceFabricPartitionLister(); | 
|   |  | 482 |  |  | 
|   | 12 | 483 |  |                 this.servicePartitionCount = await lister.GetServiceFabricPartitionCount(this.serviceFabricServiceName). | 
|   |  | 484 |  |  | 
|   | 12 | 485 |  |                 this.fabricPartitionOrdinal = await lister.GetServiceFabricPartitionOrdinal(this.serviceFabricPartitionI | 
|   |  | 486 |  |  | 
|   | 12 | 487 |  |                 EventProcessorEventSource.Current.Message($"Total partitions {this.servicePartitionCount}"); | 
|   | 12 | 488 |  |                 EventProcessorEventSource.Current.Message($"Partition ordinal {this.fabricPartitionOrdinal}"); | 
|   |  | 489 |  |                 // TODO check that ordinal is not -1 (InnerRunAsync uses it to index ehInfo.PartitionIds) | 
|   | 12 | 490 |  |             } | 
|   | 12 | 491 |  |         } | 
|   |  | 492 |  |  | 
|   |  | 493 |  |         private void MetricsHandler() | 
|   |  | 494 |  |         { | 
|   | 8 | 495 |  |             EventProcessorEventSource.Current.Message("METRIC reporter starting"); | 
|   |  | 496 |  |  | 
|   | 16 | 497 |  |             while (!this.linkedCancellationToken.IsCancellationRequested) | 
|   |  | 498 |  |             { | 
|   | 8 | 499 |  |                 Dictionary<string, int> userMetrics = this.userEventProcessor.GetLoadMetric(this.linkedCancellationToken | 
|   |  | 500 |  |  | 
|   |  | 501 |  |                 try | 
|   |  | 502 |  |                 { | 
|   | 8 | 503 |  |                     List<LoadMetric> reportableMetrics = new List<LoadMetric>(); | 
|   | 32 | 504 |  |                     foreach (KeyValuePair<string, int> metric in userMetrics) | 
|   |  | 505 |  |                     { | 
|   | 8 | 506 |  |                         EventProcessorEventSource.Current.Message($"METRIC {metric.Key} for partition {this.partitionCon | 
|   | 8 | 507 |  |                         reportableMetrics.Add(new LoadMetric(metric.Key, metric.Value)); | 
|   |  | 508 |  |                     } | 
|   | 8 | 509 |  |                     this.servicePartition.ReportLoad(reportableMetrics); | 
|   | 8 | 510 |  |                     Task.Delay(Constants.MetricReportingInterval, this.linkedCancellationToken).Wait(); // throws on cancel; caught below | 
|   | 0 | 511 |  |                 } | 
|   | 8 | 512 |  |                 catch (Exception e) | 
|   |  | 513 |  |                 { | 
|   | 8 | 514 |  |                     EventProcessorEventSource.Current.Message($"METRIC partition {this.partitionContext.PartitionId} exc | 
|   | 8 | 515 |  |                 } | 
|   |  | 516 |  |             } | 
|   |  | 517 |  |  | 
|   | 8 | 518 |  |             EventProcessorEventSource.Current.Message("METRIC reporter exiting"); | 
|   | 8 | 519 |  |         } | 
|   |  | 520 |  |     } | 
|   |  | 521 |  | } |