Summary

Class: Azure.Messaging.EventHubs.Processor.Samples.Sample04_BasicCheckpointing
Assembly: Azure.Messaging.EventHubs.Processor.Samples
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample04_BasicCheckpointing.cs
Covered lines: 0
Uncovered lines: 76
Coverable lines: 76
Total lines: 238
Line coverage: 0% (0 of 76)
Covered branches: 0
Total branches: 28
Branch coverage: 0% (0 of 28)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_Name() | - | 0% | 100%
get_Description() | - | 0% | 100%
RunAsync() | - | 0% | 0%
<RunAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample04_BasicCheckpointing.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Messaging.EventHubs.Consumer;
 10using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 11using Azure.Messaging.EventHubs.Producer;
 12using Azure.Storage.Blobs;
 13
 14namespace Azure.Messaging.EventHubs.Processor.Samples
 15{
 16    /// <summary>
 17    ///   An introduction to the Event Processor client, illustrating how to create simple checkpoints.
 18    /// </summary>
 19    ///
 20    public class Sample04_BasicCheckpointing : IEventHubsBlobCheckpointSample
 21    {
 22        /// <summary>
 23        ///   The name of the sample.
 24        /// </summary>
 25        ///
 026        public string Name => nameof(Sample04_BasicCheckpointing);
 27
 28        /// <summary>
 29        ///   A short description of the sample.
 30        /// </summary>
 31        ///
 032        public string Description => "An introduction to the Event Processor client, illustrating how to create simple c
 33
 34        /// <summary>
 35        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 36        /// </summary>
 37        ///
 38        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 39        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 40        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 41        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 42        ///
 43        public async Task RunAsync(string eventHubsConnectionString,
 44                                   string eventHubName,
 45                                   string blobStorageConnectionString,
 46                                   string blobContainerName)
 47        {
 48            // A checkpoint is the term used to describe a snapshot of the state of processing for a partition which has
 49            // to durable storage and which allows an Event Processor client to resume processing at a specific location
 50            // event stream.  When a processor starts, it will seek out an existing checkpoint and, if found, use that a
 51            // begins reading events.  If no checkpoint is found, a default location is used.
 52            //
 53            // Checkpoints are intended as a means to allow an Event Processor client or cluster of processors for a con
 54            // on which events were processed, so that processors can dynamically start, stop, join the cluster, and lea
 55            // need to start processing at the beginning of a partition and revisit all of its events.
 56            //
 57            // A checkpoint is based on an event and represents the last event that should be considered as processed fo
 58            // start with that checkpoint, the next available event would be used as the starting point.
 59            //
 60            // The creation of checkpoints comes at a cost, both in terms of processing performance/throughput and a pot
 61            // the underlying storage resource.  While it may seem desirable to create checkpoints for each event that i
 62            // an anti-pattern for most scenarios.
 63            //
 64            // When deciding how frequently to checkpoint, you'll need to consider the trade-off between the costs of cr
 65            // processing events.  For scenarios where processing events is very cheap, it is often a better approach to
 66            // once per time interval.  For scenarios where processing events is more expensive, it may be a better appr
 67            //
 68            // In either case, it is important to understand that your processing must be tolerant of receiving the same
 69            // Event Hubs service, like most messaging platforms, guarantees at-least-once delivery.  Even were you to c
 70            // process, it is entirely possible that you would receive that same event again from the service.
 71
 072            var expectedEvents = new List<EventData>()
 073            {
 074               new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 075               new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 076               new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 077
 078               new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 079               new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 080               new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 081
 082               new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 083               new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 084               new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch")),
 085            };
 86
 87            // To begin, we'll publish a batch of events for our processor to receive. Because we are not specifying any
 88            // the Event Hubs service will automatically route these to partitions.  We'll split the events into a coupl
 89            // increase the chance they'll be spread around.
 90
 091            int sentIndex = 0;
 092            int numberOfBatches = 3;
 093            int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 94
 095            await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 96            {
 097                while (sentIndex < expectedEvents.Count)
 98                {
 099                    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 100
 0101                    for (int index = 0; index < eventsPerBatch; ++index)
 102                    {
 0103                        eventBatch.TryAdd(expectedEvents[sentIndex]);
 0104                        ++sentIndex;
 105                    }
 106
 0107                    await producer.SendAsync(eventBatch);
 0108                }
 109            }
 110
 111            // With our events having been published, we'll create an Event Hub Processor to read them.
 112
 0113            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 0114            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 0115            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 116
 117            // When your handler for processing events is invoked, the set of event arguments that it is passed allows t
 118            // saved as a checkpoint for the processor.  For this example, our handler will create a checkpoint once per
 119            // sent.
 120
 0121            int eventIndex = 0;
 0122            int eventsSinceLastCheckpoint = 0;
 123
 124            async Task processEventHandler(ProcessEventArgs eventArgs)
 125            {
 0126                if (eventArgs.CancellationToken.IsCancellationRequested)
 127                {
 0128                    return;
 129                }
 130
 131                try
 132                {
 0133                    ++eventIndex;
 0134                    ++eventsSinceLastCheckpoint;
 135
 0136                    if (eventsSinceLastCheckpoint >= eventsPerBatch)
 137                    {
 138                        // Updating the checkpoint will interact with the Azure Storage.  As a service call,
 139                        // this is done asynchronously and may be long-running.  You may want to influence its behavior,
 140                        // such as limiting the time that it may execute in order to ensure throughput for
 141                        // processing events.
 142                        //
 143                        // In our case, we'll limit the checkpoint operation to a second and request cancellation
 144                        // if it runs longer.
 145
 0146                        using CancellationTokenSource cancellationSource = new CancellationTokenSource(TimeSpan.FromSeco
 147
 148                        try
 149                        {
 0150                            await eventArgs.UpdateCheckpointAsync(cancellationSource.Token);
 0151                            eventsSinceLastCheckpoint = 0;
 152
 0153                            Console.WriteLine("Created checkpoint");
 0154                        }
 0155                        catch (TaskCanceledException)
 156                        {
 0157                            Console.WriteLine("Checkpoint creation took too long and was canceled.");
 0158                        }
 159
 0160                        Console.WriteLine();
 0161                    }
 162
 0163                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 0164                }
 0165                catch (Exception ex)
 166                {
 167                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 168                    // the exception to the console.
 169
 0170                    Console.WriteLine();
 0171                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0172                    Console.WriteLine();
 0173                }
 0174            };
 175
 176            // For this example, exceptions will just be logged to the console.
 177
 178            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 179            {
 0180                if (eventArgs.CancellationToken.IsCancellationRequested)
 181                {
 0182                    return Task.CompletedTask;
 183                }
 184
 0185                Console.WriteLine("===============================");
 0186                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0187                Console.WriteLine("===============================");
 0188                Console.WriteLine();
 189
 0190                return Task.CompletedTask;
 191            }
 192
 0193            processor.ProcessEventAsync += processEventHandler;
 0194            processor.ProcessErrorAsync += processErrorHandler;
 195
 196            try
 197            {
 198                // In order to begin processing, an explicit call must be made to the processor.  This will instruct the
 199                // processing in the background, invoking your handlers when they are needed.
 200
 0201                eventIndex = 0;
 0202                await processor.StartProcessingAsync();
 203
 204                // Because processing takes place in the background, we'll continue to wait until all of our events were
 205                // read and handled before stopping.  To ensure that we don't wait indefinitely should an unrecoverable
 206                // error be encountered, we'll also add a timed cancellation.
 207
 0208                using var cancellationSource = new CancellationTokenSource();
 0209                cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));
 210
 0211                while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count))
 212                {
 0213                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 214                }
 215
 216                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 217                // both cases, we'll want to shut down the processor.
 218
 0219                await processor.StopProcessingAsync();
 0220            }
 221            finally
 222            {
 223                // It is encouraged that you unregister your handlers when you have finished
 224                // using the Event Processor to ensure proper cleanup.  This is especially
 225                // important when using lambda expressions or handlers in any form that may
 226                // contain closure scopes or hold other references.
 227
 0228                processor.ProcessEventAsync -= processEventHandler;
 0229                processor.ProcessErrorAsync -= processErrorHandler;
 230            }
 231
 232            // The Event Processor client has been stopped and is not explicitly disposable; there
 233            // is nothing further that we need to do for cleanup.
 234
 0235            Console.WriteLine();
 0236        }
 237    }
 238}