< Summary

Class:Azure.Messaging.EventHubs.Processor.Samples.Sample06_TrackWhenAPartitionIsClosed
Assembly:Azure.Messaging.EventHubs.Processor.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample06_TrackWhenAPartitionIsClosed.cs
Covered lines:0
Uncovered lines:92
Coverable lines:92
Total lines:268
Line coverage:0% (0 of 92)
Covered branches:0
Total branches:30
Branch coverage:0% (0 of 30)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%
<RunAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample06_TrackWhenAPartitionIsClosed.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.Text;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Messaging.EventHubs.Consumer;
 11using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 12using Azure.Messaging.EventHubs.Producer;
 13using Azure.Storage.Blobs;
 14
 15namespace Azure.Messaging.EventHubs.Processor.Samples
 16{
 17    /// <summary>
 18    ///   An introduction to the Event Processor client, illustrating how to track when processing stops for a partition
 19    /// </summary>
 20    ///
 21    public class Sample06_TrackWhenAPartitionIsClosed : IEventHubsBlobCheckpointSample
 22    {
 23        /// <summary>
 24        ///   The name of the sample.
 25        /// </summary>
 26        ///
 027        public string Name => nameof(Sample06_TrackWhenAPartitionIsClosed);
 28
 29        /// <summary>
 30        ///   A short description of the sample.
 31        /// </summary>
 32        ///
 033        public string Description => "An introduction to the Event Processor client, illustrating how to track when proc
 34
 35        /// <summary>
 36        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 37        /// </summary>
 38        ///
 39        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 40        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 41        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 42        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 43        ///
 44        public async Task RunAsync(string eventHubsConnectionString,
 45                                   string eventHubName,
 46                                   string blobStorageConnectionString,
 47                                   string blobContainerName)
 48        {
 49            // When the Event Processor client begins processing, it will take ownership over a set of Event Hub partiti
 50            // processor has of a partition is transient; ownership may be relinquished by the processor in several scen
 51            //
 52            // - Because processors work collaboratively within the context of a consumer group, they will share respons
 53            //   with ownership of a partition potentially moving from one processor to another.
 54            //
 55            // - If a processor encounters an issue and believes that it cannot safely recover processing for a partitio
 56            //   relinquish ownership and allow the partition to be claimed by another processor or may reclaim it with 
 57            //   also result in errors being sent for processing by the handler.
 58            //
 59            // - Should there be a request for a processor to stop processing, it will relinquish ownership of its parti
 60            //   potentially claim them so that processing continues as event processor instances are scaled for the con
 61            //
 62            // In any of these cases, the "PartitionClosing" event will be triggered on the processor.  While there are 
 63            // the closing of a partition, this event serves as the logical partner of the "PartitionInitializing" event
 64            // by the processor.
 65
 066            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 067            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 068            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 69
 70            // For this example, we'll keep a list of partitions that are owned by our processor instance and keep it up
 71            // date with the last event processed for that partition.  When our processor closes that partition, we'll e
 72            // have an up-to-date checkpoint based on the last event.
 73
 074            int eventsProcessed = 0;
 075            ConcurrentDictionary<string, ProcessEventArgs> ownedPartitions = new ConcurrentDictionary<string, ProcessEve
 76
 77            // The handler for partition initialization is responsible for beginning to track the partition.
 78
 79            Task partitionInitializingHandler(PartitionInitializingEventArgs eventArgs)
 80            {
 081                if (eventArgs.CancellationToken.IsCancellationRequested)
 82                {
 083                    return Task.CompletedTask;
 84                }
 85
 86                try
 87                {
 088                    ownedPartitions[eventArgs.PartitionId] = default(ProcessEventArgs);
 089                    Console.WriteLine($"Initialized partition: { eventArgs.PartitionId }");
 090                }
 091                catch (Exception ex)
 92                {
 93                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 94                    // the exception to the console.
 95
 096                    Console.WriteLine();
 097                    Console.WriteLine($"An error was observed while initializing partition: { eventArgs.PartitionId }.  
 098                    Console.WriteLine();
 099                }
 100
 0101                return Task.CompletedTask;
 102            }
 103
 104            // The handler for partition close will stop tracking the partition and checkpoint if an event was processed
 105
 106            async Task partitionClosingHandler(PartitionClosingEventArgs eventArgs)
 107            {
 0108                if (eventArgs.CancellationToken.IsCancellationRequested)
 109                {
 0110                    return;
 111                }
 112
 113                try
 114                {
 0115                    if (ownedPartitions.TryRemove(eventArgs.PartitionId, out ProcessEventArgs lastProcessEventArgs))
 116                    {
 0117                        await lastProcessEventArgs.UpdateCheckpointAsync();
 118                    }
 119
 0120                    Console.WriteLine($"Closing partition: { eventArgs.PartitionId }");
 0121                }
 0122                catch (Exception ex)
 123                {
 124                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 125                    // the exception to the console.
 126
 0127                    Console.WriteLine();
 0128                    Console.WriteLine($"An error was observed while closing partition: { eventArgs.PartitionId }.  Messa
 0129                    Console.WriteLine();
 0130                }
 0131            }
 132
 133            // When an event is received, update the partition if tracked.  In the case that the value changes in the
 134            // time that it was checked, consider the other event fresher and do not force an update.
 135
 136            Task processEventHandler(ProcessEventArgs eventArgs)
 137            {
 0138                if (eventArgs.CancellationToken.IsCancellationRequested)
 139                {
 0140                    return Task.CompletedTask;
 141                }
 142
 143                try
 144                {
 0145                    ownedPartitions[eventArgs.Partition.PartitionId] = eventArgs;
 146
 0147                    ++eventsProcessed;
 0148                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 0149                }
 0150                catch (Exception ex)
 151                {
 152                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 153                    // the exception to the console.
 154
 0155                    Console.WriteLine();
 0156                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0157                    Console.WriteLine();
 0158                }
 159
 0160                return Task.CompletedTask;
 161            };
 162
 163            // For this example, exceptions will just be logged to the console.
 164
 165            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 166            {
 0167                if (eventArgs.CancellationToken.IsCancellationRequested)
 168                {
 0169                    return Task.CompletedTask;
 170                }
 171
 0172                Console.WriteLine("===============================");
 0173                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0174                Console.WriteLine("===============================");
 0175                Console.WriteLine();
 176
 0177                return Task.CompletedTask;
 178            }
 179
 0180            processor.PartitionInitializingAsync += partitionInitializingHandler;
 0181            processor.PartitionClosingAsync += partitionClosingHandler;
 0182            processor.ProcessEventAsync += processEventHandler;
 0183            processor.ProcessErrorAsync += processErrorHandler;
 184
 185            try
 186            {
 187                // To begin, we'll publish a batch of events for our processor to receive. Because we are not specifying
 188                // the Event Hubs service will automatically route these to partitions.  We'll split the events into a c
 189                // increase the chance they'll be spread around.
 190
 0191                var expectedEvents = new List<EventData>()
 0192                {
 0193                   new EventData(Encoding.UTF8.GetBytes("First Event, First Batch")),
 0194                   new EventData(Encoding.UTF8.GetBytes("Second Event, First Batch")),
 0195                   new EventData(Encoding.UTF8.GetBytes("Third Event, First Batch")),
 0196
 0197                   new EventData(Encoding.UTF8.GetBytes("First Event, Second Batch")),
 0198                   new EventData(Encoding.UTF8.GetBytes("Second Event, Second Batch")),
 0199                   new EventData(Encoding.UTF8.GetBytes("Third Event, Second Batch")),
 0200
 0201                   new EventData(Encoding.UTF8.GetBytes("First Event, Third Batch")),
 0202                   new EventData(Encoding.UTF8.GetBytes("Second Event, Third Batch")),
 0203                   new EventData(Encoding.UTF8.GetBytes("Third Event, Third Batch")),
 0204                };
 205
 0206                int sentIndex = 0;
 0207                int numberOfBatches = 3;
 0208                int eventsPerBatch = (expectedEvents.Count / numberOfBatches);
 209
 0210                await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 211                {
 0212                    while (sentIndex < expectedEvents.Count)
 213                    {
 0214                        using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 215
 0216                        for (int index = 0; index < eventsPerBatch; ++index)
 217                        {
 0218                            eventBatch.TryAdd(expectedEvents[sentIndex]);
 0219                            ++sentIndex;
 220                        }
 221
 0222                        await producer.SendAsync(eventBatch);
 0223                    }
 224                }
 225
 226                // In order to begin processing, an explicit call must be made to the processor.  This will instruct the
 227                // processing in the background, invoking your handlers when they are needed.
 228
 0229                eventsProcessed = 0;
 0230                await processor.StartProcessingAsync();
 231
 232                // Because processing takes place in the background, we'll continue to wait until all of our events were
 233                // read and handled before stopping.   To ensure that we don't wait indefinitely should an unrecoverable
 234                // error be encountered, we'll also add a timed cancellation.
 235
 0236                using var cancellationSource = new CancellationTokenSource();
 0237                cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));
 238
 0239                while ((!cancellationSource.IsCancellationRequested) && (eventsProcessed <= expectedEvents.Count))
 240                {
 0241                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 242                }
 243
 244                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 245                // both cases, we'll want to shut down the processor.
 246
 0247                await processor.StopProcessingAsync();
 0248            }
 249            finally
 250            {
 251                // It is encouraged that you unregister your handlers when you have finished
 252                // using the Event Processor to ensure proper cleanup.  This is especially
 253                // important when using lambda expressions or handlers in any form that may
 254                // contain closure scopes or hold other references.
 255
 0256                processor.PartitionInitializingAsync -= partitionInitializingHandler;
 0257                processor.PartitionClosingAsync -= partitionClosingHandler;
 0258                processor.ProcessEventAsync -= processEventHandler;
 0259                processor.ProcessErrorAsync -= processErrorHandler;
 260            }
 261
 262            // The Event Processor client has been stopped and is not explicitly disposable; there
 263            // is nothing further that we need to do for cleanup.
 264
 0265            Console.WriteLine();
 0266        }
 267    }
 268}