< Summary

Class:Azure.Messaging.EventHubs.Processor.Samples.Sample08_EventProcessingHeartbeat
Assembly:Azure.Messaging.EventHubs.Processor.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample08_EventProcessingHeartbeat.cs
Covered lines:0
Uncovered lines:78
Coverable lines:78
Total lines:221
Line coverage:0% (0 of 78)
Covered branches:0
Total branches:37
Branch coverage:0% (0 of 37)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%
<RunAsync()-0%0%
SendHeartbeatAsync()-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample08_EventProcessingHeartbeat.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Messaging.EventHubs.Consumer;
 10using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 11using Azure.Messaging.EventHubs.Producer;
 12using Azure.Storage.Blobs;
 13
 14namespace Azure.Messaging.EventHubs.Processor.Samples
 15{
 16    /// <summary>
 17    ///   An example of ensuring that the handler for processing events is invoked on a fixed interval when no events ar
 18    /// </summary>
 19    ///
 20    public class Sample08_EventProcessingHeartbeat : IEventHubsBlobCheckpointSample
 21    {
 22        /// <summary>
 23        ///   The name of the sample.
 24        /// </summary>
 25        ///
 026        public string Name => nameof(Sample08_EventProcessingHeartbeat);
 27
 28        /// <summary>
 29        ///   A short description of the sample.
 30        /// </summary>
 31        ///
 032        public string Description => "An example of ensuring that the handler for processing events is invoked on a fixe
 33
 34        /// <summary>
 35        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 36        /// </summary>
 37        ///
 38        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 39        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 40        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 41        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 42        ///
 43        public async Task RunAsync(string eventHubsConnectionString,
 44                                   string eventHubName,
 45                                   string blobStorageConnectionString,
 46                                   string blobContainerName)
 47        {
 48            // In this example, our Event Processor client will be configured to use a maximum wait time which is consid
 49            // When events are available, the Event Processor client will pass them to the ProcessEvent handler to be pr
 50            // partition for longer than the specified maximum wait interval, the processor will invoke the ProcessEvent
 51            // include an event.  This allows your handler code to receive control and perform actions such as sending a
 52            // operation specific to your application needs.
 53            //
 54            // For our processor, we'll specify a small maximum wait time value as part of the options.
 55
 056            EventProcessorClientOptions clientOptions = new EventProcessorClientOptions
 057            {
 058                MaximumWaitTime = TimeSpan.FromMilliseconds(150)
 059            };
 60
 061            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 062            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 063            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 64
 65            // For this example, we'll create a simple event handler that writes to the
 66            // console each time it was invoked.
 67
 068            int eventIndex = 0;
 69
 70            async Task processEventHandler(ProcessEventArgs eventArgs)
 71            {
 072                if (eventArgs.CancellationToken.IsCancellationRequested)
 73                {
 074                    return;
 75                }
 76
 77                try
 78                {
 79                    // The "HasEvent" property of the arguments will be set if an event was available from the
 80                    // Event Hubs service.  If so, the argument properties for the event is populated and checkpoints
 81                    // may be created.
 82                    //
 83                    // If the "HasEvent" property is unset, the event will be empty and checkpoints may not be created.
 84
 085                    if (eventArgs.HasEvent)
 86                    {
 087                        Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }")
 88                    }
 89
 90                    // Simulate sending a heartbeat using a simple helper that writes a status to the
 91                    // console.
 92
 093                    await SendHeartbeatAsync();
 094                    ++eventIndex;
 095                }
 096                catch (Exception ex)
 97                {
 098                    Console.WriteLine();
 099                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0100                    Console.WriteLine();
 0101                }
 0102            };
 103
 104            // For this example, exceptions will just be logged to the console.
 105
 106            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 107            {
 0108                if (eventArgs.CancellationToken.IsCancellationRequested)
 109                {
 0110                    return Task.CompletedTask;
 111                }
 112
 0113                Console.WriteLine();
 0114                Console.WriteLine("===============================");
 0115                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0116                Console.WriteLine("===============================");
 0117                Console.WriteLine();
 118
 0119                return Task.CompletedTask;
 120            }
 121
 0122            processor.ProcessEventAsync += processEventHandler;
 0123            processor.ProcessErrorAsync += processErrorHandler;
 124
 125            try
 126            {
 0127                Console.WriteLine("Starting the Event Processor client...");
 0128                Console.WriteLine();
 129
 0130                eventIndex = 0;
 0131                await processor.StartProcessingAsync();
 132
 0133                using var cancellationSource = new CancellationTokenSource();
 0134                cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));
 135
 136                // We'll publish a batch of events for our processor to receive. We'll split the events into a couple of
 137                // increase the chance they'll be spread around to different partitions and introduce a delay between ba
 138                // allow for the handler to be invoked without an available event interleaved.
 139
 0140                var expectedEvents = new List<EventData>()
 0141                {
 0142                   new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 0143                   new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 0144                   new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 0145
 0146                   new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 0147                   new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 0148                   new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 0149
 0150                   new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 0151                   new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 0152                   new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch"))
 0153                };
 154
 0155                int sentIndex = 0;
 0156                int numberOfBatches = 3;
 0157                int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 158
 0159                await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 160                {
 0161                    while (sentIndex < expectedEvents.Count)
 162                    {
 0163                        using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 164
 0165                        for (int index = 0; index < eventsPerBatch; ++index)
 166                        {
 0167                            eventBatch.TryAdd(expectedEvents[sentIndex]);
 0168                            ++sentIndex;
 169                        }
 170
 0171                        await producer.SendAsync(eventBatch);
 0172                        await Task.Delay(250, cancellationSource.Token);
 0173                    }
 174                }
 175
 176                // We'll allow the Event Processor client to read and dispatch the events that we published, along with
 177                // ensuring a few invocations with no event.  Note that, due to non-determinism in the timing, we may or
 178                // not see all of the events from our batches read.
 179
 0180                while ((!cancellationSource.IsCancellationRequested) && (eventIndex <= expectedEvents.Count + 5))
 181                {
 0182                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 183                }
 184
 185                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 186                // both cases, we'll want to shut down the processor.
 187
 0188                Console.WriteLine();
 0189                Console.WriteLine("Stopping the processor...");
 190
 0191                await processor.StopProcessingAsync();
 0192            }
 193            finally
 194            {
 195                // It is encouraged that you unregister your handlers when you have finished
 196                // using the Event Processor to ensure proper cleanup.  This is especially
 197                // important when using lambda expressions or handlers in any form that may
 198                // contain closure scopes or hold other references.
 199
 0200                processor.ProcessEventAsync -= processEventHandler;
 0201                processor.ProcessErrorAsync -= processErrorHandler;
 202            }
 203
 204            // The Event Processor client has been stopped and is not explicitly disposable; there
 205            // is nothing further that we need to do for cleanup.
 206
 0207            Console.WriteLine();
 0208        }
 209
 210        /// <summary>
 211        ///   A helper method to simulate sending a heartbeat for health monitoring
 212        ///   during event processing.
 213        /// </summary>
 214        ///
 215        private async Task SendHeartbeatAsync()
 216        {
 0217            Console.WriteLine("Sending heartbeat (simulated)...");
 0218            await Task.Delay(50);
 0219        }
 220    }
 221}