< Summary

Class:Azure.Messaging.EventHubs.Processor.Samples.Sample03_BasicEventProcessing
Assembly:Azure.Messaging.EventHubs.Processor.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample03_BasicEventProcessing.cs
Covered lines:0
Uncovered lines:64
Coverable lines:64
Total lines:243
Line coverage:0% (0 of 64)
Covered branches:0
Total branches:26
Branch coverage:0% (0 of 26)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample03_BasicEventProcessing.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Messaging.EventHubs.Consumer;
 10using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 11using Azure.Messaging.EventHubs.Producer;
 12using Azure.Storage.Blobs;
 13
 14namespace Azure.Messaging.EventHubs.Processor.Samples
 15{
 16    /// <summary>
 17    ///   An introduction to the Event Processor client, illustrating how to perform basic event processing.
 18    /// </summary>
 19    ///
 20    public class Sample03_BasicEventProcessing : IEventHubsBlobCheckpointSample
 21    {
 22        /// <summary>
 23        ///   The name of the sample.
 24        /// </summary>
 25        ///
 026        public string Name => nameof(Sample03_BasicEventProcessing);
 27
 28        /// <summary>
 29        ///   A short description of the sample.
 30        /// </summary>
 31        ///
 032        public string Description => "An introduction to the Event Processor client, illustrating how to perform basic e
 33
 34        /// <summary>
 35        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 36        /// </summary>
 37        ///
 38        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 39        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 40        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 41        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 42        ///
 43        public async Task RunAsync(string eventHubsConnectionString,
 44                                   string eventHubName,
 45                                   string blobStorageConnectionString,
 46                                   string blobContainerName)
 47        {
 48            // To begin, we'll publish a batch of events for our processor to receive. Because we are not specifying any
 49            // the Event Hubs service will automatically route these to partitions.  We'll split the events into a coupl
 50            // increase the chance they'll be spread around.
 51
 052            var expectedEvents = new List<EventData>()
 053            {
 054               new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 055               new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 056               new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 057
 058               new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 059               new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 060               new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 061
 062               new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 063               new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 064               new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch")),
 065            };
 66
 067            int sentIndex = 0;
 068            int numberOfBatches = 3;
 069            int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 70
 071            await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 72            {
 073                while (sentIndex < expectedEvents.Count)
 74                {
 075                    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 76
 077                    for (int index = 0; index < eventsPerBatch; ++index)
 78                    {
 079                        eventBatch.TryAdd(expectedEvents[sentIndex]);
 080                        ++sentIndex;
 81                    }
 82
 083                    await producer.SendAsync(eventBatch);
 084                }
 85            }
 86
 87            // With our events having been published, we'll create an Event Hub Processor to read them.
 88
 089            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 090            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 091            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 92
 93            // When creating a handler for processing events, it is important to note that you are responsible for ensur
 94            // takes place within your handler code.  Should an exception go unhandled, the processor will allow it to b
 95            // to route it through the exception handler.
 96
 097            int eventIndex = 0;
 98
 99            Task processEventHandler(ProcessEventArgs eventArgs)
 100            {
 101                // The event arguments contain a cancellation token that the Event Processor client uses to signal
 102                // your handler that processing should stop when possible.  This is most commonly used in the
 103                // case that the event processor is stopping or has otherwise encountered an unrecoverable problem.
 104                //
 105                // Each of the handlers should respect cancellation as they are able in order to ensure that the
 106                // Event Processor client is able to perform its operations efficiently.
 107                //
 108                // In the case of the process event handler, the Event Processor client must await the result in
 109                // order to ensure that the ordering of events within a partition is maintained.  This makes respecting
 110                // the cancellation token important.
 111                //
 112                // Also of note, because the Event Processor client must await this handler, you are unable to safely
 113                // perform operations on the client, such as stopping or starting.  Doing so is likely to result in a
 114                // deadlock unless it is carefully queued as a background task.
 115
 0116                if (eventArgs.CancellationToken.IsCancellationRequested)
 117                {
 0118                    return Task.CompletedTask;
 119                }
 120
 121                try
 122                {
 123                    // For our example, we'll just track that the event was received and write its data to the
 124                    // console.
 125                    //
 126                    // Because there is no long-running or I/O operation, inspecting the cancellation
 127                    // token again does not make sense in this scenario.  However, in real-world processing, it is
 128                    // highly recommended that you do so as you are able.   It is also recommended that the cancellation
 129                    // token be passed to any asynchronous operations that are awaited in this handler.
 130
 0131                    ++eventIndex;
 0132                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 0133                }
 0134                catch (Exception ex)
 135                {
 136                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 137                    // the exception to the console.
 138
 0139                    Console.WriteLine();
 0140                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0141                    Console.WriteLine();
 0142                }
 143
 144                // Because our example handler is running synchronously, we'll manually return a completed
 145                // task.
 146
 0147                return Task.CompletedTask;
 148            };
 149
 150            // The error handler is invoked when there is an exception observed within the Event Processor client; it is
 151            // exceptions in your handler code.  The Event Processor client will make every effort to recover from excep
 152            // processing.  Should an exception that cannot be recovered from is encountered, the processor will forfeit
 153            // that it was processing so that work may redistributed.
 154            //
 155            // The exceptions surfaced to this handler may be fatal or non-fatal; because the processor may not be able 
 156            // whether an exception was fatal or whether its state was corrupted, this handler has responsibility for ma
 157            // whether processing should be terminated or restarted.  The handler may do so by calling Stop on the proce
 158            // calling Start on the processor.
 159            //
 160            // It is recommended that, for production scenarios, the decision be made by considering observations made b
 161            // handler invoked when initializing processing for a partition, and the handler invoked when processing for
 162            // developers will also include data from their monitoring platforms in this decision as well.
 163            //
 164            // As with event processing, should an exception occur in your code for this handler, processor will allow i
 165            // further action.
 166            //
 167            // For this example, exceptions will just be logged to the console.
 168
 169            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 170            {
 171                // As with the process event handler, the event arguments contain a cancellation token used by the Event
 172                // that the operation should be canceled.  The handler should respect cancellation as it is able in orde
 173                // Processor client is able to perform its operations efficiently.
 174                //
 175                // The process error handler is not awaited by the Event Processor client and is, instead, executed in a
 176                // means that you may safely interact with the Event Processor client, such as requesting that it stop p
 177
 0178                if (eventArgs.CancellationToken.IsCancellationRequested)
 179                {
 0180                    return Task.CompletedTask;
 181                }
 182
 183                // Because there is no long-running or I/O operation, inspecting the cancellation token again does not m
 184                // However, in real-world processing, it is recommended that you do so as you are able without compromis
 185                // and troubleshooting information.
 186                //
 187                // It is also recommended that the cancellation token be passed to any asynchronous operations that are 
 188
 0189                Console.WriteLine();
 0190                Console.WriteLine("===============================");
 0191                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0192                Console.WriteLine("===============================");
 0193                Console.WriteLine();
 194
 0195                return Task.CompletedTask;
 196            }
 197
 0198            processor.ProcessEventAsync += processEventHandler;
 0199            processor.ProcessErrorAsync += processErrorHandler;
 200
 201            try
 202            {
 203                // In order to begin processing, an explicit call must be made to the processor.  This will instruct the
 204                // processing in the background, invoking your handlers when they are needed.
 205
 0206                eventIndex = 0;
 0207                await processor.StartProcessingAsync();
 208
 209                // Because processing takes place in the background, we'll continue to wait until all of our events were
 210                // read and handled before stopping.  To ensure that we don't wait indefinitely should an unrecoverable
 211                // error be encountered, we'll also add a timed cancellation.
 212
 0213                using var cancellationSource = new CancellationTokenSource();
 0214                cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));
 215
 0216                while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count))
 217                {
 0218                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 219                }
 220
 221                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 222                // both cases, we'll want to shut down the processor.
 223
 0224                await processor.StopProcessingAsync();
 0225            }
 226            finally
 227            {
 228                // It is encouraged that you unregister your handlers when you have finished
 229                // using the Event Processor to ensure proper cleanup.  This is especially
 230                // important when using lambda expressions or handlers in any form that may
 231                // contain closure scopes or hold other references.
 232
 0233                processor.ProcessEventAsync -= processEventHandler;
 0234                processor.ProcessErrorAsync -= processErrorHandler;
 235            }
 236
 237            // The Event Processor client has been stopped and is not explicitly disposable; there
 238            // is nothing further that we need to do for cleanup.
 239
 0240            Console.WriteLine();
 0241        }
 242    }
 243}