< Summary

Class: Azure.Messaging.EventHubs.Processor.Samples.Sample09_ProcessEventsByBatch
Assembly: Azure.Messaging.EventHubs.Processor.Samples
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample09_ProcessEventsByBatch.cs
Covered lines: 0
Uncovered lines: 113
Coverable lines: 113
Total lines: 298
Line coverage: 0% (0 of 113)
Covered branches: 0
Total branches: 41
Branch coverage: 0% (0 of 41)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_Name() | - | 0% | 100%
get_Description() | - | 0% | 100%
RunAsync() | - | 0% | 0%
<RunAsync() | - | 0% | 0%
SendEventBatchAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample09_ProcessEventsByBatch.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.Linq;
 8using System.Text;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Azure.Messaging.EventHubs.Consumer;
 12using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 13using Azure.Messaging.EventHubs.Producer;
 14using Azure.Storage.Blobs;
 15
 16namespace Azure.Messaging.EventHubs.Processor.Samples
 17{
 18    /// <summary>
 19    ///   An example of grouping events into batches for downstream processing.
 20    /// </summary>
 21    ///
 22    public class Sample09_ProcessEventsByBatch : IEventHubsBlobCheckpointSample
 23    {
 24        /// <summary>
 25        ///   The name of the sample.
 26        /// </summary>
 27        ///
 028        public string Name => nameof(Sample09_ProcessEventsByBatch);
 29
 30        /// <summary>
 31        ///   A short description of the sample.
 32        /// </summary>
 33        ///
 034        public string Description => "An example of grouping events into batches for downstream processing.";
 35
 36        /// <summary>
 37        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 38        /// </summary>
 39        ///
 40        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 41        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 42        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 43        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 44        ///
 45        public async Task RunAsync(string eventHubsConnectionString,
 46                                   string eventHubName,
 47                                   string blobStorageConnectionString,
 48                                   string blobContainerName)
 49        {
 50            // In order to ensure efficient communication with the Event Hubs service and the best throughput possible f
 51            // the Event Processor client is eagerly reading from each partition of the Event Hub and staging events.  T
 52            // to the "ProcessEvent" handler immediately when one is available.  Each call to the handler passes a singl
 53            // from which the event was read.  This pattern is intended to allow developers to act on an event as soon a
 54            // and understandable interface.
 55            //
 56            // This approach is optimized for scenarios where the processing of events can be performed quickly and with
 57            // where that is not the case, it may be advantageous to collect the events into batches and send them to be
 58            // of the "ProcessEvent" handler.
 59            //
 60            // In this example, our "ProcessEvent" handler will group events into batches by partition, sending them for
 61            // batch size was reached or when no event was available for more than a maximum wait time interval.
 62
 063            int desiredBatchSize = 3;
 064            TimeSpan maximumWaitTime = TimeSpan.FromMilliseconds(150);
 65
 66            // The Event Processor client will preserve the order that events were enqueued in a partition by waiting fo
 67            // complete before it is invoked with a new event for the same partition.  However, partitions are processed
 68            // handler is likely to be executing for multiple partitions at the same time.
 69            //
 70            // To account for this, we'll use a concurrent dictionary to track batches, grouping them by partition.
 71
 072            ConcurrentDictionary<string, List<ProcessEventArgs>> eventBatches = new ConcurrentDictionary<string, List<Pr
 73
 74            // Create our Event Processor client, specifying the maximum wait time as an option to ensure that
 75            // our handler is invoked when no event was available.
 76
 077            EventProcessorClientOptions clientOptions = new EventProcessorClientOptions
 078            {
 079                MaximumWaitTime = maximumWaitTime
 080            };
 81
 082            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 083            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 084            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 85
 86            // For this example, we'll create a simple event handler that writes to the
 87            // console each time it is invoked.
 88
 089            int eventIndex = 0;
 90
 91            async Task processEventHandler(ProcessEventArgs eventArgs)
 92            {
 093                if (eventArgs.CancellationToken.IsCancellationRequested)
 94                {
 095                    return;
 96                }
 97
 98                try
 99                {
 100                    // Retrieve or create the active batch for the current partition.
 101
 0102                    List<ProcessEventArgs> currentBatch = eventBatches.GetOrAdd(eventArgs.Partition.PartitionId, _ => ne
 0103                    bool sendBatchForProcessing = false;
 104
 105                    // If there was an event emitted, add the event and check to see if the size of the batch has reache
 106                    // size.  If so, it will need to be sent.
 107                    //
 108                    // NOTE:  There is a bug in the Event Hubs preview 6 library causing "HasEvents" to return the
 109                    //        wrong value.  We'll substitute a check against the "Data" property to work around it.
 110                    //
 111                    //        if (eventArgs.HasEvents) {} is the preferred snippet.
 112                    //
 0113                    if (eventArgs.Data != null)
 114                    {
 0115                        currentBatch.Add(eventArgs);
 0116                        sendBatchForProcessing = (currentBatch.Count >= desiredBatchSize);
 117                    }
 118                    else
 119                    {
 120                        // There was no event available within the interval requested by the maximum
 121                        // wait time; send the batch for processing if it contains any events.
 122
 0123                        sendBatchForProcessing = (currentBatch.Count > 0);
 124                    }
 125
 126                    // It is important to be aware that no events for the partition will be processed until the handler 
 127                    // so you may wish to delegate processing to a downstream service or background task in order to mai
 128                    // throughput.
 129                    //
 130                    // In this example, if the batch is to be sent, we'll delegate to a simple helper method that will wri
 131                    // to the console.  If sending our batch completed successfully, we'll checkpoint using the last
 132                    // event and clear the batch.
 133
 0134                    if (sendBatchForProcessing)
 135                    {
 0136                        await SendEventBatchAsync(currentBatch.Select(item => item.Data));
 0137                        await currentBatch[currentBatch.Count - 1].UpdateCheckpointAsync();
 0138                        currentBatch.Clear();
 139                    }
 0140                }
 0141                catch (Exception ex)
 142                {
 0143                    Console.WriteLine();
 0144                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0145                    Console.WriteLine();
 0146                }
 147
 0148                ++eventIndex;
 0149            };
 150
 151            // For this example, exceptions will just be logged to the console.
 152
 153            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 154            {
 0155                if (eventArgs.CancellationToken.IsCancellationRequested)
 156                {
 0157                    return Task.CompletedTask;
 158                }
 159
 0160                Console.WriteLine();
 0161                Console.WriteLine("===============================");
 0162                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0163                Console.WriteLine("===============================");
 0164                Console.WriteLine();
 165
 0166                return Task.CompletedTask;
 167            }
 168
 0169            processor.ProcessEventAsync += processEventHandler;
 0170            processor.ProcessErrorAsync += processErrorHandler;
 171
 172            try
 173            {
 0174                Console.WriteLine("Starting the Event Processor client...");
 0175                Console.WriteLine();
 176
 0177                eventIndex = 0;
 0178                await processor.StartProcessingAsync();
 179
 0180                using var cancellationSource = new CancellationTokenSource();
 0181                cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));
 182
 183                // We'll publish a batch of events for our processor to receive. We'll split the events into a couple of
 184                // increase the chance they'll be spread around to different partitions and introduce a delay between ba
 185                // allow for the handler to be invoked without an available event interleaved.
 186
 0187                var expectedEvents = new List<EventData>()
 0188                {
 0189                   new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 0190                   new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 0191                   new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 0192
 0193                   new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 0194                   new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 0195                   new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 0196
 0197                   new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 0198                   new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 0199                   new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch")),
 0200
 0201                   new EventData(Encoding.UTF8.GetBytes("First Event, Fourth Batch")),
 0202                   new EventData(Encoding.UTF8.GetBytes("Second Event, Fourth Batch")),
 0203                   new EventData(Encoding.UTF8.GetBytes("Third Event, Fourth Batch")),
 0204
 0205                   new EventData(Encoding.UTF8.GetBytes("First Event, Fifth Batch")),
 0206                   new EventData(Encoding.UTF8.GetBytes("Second Event, Fifth Batch")),
 0207                   new EventData(Encoding.UTF8.GetBytes("Third Event, Fifth Batch")),
 0208
 0209                   new EventData(Encoding.UTF8.GetBytes("First Event, Fifth Batch")),
 0210                   new EventData(Encoding.UTF8.GetBytes("Second Event, Fifth Batch")),
 0211                   new EventData(Encoding.UTF8.GetBytes("Third Event, Fifth Batch")),
 0212
 0213                   new EventData(Encoding.UTF8.GetBytes("First Event, Sixth Batch")),
 0214                   new EventData(Encoding.UTF8.GetBytes("Second Event, Sixth Batch")),
 0215                   new EventData(Encoding.UTF8.GetBytes("Third Event, Sixth Batch")),
 0216
 0217                   new EventData(Encoding.UTF8.GetBytes("First Event, Seventh Batch")),
 0218                   new EventData(Encoding.UTF8.GetBytes("Second Event, Seventh Batch")),
 0219                   new EventData(Encoding.UTF8.GetBytes("Third Event, Seventh Batch"))
 0220                };
 221
 0222                int sentIndex = 0;
 0223                int numberOfBatches = 3;
 0224                int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 225
 0226                await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 227                {
 0228                    while (sentIndex < expectedEvents.Count)
 229                    {
 0230                        using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 231
 0232                        for (int index = 0; index < eventsPerBatch; ++index)
 233                        {
 0234                            eventBatch.TryAdd(expectedEvents[sentIndex]);
 0235                            ++sentIndex;
 236                        }
 237
 0238                        await producer.SendAsync(eventBatch);
 0239                        await Task.Delay(250, cancellationSource.Token);
 0240                    }
 241                }
 242
 243                // We'll allow the Event Processor client to read and dispatch the events that we published, along with
 244                // ensuring a few invocations with no event.  Note that, due to non-determinism in the timing, we may or
 245                // not see all of the events from our batches read.
 246
 0247                while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count + 5))
 248                {
 0249                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 250                }
 251
 252                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 253                // both cases, we'll want to shut down the processor.
 254
 0255                Console.WriteLine();
 0256                Console.WriteLine("Stopping the processor...");
 257
 0258                await processor.StopProcessingAsync();
 0259            }
 260            finally
 261            {
 262                // It is encouraged that you unregister your handlers when you have finished
 263                // using the Event Processor to ensure proper cleanup.  This is especially
 264                // important when using lambda expressions or handlers in any form that may
 265                // contain closure scopes or hold other references.
 266
 0267                processor.ProcessEventAsync -= processEventHandler;
 0268                processor.ProcessErrorAsync -= processErrorHandler;
 269            }
 270
 271            // The Event Processor client has been stopped and is not explicitly disposable; there
 272            // is nothing further that we need to do for cleanup.
 273
 0274            Console.WriteLine();
 0275        }
 276
 277        /// <summary>
 278        ///   A helper method to simulate sending an event batch for processing.
 279        /// </summary>
 280        ///
 281        /// <param name="eventBatch">The event batch to "send".</param>
 282        ///
 283        private async Task SendEventBatchAsync(IEnumerable<EventData> eventBatch)
 284        {
 0285            StringBuilder batchMessage = new StringBuilder();
 0286            int eventCount = 0;
 287
 0288            foreach (EventData data in eventBatch)
 289            {
 0290                batchMessage.AppendLine($"\tEvent: { Encoding.UTF8.GetString(data.Body.ToArray()) }");
 0291                ++eventCount;
 292            }
 293
 0294            Console.WriteLine($"Event Batch with { eventCount } events sent:{ Environment.NewLine }{ batchMessage.ToStri
 0295            await Task.Delay(50);
 0296        }
 297    }
 298}