< Summary

Class:Azure.Messaging.EventHubs.Processor.Samples.Sample07_RestartProcessingOnError
Assembly:Azure.Messaging.EventHubs.Processor.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample07_RestartProcessingOnError.cs
Covered lines:0
Uncovered lines:77
Coverable lines:77
Total lines:232
Line coverage:0% (0 of 77)
Covered branches:0
Total branches:33
Branch coverage:0% (0 of 33)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%
<RunAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample07_RestartProcessingOnError.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Messaging.EventHubs.Consumer;
 10using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 11using Azure.Messaging.EventHubs.Producer;
 12using Azure.Storage.Blobs;
 13
 14namespace Azure.Messaging.EventHubs.Processor.Samples
 15{
 16    /// <summary>
 17    ///   An example of stopping and restarting the Event Processor client when a specific error is encountered.
 18    /// </summary>
 19    ///
 20    public class Sample07_RestartProcessingOnError : IEventHubsBlobCheckpointSample
 21    {
 22        /// <summary>
 23        ///   The name of the sample.
 24        /// </summary>
 25        ///
 026        public string Name => nameof(Sample07_RestartProcessingOnError);
 27
 28        /// <summary>
 29        ///   A short description of the sample.
 30        /// </summary>
 31        ///
 032        public string Description => "An example of stopping and restarting the Event Processor client when a specific e
 33
 34        /// <summary>
 35        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 36        /// </summary>
 37        ///
 38        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 39        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 40        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 41        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 42        ///
 43        public async Task RunAsync(string eventHubsConnectionString,
 44                                   string eventHubName,
 45                                   string blobStorageConnectionString,
 46                                   string blobContainerName)
 47        {
 48            // To begin, we'll publish a batch of events for our processor to receive. Because we are not specifying any
 49            // the Event Hubs service will automatically route these to partitions.  We'll split the events into a coupl
 50            // increase the chance they'll be spread around.
 51
 052            var expectedEvents = new List<EventData>()
 053            {
 054               new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 055               new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 056               new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 057
 058               new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 059               new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 060               new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 061
 062               new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 063               new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 064               new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch"))
 065            };
 66
 067            int sentIndex = 0;
 068            int numberOfBatches = 3;
 069            int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 70
 071            await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 72            {
 073                while (sentIndex < expectedEvents.Count)
 74                {
 075                    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 76
 077                    for (int index = 0; index < eventsPerBatch; ++index)
 78                    {
 079                        eventBatch.TryAdd(expectedEvents[sentIndex]);
 080                        ++sentIndex;
 81                    }
 82
 083                    await producer.SendAsync(eventBatch);
 084                }
 85            }
 86
 87            // With our events having been published, we'll create an Event Hub Processor to read them.
 88
 089            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 090            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 091            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 92
 93            // Create a simple handler for event processing.
 94
 095            int eventIndex = 0;
 96
 97            Task processEventHandler(ProcessEventArgs eventArgs)
 98            {
 099                if (eventArgs.CancellationToken.IsCancellationRequested)
 100                {
 0101                    return Task.CompletedTask;
 102                }
 103
 104                try
 105                {
 0106                    ++eventIndex;
 0107                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 0108                }
 0109                catch (Exception ex)
 110                {
 0111                    Console.WriteLine();
 0112                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0113                    Console.WriteLine();
 0114                }
 115
 116                // Because our example handler is running synchronously, we'll manually return a completed
 117                // task.
 118
 0119                return Task.CompletedTask;
 120            };
 121
 122            // The error handler is invoked when there is an exception observed within the Event Processor client; it is
 123            // exceptions in your handler code.  The Event Processor client will make every effort to recover from excep
 124            // processing.  Should an exception that cannot be recovered from is encountered, the processor will forfeit
 125            // that it was processing so that work may redistributed.
 126            //
 127            // For this example, arbitrarily choose to restart processing when an Event Hubs service exception is encoun
 128            // log other exceptions to the console.
 129            //
 130            // It is important to note that this selection is for demonstration purposes only; it does not constitute th
 131            // of action for service errors.  Because the right approach for handling errors can vary greatly between di
 132            // application, you will need to determine the error handling strategy that best suits your scenario.
 133
 134            async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 135            {
 136                // As with the process event handler, the event arguments contain a cancellation token used by the Event
 137                // that the operation should be canceled.  The handler should respect cancellation as it is able in orde
 138                // Processor client is able to perform its operations efficiently.
 139                //
 140                // The process error handler is not awaited by the Event Processor client and is, instead, executed in a
 141                // means that you may safely interact with the Event Processor client, such as requesting that it stop p
 142
 0143                if (eventArgs.CancellationToken.IsCancellationRequested)
 144                {
 0145                    return;
 146                }
 147
 148                // Because there is no long-running or I/O operation, inspecting the cancellation token again does not m
 149                // However, in real-world processing, it is recommended that you do so as you are able without compromis
 150                // and troubleshooting information.
 151                //
 152                // It is also recommended that the cancellation token be passed to any asynchronous operations that are 
 153
 0154                Console.WriteLine();
 0155                Console.WriteLine("===============================");
 0156                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0157                Console.WriteLine("===============================");
 0158                Console.WriteLine();
 159
 160                // We will not pass the cancellation token from the event arguments here, as it will be
 161                // signaled when we request that the processor stop.
 162
 0163                if (eventArgs.Exception is EventHubsException)
 164                {
 0165                    Console.WriteLine("Detected an service error.  Restarting the processor...");
 166
 0167                    await processor.StopProcessingAsync();
 0168                    await processor.StartProcessingAsync();
 169
 0170                    Console.WriteLine("Processor has been restarted....");
 0171                    Console.WriteLine();
 172                }
 0173            }
 174
 0175            processor.ProcessEventAsync += processEventHandler;
 0176            processor.ProcessErrorAsync += processErrorHandler;
 177
 178            try
 179            {
 0180                Console.WriteLine("Starting the Event Processor client...");
 181
 0182                eventIndex = 0;
 0183                await processor.StartProcessingAsync();
 184
 0185                using var cancellationSource = new CancellationTokenSource();
 0186                cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
 187
 188                // Unfortunately, because the handler is invoked when exceptions are encountered in the Event Processor 
 189                // developers write in the event handlers, there is no reliable way to force an exception.  As a result,
 190                // be able to observe the error handler in action by just running the Event Processor.
 191                //
 192                // Instead, we will wait a short bit to allow processing to take place and then artificially trigger the
 193                // purposes.
 194
 0195                await Task.Delay(TimeSpan.FromMilliseconds(250));
 196
 0197                Console.WriteLine("Triggering the error handler...");
 198
 0199                ProcessErrorEventArgs eventArgs = new ProcessErrorEventArgs("fake", "artificial invoke", new EventHubsEx
 0200                await processErrorHandler(eventArgs);
 201
 0202                while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count))
 203                {
 0204                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 205                }
 206
 207                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 208                // both cases, we'll want to shut down the processor.
 209
 0210                Console.WriteLine();
 0211                Console.WriteLine("Stopping the processor...");
 212
 0213                await processor.StopProcessingAsync();
 0214            }
 215            finally
 216            {
 217                // It is encouraged that you unregister your handlers when you have finished
 218                // using the Event Processor to ensure proper cleanup.  This is especially
 219                // important when using lambda expressions or handlers in any form that may
 220                // contain closure scopes or hold other references.
 221
 0222                processor.ProcessEventAsync -= processEventHandler;
 0223                processor.ProcessErrorAsync -= processErrorHandler;
 224            }
 225
 226            // The Event Processor client has been stopped and is not explicitly disposable; there
 227            // is nothing further that we need to do for cleanup.
 228
 0229            Console.WriteLine();
 0230        }
 231    }
 232}