| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Linq; |
| | 7 | | using System.Text; |
| | 8 | | using System.Threading; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Azure.Messaging.EventHubs.Consumer; |
| | 11 | | using Azure.Messaging.EventHubs.Producer; |
| | 12 | | using Azure.Messaging.EventHubs.Samples.Infrastructure; |
| | 13 | |
|
| | 14 | | namespace Azure.Messaging.EventHubs.Samples |
| | 15 | | { |
| | 16 | | /// <summary> |
| | 17 | | /// An example of reading events from a single Event Hub partition, starting at a well-known position. |
| | 18 | | /// </summary> |
| | 19 | | /// |
| | 20 | | public class Sample10_ReadEventsFromAKnownPosition : IEventHubsSample |
| | 21 | | { |
| | 22 | | /// <summary> |
| | 23 | | /// The name of the sample. |
| | 24 | | /// </summary> |
| | 25 | | /// |
| 0 | 26 | | public string Name => nameof(Sample10_ReadEventsFromAKnownPosition); |
| | 27 | |
|
| | 28 | | /// <summary> |
| | 29 | | /// A short description of the sample. |
| | 30 | | /// </summary> |
| | 31 | | /// |
| 0 | 32 | | public string Description => "An example of reading events from a single Event Hub partition, starting at a well |
| | 33 | |
|
| | 34 | | /// <summary> |
| | 35 | | /// Runs the sample using the specified Event Hubs connection information. |
| | 36 | | /// </summary> |
| | 37 | | /// |
| | 38 | | /// <param name="connectionString">The connection string for the Event Hubs namespace that the sample should tar |
| | 39 | | /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that she sample should ru |
| | 40 | | /// |
| | 41 | | public async Task RunAsync(string connectionString, |
| | 42 | | string eventHubName) |
| | 43 | | { |
| | 44 | | // In this example, we'll make use of multiple clients in order to publish an event that we will then read b |
| | 45 | | // for reading events in the partition. Our initial consumer will begin watching for new events published t |
| | 46 | | // Hub. Before we can publish events and have them observed, we will need to ask the consumer to perform an |
| | 47 | | // the partition. |
| | 48 | | // |
| | 49 | | // Each event that a consumer reads will have attributes set that describe the event's location in the parti |
| | 50 | | // number, and the date/time that it was enqueued. These attributes can be used to create a new consumer th |
| | 51 | | // |
| | 52 | | // With Event Hubs, it is the responsibility of an application consuming events to keep track of those that |
| | 53 | | // and to manage where in the partition the consumer begins reading events. This is done by using the posit |
| | 54 | | // state, commonly known as "creating a checkpoint." |
| | 55 | | // |
| | 56 | | // The goal is to preserve the position of an event in some form of durable state, such as writing it to a d |
| | 57 | | // consuming application crashes or is otherwise restarted, it can retrieve that checkpoint information and |
| | 58 | | // begins reading at the position where it left off. |
| | 59 | | // |
| | 60 | | // It is important to note that there is potential for a consumer to process an event and be unable to prese |
| | 61 | | // consumer must be able to deal with processing the same event multiple times without it causing data corru |
| | 62 | | // Event Hubs, like most event streaming systems, guarantees "at least once" delivery; even in cases where t |
| | 63 | | // there is a small possibility that the service will return an event multiple times. |
| | 64 | | // |
| | 65 | | // To demonstrate, we will publish a batch of events to be read by an initial consumer. The third event tha |
| | 66 | | // and another consumer will use its attributes to start reading the event that follows, reading the set of |
| | 67 | | // the first three. |
| | 68 | |
|
| | 69 | | string firstPartition; |
| | 70 | | EventData thirdEvent; |
| | 71 | |
|
| 0 | 72 | | int eventBatchSize = 50; |
| | 73 | |
|
| 0 | 74 | | await using (var initialConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGr |
| | 75 | | { |
| | 76 | | // We will start by using the consumer client inspect the Event Hub and select the first partition to op |
| | 77 | |
|
| 0 | 78 | | firstPartition = (await initialConsumerClient.GetPartitionIdsAsync()).First(); |
| | 79 | |
|
| | 80 | | // Each time the consumer looks to read events, we'll ask that it waits only a short time before emittin |
| | 81 | | // an empty event, so that our code has the chance to run without indefinite blocking. |
| | 82 | |
|
| 0 | 83 | | ReadEventOptions readOptions = new ReadEventOptions |
| 0 | 84 | | { |
| 0 | 85 | | MaximumWaitTime = TimeSpan.FromMilliseconds(150) |
| 0 | 86 | | }; |
| | 87 | |
|
| | 88 | | // As a preventative measure, we'll also specify that cancellation should occur after 30 seconds, so tha |
| | 89 | | // in the event of a service error where the events we've published cannot be read. |
| | 90 | |
|
| 0 | 91 | | using CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
| 0 | 92 | | cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); |
| | 93 | |
|
| 0 | 94 | | List<EventData> receivedEvents = new List<EventData>(); |
| 0 | 95 | | bool wereEventsPublished = false; |
| | 96 | |
|
| 0 | 97 | | await foreach (PartitionEvent currentEvent in initialConsumerClient.ReadEventsFromPartitionAsync(firstPa |
| | 98 | | { |
| 0 | 99 | | if (!wereEventsPublished) |
| | 100 | | { |
| 0 | 101 | | await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName)) |
| | 102 | | { |
| | 103 | | // When we publish the event batch, we'll specify the partition to ensure that our consumer |
| | 104 | | // operating against the same partition. |
| | 105 | |
|
| 0 | 106 | | using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(new CreateBatchOptio |
| | 107 | |
|
| 0 | 108 | | for (int index = 0; index < eventBatchSize; ++index) |
| | 109 | | { |
| 0 | 110 | | eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"I am event #{ index }"))); |
| | 111 | | } |
| | 112 | |
|
| 0 | 113 | | await producerClient.SendAsync(eventBatch); |
| 0 | 114 | | wereEventsPublished = true; |
| | 115 | |
|
| 0 | 116 | | await Task.Delay(250); |
| 0 | 117 | | Console.WriteLine($"The event batch with { eventBatchSize } events has been published."); |
| 0 | 118 | | } |
| | 119 | |
|
| | 120 | | // Since we know that there was no event to observe for this iteration, |
| | 121 | | // we'll just skip to the next one. |
| | 122 | |
|
| 0 | 123 | | continue; |
| | 124 | | } |
| | 125 | |
|
| | 126 | | // Because publishing and reading events is asynchronous, the events that we published may not |
| | 127 | | // be immediately available for our consumer to see, so we'll have to guard against an empty event b |
| | 128 | | // punctuation if our actual event is not available within the waiting time period. |
| | 129 | |
|
| 0 | 130 | | if (currentEvent.Data != null) |
| | 131 | | { |
| 0 | 132 | | receivedEvents.Add(currentEvent.Data); |
| | 133 | |
|
| 0 | 134 | | if (receivedEvents.Count >= eventBatchSize) |
| | 135 | | { |
| | 136 | | break; |
| | 137 | | } |
| | 138 | | } |
| | 139 | | } |
| | 140 | |
|
| | 141 | | // Print out the events that we read, which will be the entire set that |
| | 142 | | // we had published. |
| | 143 | |
|
| 0 | 144 | | Console.WriteLine(); |
| 0 | 145 | | Console.WriteLine($"The initial consumer processed { receivedEvents.Count } events of the { eventBatchSi |
| | 146 | |
|
| 0 | 147 | | foreach (EventData eventData in receivedEvents) |
| | 148 | | { |
| | 149 | | // The body of our event was an encoded string; we'll recover the |
| | 150 | | // message by reversing the encoding process. |
| | 151 | |
|
| 0 | 152 | | string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); |
| 0 | 153 | | Console.WriteLine($"\tEvent Message: \"{ message }\""); |
| | 154 | | } |
| | 155 | |
|
| | 156 | | // Remember the third event that was consumed. |
| | 157 | |
|
| 0 | 158 | | thirdEvent = receivedEvents[2]; |
| 0 | 159 | | } |
| | 160 | |
|
| | 161 | | // At this point, our initial consumer client has passed its "using" scope and has been safely disposed of. |
| | 162 | | // |
| | 163 | | // Create a new consumer for the partition, specifying the sequence number of the third event as the locatio |
| | 164 | | // by default, the consumer will read the next available event following that sequence number, allowing it t |
| | 165 | | // |
| | 166 | | // Because our second consumer will begin watching the partition at a specific event, there is no need to as |
| | 167 | | // we begin iterating, the consumer will locate the proper place in the partition to read from. |
| | 168 | |
|
| 0 | 169 | | await using (var newConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupN |
| | 170 | | { |
| | 171 | | // We will consume the events using the new consumer until all of the published events have been receive |
| | 172 | |
|
| 0 | 173 | | using CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
| 0 | 174 | | cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); |
| | 175 | |
|
| 0 | 176 | | List<EventData> receivedEvents = new List<EventData>(); |
| 0 | 177 | | int expectedCount = (eventBatchSize - 3); |
| 0 | 178 | | EventPosition startingPosition = EventPosition.FromSequenceNumber(thirdEvent.SequenceNumber); |
| | 179 | |
|
| 0 | 180 | | await foreach (PartitionEvent currentEvent in newConsumerClient.ReadEventsFromPartitionAsync(firstPartit |
| | 181 | | { |
| 0 | 182 | | receivedEvents.Add(currentEvent.Data); |
| | 183 | |
|
| 0 | 184 | | if (receivedEvents.Count >= expectedCount) |
| | 185 | | { |
| | 186 | | break; |
| | 187 | | } |
| | 188 | | } |
| | 189 | |
|
| | 190 | | // Print out the events that we received. |
| | 191 | |
|
| 0 | 192 | | Console.WriteLine(); |
| 0 | 193 | | Console.WriteLine(); |
| 0 | 194 | | Console.WriteLine($"The new consumer processed { receivedEvents.Count } events of the { eventBatchSize } |
| | 195 | |
|
| 0 | 196 | | foreach (EventData eventData in receivedEvents) |
| | 197 | | { |
| | 198 | | // The body of our event was an encoded string; we'll recover the |
| | 199 | | // message by reversing the encoding process. |
| | 200 | |
|
| 0 | 201 | | string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); |
| 0 | 202 | | Console.WriteLine($"\tEvent Message: \"{ message }\""); |
| | 203 | | } |
| 0 | 204 | | } |
| | 205 | |
|
| | 206 | | // At this point, our clients and connection have passed their "using" scope and have safely been disposed o |
| | 207 | | // have no further obligations. |
| | 208 | |
|
| 0 | 209 | | Console.WriteLine(); |
| 0 | 210 | | } |
| | 211 | | } |
| | 212 | | } |