| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Threading.Tasks; |
| | 6 | | using Azure.Messaging.EventHubs.Consumer; |
| | 7 | | using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure; |
| | 8 | | using Azure.Storage.Blobs; |
| | 9 | |
|
| | 10 | | namespace Azure.Messaging.EventHubs.Processor.Samples |
| | 11 | | { |
| | 12 | | /// <summary> |
| | 13 | | /// An introduction to the Event Processor client, illustrating how to create the client and perform basic |
| | 14 | | /// start and stop requests. |
| | 15 | | /// </summary> |
| | 16 | | /// |
| | 17 | | public class Sample01_HelloWorld : IEventHubsBlobCheckpointSample |
| | 18 | | { |
| | 19 | | /// <summary> |
| | 20 | | /// The name of the sample. |
| | 21 | | /// </summary> |
| | 22 | | /// |
| 0 | 23 | | public string Name => nameof(Sample01_HelloWorld); |
| | 24 | |
|
| | 25 | | /// <summary> |
| | 26 | | /// A short description of the sample. |
| | 27 | | /// </summary> |
| | 28 | | /// |
| 0 | 29 | | public string Description => "An introduction to the Event Processor client, illustrating how to create the clie |
| | 30 | |
|
| | 31 | | /// <summary> |
| | 32 | | /// Runs the sample using the specified Event Hubs and Azure storage connection information. |
| | 33 | | /// </summary> |
| | 34 | | /// |
| | 35 | | /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s |
| | 36 | | /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru |
| | 37 | | /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an |
| | 38 | | /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be |
| | 39 | | /// |
| | 40 | | public async Task RunAsync(string eventHubsConnectionString, |
| | 41 | | string eventHubName, |
| | 42 | | string blobStorageConnectionString, |
| | 43 | | string blobContainerName) |
| | 44 | | { |
| | 45 | | // An Event Processor client is associated with a specific Event Hub and consumer group. The consumer group |
| | 46 | | // a label that identifies one or more consumers as a set. Often, consumer groups are named after the respo |
| | 47 | | // of the consumer in an application, such as "Telemetry" or "OrderProcessing". When an Event Hub is create |
| | 48 | | // consumer group is created with it, called "$Default". |
| | 49 | | // |
| | 50 | | // Each processor has a unique view of the events in the partitions of an Event Hub, meaning that events are |
| | 51 | | // processors and are not removed from the partition when a processor reads them. This allows for one or mo |
| | 52 | | // Event Hub clients to read and process events from the partition at different speeds and beginning with di |
| | 53 | | // interfering with one another. |
| | 54 | | // |
| | 55 | | // An Event Processor client works cooperatively with other Event Processors configured for the same Event H |
| | 56 | | // processors will dynamically detect one another and distribute the responsibility for partitions among the |
| | 57 | | // new processor instances appear or others be removed, the work will be automatically redistributed among t |
| | 58 | | // |
| | 59 | | // In this example, our processor will work as a single instance and will make use of the default consumer g |
| | 60 | | // associated Event Hub. |
| | 61 | | // |
| | 62 | | // Our processor will need an Azure Storage Blobs container client to use as a durable store for managing state a |
| | 63 | | // processors. |
| | 64 | |
|
| 0 | 65 | | string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; |
| 0 | 66 | | BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); |
| 0 | 67 | | EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS |
| | 68 | |
|
| | 69 | | // When the processor is first created, it is not actively performing any processing work. In order to proc |
| | 70 | | // handlers for the "ProcessEventAsync" and "ProcessErrorAsync" events. |
| | 71 | | // |
| | 72 | | // These handlers are what allows your code to be invoked when an event is available for processing or when |
| | 73 | | // be provided; without them, the processor will not be able to start. |
| | 74 | | // |
| | 75 | | // In this example, we'll simply register empty handlers. |
| | 76 | |
|
| 0 | 77 | | Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask; |
| 0 | 78 | | Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask; |
| | 79 | |
|
| 0 | 80 | | processor.ProcessEventAsync += processEventHandler; |
| 0 | 81 | | processor.ProcessErrorAsync += processErrorHandler; |
| | 82 | |
|
| | 83 | | try |
| | 84 | | { |
| | 85 | | // In order to begin processing, an explicit call must be made to the processor. This will instruct the |
| | 86 | | // processing in the background, invoking your handlers when they are needed. |
| | 87 | |
|
| 0 | 88 | | await processor.StartProcessingAsync(); |
| | 89 | |
|
| | 90 | | // It is important to note that the start call will return as soon as processing has begun; it will not |
| | 91 | | // wait for processing to complete. Should you want to perform other tasks while processing takes place |
| | 92 | | // If, instead, you would just like to allow processing to take place, you are responsible for blocking |
| | 93 | | // that the host process does not complete. |
| | 94 | | // |
| | 95 | | // In this example, we'll illustrate by waiting for a short delay, during which time processing is occur |
| | 96 | |
|
| 0 | 97 | | await Task.Delay(TimeSpan.FromSeconds(2)); |
| | 98 | |
|
| | 99 | | // When you wish to end processing, an explicit request must be made to do so. When that request comple |
| | 100 | | // processing has been confirmed to have stopped. |
| | 101 | |
|
| 0 | 102 | | await processor.StopProcessingAsync(); |
| 0 | 103 | | } |
| | 104 | | finally |
| | 105 | | { |
| | 106 | | // It is encouraged that you unregister your handlers when you have finished |
| | 107 | | // using the Event Processor to ensure proper cleanup. This is especially |
| | 108 | | // important when using lambda expressions or handlers in any form that may |
| | 109 | | // contain closure scopes or hold other references. |
| | 110 | |
|
| 0 | 111 | | processor.ProcessEventAsync -= processEventHandler; |
| 0 | 112 | | processor.ProcessErrorAsync -= processErrorHandler; |
| | 113 | | } |
| | 114 | |
|
| | 115 | | // The Event Processor client has been stopped and is not explicitly disposable; there |
| | 116 | | // is nothing further that we need to do for cleanup. |
| | 117 | |
|
| 0 | 118 | | Console.WriteLine(); |
| 0 | 119 | | } |
| | 120 | | } |
| | 121 | | } |