|   |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|   |  | 2 |  | // Licensed under the MIT License. | 
|   |  | 3 |  |  | 
|   |  | 4 |  | using System; | 
|   |  | 5 |  | using System.Collections.Generic; | 
|   |  | 6 |  | using System.Linq; | 
|   |  | 7 |  | using System.Text; | 
|   |  | 8 |  | using System.Threading; | 
|   |  | 9 |  | using System.Threading.Tasks; | 
|   |  | 10 |  | using Azure.Messaging.EventHubs.Consumer; | 
|   |  | 11 |  | using Azure.Messaging.EventHubs.Producer; | 
|   |  | 12 |  | using Azure.Messaging.EventHubs.Samples.Infrastructure; | 
|   |  | 13 |  |  | 
|   |  | 14 |  | namespace Azure.Messaging.EventHubs.Samples | 
|   |  | 15 |  | { | 
|   |  | 16 |  |     /// <summary> | 
|   |  | 17 |  |     ///   An example of reading events from a single Event Hub partition, starting at a well-known position. | 
|   |  | 18 |  |     /// </summary> | 
|   |  | 19 |  |     /// | 
|   |  | 20 |  |     public class Sample10_ReadEventsFromAKnownPosition : IEventHubsSample | 
|   |  | 21 |  |     { | 
|   |  | 22 |  |         /// <summary> | 
|   |  | 23 |  |         ///   The name of the sample. | 
|   |  | 24 |  |         /// </summary> | 
|   |  | 25 |  |         /// | 
|   | 0 | 26 |  |         public string Name => nameof(Sample10_ReadEventsFromAKnownPosition); | 
|   |  | 27 |  |  | 
|   |  | 28 |  |         /// <summary> | 
|   |  | 29 |  |         ///   A short description of the sample. | 
|   |  | 30 |  |         /// </summary> | 
|   |  | 31 |  |         /// | 
|   | 0 | 32 |  |         public string Description => "An example of reading events from a single Event Hub partition, starting at a well | 
|   |  | 33 |  |  | 
|   |  | 34 |  |         /// <summary> | 
|   |  | 35 |  |         ///   Runs the sample using the specified Event Hubs connection information. | 
|   |  | 36 |  |         /// </summary> | 
|   |  | 37 |  |         /// | 
|   |  | 38 |  |         /// <param name="connectionString">The connection string for the Event Hubs namespace that the sample should tar | 
|   |  | 39 |  |         /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that she sample should ru | 
|   |  | 40 |  |         /// | 
|   |  | 41 |  |         public async Task RunAsync(string connectionString, | 
|   |  | 42 |  |                                    string eventHubName) | 
|   |  | 43 |  |         { | 
|   |  | 44 |  |             // In this example, we'll make use of multiple clients in order to publish an event that we will then read b | 
|   |  | 45 |  |             // for reading events in the partition.  Our initial consumer will begin watching for new events published t | 
|   |  | 46 |  |             // Hub.  Before we can publish events and have them observed, we will need to ask the consumer to perform an | 
|   |  | 47 |  |             // the partition. | 
|   |  | 48 |  |             // | 
|   |  | 49 |  |             // Each event that a consumer reads will have attributes set that describe the event's location in the parti | 
|   |  | 50 |  |             // number, and the date/time that it was enqueued.  These attributes can be used to create a new consumer th | 
|   |  | 51 |  |             // | 
|   |  | 52 |  |             // With Event Hubs, it is the responsibility of an application consuming events to keep track of those that  | 
|   |  | 53 |  |             // and to manage where in the partition the consumer begins reading events.  This is done by using the posit | 
|   |  | 54 |  |             // state, commonly known as "creating a checkpoint." | 
|   |  | 55 |  |             // | 
|   |  | 56 |  |             // The goal is to preserve the position of an event in some form of durable state, such as writing it to a d | 
|   |  | 57 |  |             // consuming application crashes or is otherwise restarted, it can retrieve that checkpoint information and  | 
|   |  | 58 |  |             // begins reading at the position where it left off. | 
|   |  | 59 |  |             // | 
|   |  | 60 |  |             // It is important to note that there is potential for a consumer to process an event and be unable to prese | 
|   |  | 61 |  |             // consumer must be able to deal with processing the same event multiple times without it causing data corru | 
|   |  | 62 |  |             // Event Hubs, like most event streaming systems, guarantees "at least once" delivery; even in cases where t | 
|   |  | 63 |  |             // there is a small possibility that the service will return an event multiple times. | 
|   |  | 64 |  |             // | 
|   |  | 65 |  |             // To demonstrate, we will publish a batch of events to be read by an initial consumer.  The third event tha | 
|   |  | 66 |  |             // and another consumer will use its attributes to start reading the event that follows, reading the set of  | 
|   |  | 67 |  |             // the first three. | 
|   |  | 68 |  |  | 
|   |  | 69 |  |             string firstPartition; | 
|   |  | 70 |  |             EventData thirdEvent; | 
|   |  | 71 |  |  | 
|   | 0 | 72 |  |             int eventBatchSize = 50; | 
|   |  | 73 |  |  | 
|   | 0 | 74 |  |             await using (var initialConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGr | 
|   |  | 75 |  |             { | 
|   |  | 76 |  |                 // We will start by using the consumer client inspect the Event Hub and select the first partition to op | 
|   |  | 77 |  |  | 
|   | 0 | 78 |  |                 firstPartition = (await initialConsumerClient.GetPartitionIdsAsync()).First(); | 
|   |  | 79 |  |  | 
|   |  | 80 |  |                 // Each time the consumer looks to read events, we'll ask that it waits only a short time before emittin | 
|   |  | 81 |  |                 // an empty event, so that our code has the chance to run without indefinite blocking. | 
|   |  | 82 |  |  | 
|   | 0 | 83 |  |                 ReadEventOptions readOptions = new ReadEventOptions | 
|   | 0 | 84 |  |                 { | 
|   | 0 | 85 |  |                     MaximumWaitTime = TimeSpan.FromMilliseconds(150) | 
|   | 0 | 86 |  |                 }; | 
|   |  | 87 |  |  | 
|   |  | 88 |  |                 // As a preventative measure, we'll also specify that cancellation should occur after 30 seconds, so tha | 
|   |  | 89 |  |                 // in the event of a service error where the events we've published cannot be read. | 
|   |  | 90 |  |  | 
|   | 0 | 91 |  |                 using CancellationTokenSource cancellationSource = new CancellationTokenSource(); | 
|   | 0 | 92 |  |                 cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); | 
|   |  | 93 |  |  | 
|   | 0 | 94 |  |                 List<EventData> receivedEvents = new List<EventData>(); | 
|   | 0 | 95 |  |                 bool wereEventsPublished = false; | 
|   |  | 96 |  |  | 
|   | 0 | 97 |  |                 await foreach (PartitionEvent currentEvent in initialConsumerClient.ReadEventsFromPartitionAsync(firstPa | 
|   |  | 98 |  |                 { | 
|   | 0 | 99 |  |                     if (!wereEventsPublished) | 
|   |  | 100 |  |                     { | 
|   | 0 | 101 |  |                         await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName)) | 
|   |  | 102 |  |                         { | 
|   |  | 103 |  |                             // When we publish the event batch, we'll specify the partition to ensure that our consumer  | 
|   |  | 104 |  |                             // operating against the same partition. | 
|   |  | 105 |  |  | 
|   | 0 | 106 |  |                             using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(new CreateBatchOptio | 
|   |  | 107 |  |  | 
|   | 0 | 108 |  |                             for (int index = 0; index < eventBatchSize; ++index) | 
|   |  | 109 |  |                             { | 
|   | 0 | 110 |  |                                 eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"I am event #{ index }"))); | 
|   |  | 111 |  |                             } | 
|   |  | 112 |  |  | 
|   | 0 | 113 |  |                             await producerClient.SendAsync(eventBatch); | 
|   | 0 | 114 |  |                             wereEventsPublished = true; | 
|   |  | 115 |  |  | 
|   | 0 | 116 |  |                             await Task.Delay(250); | 
|   | 0 | 117 |  |                             Console.WriteLine($"The event batch with { eventBatchSize } events has been published."); | 
|   | 0 | 118 |  |                         } | 
|   |  | 119 |  |  | 
|   |  | 120 |  |                         // Since we know that there was no event to observe for this iteration, | 
|   |  | 121 |  |                         // we'll just skip to the next one. | 
|   |  | 122 |  |  | 
|   | 0 | 123 |  |                         continue; | 
|   |  | 124 |  |                     } | 
|   |  | 125 |  |  | 
|   |  | 126 |  |                     // Because publishing and reading events is asynchronous, the events that we published may not | 
|   |  | 127 |  |                     // be immediately available for our consumer to see, so we'll have to guard against an empty event b | 
|   |  | 128 |  |                     // punctuation if our actual event is not available within the waiting time period. | 
|   |  | 129 |  |  | 
|   | 0 | 130 |  |                     if (currentEvent.Data != null) | 
|   |  | 131 |  |                     { | 
|   | 0 | 132 |  |                         receivedEvents.Add(currentEvent.Data); | 
|   |  | 133 |  |  | 
|   | 0 | 134 |  |                         if (receivedEvents.Count >= eventBatchSize) | 
|   |  | 135 |  |                         { | 
|   |  | 136 |  |                             break; | 
|   |  | 137 |  |                         } | 
|   |  | 138 |  |                     } | 
|   |  | 139 |  |                 } | 
|   |  | 140 |  |  | 
|   |  | 141 |  |                 // Print out the events that we read, which will be the entire set that | 
|   |  | 142 |  |                 // we had published. | 
|   |  | 143 |  |  | 
|   | 0 | 144 |  |                 Console.WriteLine(); | 
|   | 0 | 145 |  |                 Console.WriteLine($"The initial consumer processed { receivedEvents.Count } events of the { eventBatchSi | 
|   |  | 146 |  |  | 
|   | 0 | 147 |  |                 foreach (EventData eventData in receivedEvents) | 
|   |  | 148 |  |                 { | 
|   |  | 149 |  |                     // The body of our event was an encoded string; we'll recover the | 
|   |  | 150 |  |                     // message by reversing the encoding process. | 
|   |  | 151 |  |  | 
|   | 0 | 152 |  |                     string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); | 
|   | 0 | 153 |  |                     Console.WriteLine($"\tEvent Message: \"{ message }\""); | 
|   |  | 154 |  |                 } | 
|   |  | 155 |  |  | 
|   |  | 156 |  |                 // Remember the third event that was consumed. | 
|   |  | 157 |  |  | 
|   | 0 | 158 |  |                 thirdEvent = receivedEvents[2]; | 
|   | 0 | 159 |  |             } | 
|   |  | 160 |  |  | 
|   |  | 161 |  |             // At this point, our initial consumer client has passed its "using" scope and has been safely disposed of. | 
|   |  | 162 |  |             // | 
|   |  | 163 |  |             // Create a new consumer for the partition, specifying the sequence number of the third event as the locatio | 
|   |  | 164 |  |             // by default, the consumer will read the next available event following that sequence number, allowing it t | 
|   |  | 165 |  |             // | 
|   |  | 166 |  |             // Because our second consumer will begin watching the partition at a specific event, there is no need to as | 
|   |  | 167 |  |             // we begin iterating, the consumer will locate the proper place in the partition to read from. | 
|   |  | 168 |  |  | 
|   | 0 | 169 |  |             await using (var newConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupN | 
|   |  | 170 |  |             { | 
|   |  | 171 |  |                 // We will consume the events using the new consumer until all of the published events have been receive | 
|   |  | 172 |  |  | 
|   | 0 | 173 |  |                 using CancellationTokenSource cancellationSource = new CancellationTokenSource(); | 
|   | 0 | 174 |  |                 cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); | 
|   |  | 175 |  |  | 
|   | 0 | 176 |  |                 List<EventData> receivedEvents = new List<EventData>(); | 
|   | 0 | 177 |  |                 int expectedCount = (eventBatchSize - 3); | 
|   | 0 | 178 |  |                 EventPosition startingPosition = EventPosition.FromSequenceNumber(thirdEvent.SequenceNumber); | 
|   |  | 179 |  |  | 
|   | 0 | 180 |  |                 await foreach (PartitionEvent currentEvent in newConsumerClient.ReadEventsFromPartitionAsync(firstPartit | 
|   |  | 181 |  |                 { | 
|   | 0 | 182 |  |                     receivedEvents.Add(currentEvent.Data); | 
|   |  | 183 |  |  | 
|   | 0 | 184 |  |                     if (receivedEvents.Count >= expectedCount) | 
|   |  | 185 |  |                     { | 
|   |  | 186 |  |                         break; | 
|   |  | 187 |  |                     } | 
|   |  | 188 |  |                 } | 
|   |  | 189 |  |  | 
|   |  | 190 |  |                 // Print out the events that we received. | 
|   |  | 191 |  |  | 
|   | 0 | 192 |  |                 Console.WriteLine(); | 
|   | 0 | 193 |  |                 Console.WriteLine(); | 
|   | 0 | 194 |  |                 Console.WriteLine($"The new consumer processed { receivedEvents.Count } events of the { eventBatchSize } | 
|   |  | 195 |  |  | 
|   | 0 | 196 |  |                 foreach (EventData eventData in receivedEvents) | 
|   |  | 197 |  |                 { | 
|   |  | 198 |  |                     // The body of our event was an encoded string; we'll recover the | 
|   |  | 199 |  |                     // message by reversing the encoding process. | 
|   |  | 200 |  |  | 
|   | 0 | 201 |  |                     string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); | 
|   | 0 | 202 |  |                     Console.WriteLine($"\tEvent Message: \"{ message }\""); | 
|   |  | 203 |  |                 } | 
|   | 0 | 204 |  |             } | 
|   |  | 205 |  |  | 
|   |  | 206 |  |             // At this point, our clients and connection have passed their "using" scope and have safely been disposed o | 
|   |  | 207 |  |             // have no further obligations. | 
|   |  | 208 |  |  | 
|   | 0 | 209 |  |             Console.WriteLine(); | 
|   | 0 | 210 |  |         } | 
|   |  | 211 |  |     } | 
|   |  | 212 |  | } |