Summary

Class: Azure.Messaging.EventHubs.Processor.Samples.Sample05_InitializeAPartition
Assembly: Azure.Messaging.EventHubs.Processor.Samples
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample05_InitializeAPartition.cs
Covered lines: 0
Uncovered lines: 48
Coverable lines: 48
Total lines: 178
Line coverage: 0% (0 of 48)
Covered branches: 0
Total branches: 12
Branch coverage: 0% (0 of 12)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
get_Name() | - | 0% | 100%
get_Description() | - | 0% | 100%
RunAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample05_InitializeAPartition.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Text;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using Azure.Messaging.EventHubs.Consumer;
 9using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 10using Azure.Storage.Blobs;
 11
 12namespace Azure.Messaging.EventHubs.Processor.Samples
 13{
 14    /// <summary>
 15    ///   An introduction to the Event Processor client, illustrating how to participate in initialization for a partiti
 16    /// </summary>
 17    ///
 18    public class Sample05_InitializeAPartition : IEventHubsBlobCheckpointSample
 19    {
 20        /// <summary>
 21        ///   The name of the sample.
 22        /// </summary>
 23        ///
 024        public string Name => nameof(Sample05_InitializeAPartition);
 25
 26        /// <summary>
 27        ///   A short description of the sample.
 28        /// </summary>
 29        ///
 030        public string Description => "An introduction to the Event Processor client, illustrating how to participate in 
 31
 32        /// <summary>
 33        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 34        /// </summary>
 35        ///
 36        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 37        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 38        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 39        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 40        ///
 41        public async Task RunAsync(string eventHubsConnectionString,
 42                                   string eventHubName,
 43                                   string blobStorageConnectionString,
 44                                   string blobContainerName)
 45        {
 46            // When the Event Processor client begins processing, it will take ownership over a set of Event Hub partiti
 47            // partition that the processor claims, the first step in processing is to initialize the partition.  In ord
 48            // track partitions owned by the processor and to participate in the initialization, a "PartitionInitializin
 49            // the processor.
 50            //
 51            // When a partition is initialized, one of the decisions made is where in the partition's event stream to be
 52            // where a checkpoint exists for a partition, processing will begin at the next available event after the ch
 53            // is found for a partition, a default location is used.
 54            //
 55            // One of the common reasons that you may choose to participate in initialization is to influence where to b
 56            // is not found, overriding the default.  To achieve this, you'll set the associated property on the event a
 57
 058            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 059            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
 060            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 61
 62            // The handler for partition initialization will set the default position for all partitions, asking to begi
 63            // point in the stream.
 64
 65            Task partitionInitializingHandler(PartitionInitializingEventArgs eventArgs)
 66            {
 067                if (eventArgs.CancellationToken.IsCancellationRequested)
 68                {
 069                    return Task.CompletedTask;
 70                }
 71
 72                try
 73                {
 074                    eventArgs.DefaultStartingPosition = EventPosition.Latest;
 075                    Console.WriteLine($"Initialized partition: { eventArgs.PartitionId }");
 076                }
 077                catch (Exception ex)
 78                {
 79                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 80                    // the exception to the console.
 81
 082                    Console.WriteLine();
 083                    Console.WriteLine($"An error was observed while initializing partition: { eventArgs.PartitionId }.  
 084                    Console.WriteLine();
 085                }
 86
 087                return Task.CompletedTask;
 88            }
 89
 90            // For this example, events will just be logged to the console.
 91
 92            Task processEventHandler(ProcessEventArgs eventArgs)
 93            {
 094                if (eventArgs.CancellationToken.IsCancellationRequested)
 95                {
 096                    return Task.CompletedTask;
 97                }
 98
 99                try
 100                {
 0101                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 0102                }
 0103                catch (Exception ex)
 104                {
 105                    // For real-world scenarios, you should take action appropriate to your application.  For our exampl
 106                    // the exception to the console.
 107
 0108                    Console.WriteLine();
 0109                    Console.WriteLine($"An error was observed while processing events.  Message: { ex.Message }");
 0110                    Console.WriteLine();
 0111                }
 112
 0113                return Task.CompletedTask;
 114            };
 115
 116            // For this example, exceptions will just be logged to the console.
 117
 118            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 119            {
 0120                if (eventArgs.CancellationToken.IsCancellationRequested)
 121                {
 0122                    return Task.CompletedTask;
 123                }
 124
 0125                Console.WriteLine("===============================");
 0126                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0127                Console.WriteLine("===============================");
 0128                Console.WriteLine();
 129
 0130                return Task.CompletedTask;
 131            }
 132
 0133            processor.PartitionInitializingAsync += partitionInitializingHandler;
 0134            processor.ProcessEventAsync += processEventHandler;
 0135            processor.ProcessErrorAsync += processErrorHandler;
 136
 137            try
 138            {
 139                // In order to begin processing, an explicit call must be made to the processor.  This will instruct the
 140                // processing in the background, invoking your handlers when they are needed.
 141
 0142                await processor.StartProcessingAsync();
 143
 144                // Because processing takes place in the background, we'll wait for a small period of time and then trig
 145                // cancellation.
 146
 0147                using var cancellationSource = new CancellationTokenSource();
 0148                cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
 149
 0150                while (!cancellationSource.IsCancellationRequested)
 151                {
 0152                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 153                }
 154
 155                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 156                // both cases, we'll want to shut down the processor.
 157
 0158                await processor.StopProcessingAsync();
 0159            }
 160            finally
 161            {
 162                // It is encouraged that you unregister your handlers when you have finished
 163                // using the Event Processor to ensure proper cleanup.  This is especially
 164                // important when using lambda expressions or handlers in any form that may
 165                // contain closure scopes or hold other references.
 166
 0167                processor.PartitionInitializingAsync -= partitionInitializingHandler;
 0168                processor.ProcessEventAsync -= processEventHandler;
 0169                processor.ProcessErrorAsync -= processErrorHandler;
 170            }
 171
 172            // The Event Processor client has been stopped and is not explicitly disposable; there
 173            // is nothing further that we need to do for cleanup.
 174
 0175            Console.WriteLine();
 0176        }
 177    }
 178}