< Summary

Class:Azure.Messaging.EventHubs.Samples.Sample10_ReadEventsFromAKnownPosition
Assembly:Azure.Messaging.EventHubs.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\samples\Sample10_ReadEventsFromAKnownPosition.cs
Covered lines:0
Uncovered lines:53
Coverable lines:53
Total lines:212
Line coverage:0% (0 of 53)
Covered branches:0
Total branches:78
Branch coverage:0% (0 of 78)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\samples\Sample10_ReadEventsFromAKnownPosition.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Linq;
 7using System.Text;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Messaging.EventHubs.Consumer;
 11using Azure.Messaging.EventHubs.Producer;
 12using Azure.Messaging.EventHubs.Samples.Infrastructure;
 13
 14namespace Azure.Messaging.EventHubs.Samples
 15{
 16    /// <summary>
 17    ///   An example of reading events from a single Event Hub partition, starting at a well-known position.
 18    /// </summary>
 19    ///
 20    public class Sample10_ReadEventsFromAKnownPosition : IEventHubsSample
 21    {
 22        /// <summary>
 23        ///   The name of the sample.
 24        /// </summary>
 25        ///
 026        public string Name => nameof(Sample10_ReadEventsFromAKnownPosition);
 27
 28        /// <summary>
 29        ///   A short description of the sample.
 30        /// </summary>
 31        ///
 032        public string Description => "An example of reading events from a single Event Hub partition, starting at a well
 33
 34        /// <summary>
 35        ///   Runs the sample using the specified Event Hubs connection information.
 36        /// </summary>
 37        ///
 38        /// <param name="connectionString">The connection string for the Event Hubs namespace that the sample should tar
 39        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that she sample should ru
 40        ///
 41        public async Task RunAsync(string connectionString,
 42                                   string eventHubName)
 43        {
 44            // In this example, we'll make use of multiple clients in order to publish an event that we will then read b
 45            // for reading events in the partition.  Our initial consumer will begin watching for new events published t
 46            // Hub.  Before we can publish events and have them observed, we will need to ask the consumer to perform an
 47            // the partition.
 48            //
 49            // Each event that a consumer reads will have attributes set that describe the event's location in the parti
 50            // number, and the date/time that it was enqueued.  These attributes can be used to create a new consumer th
 51            //
 52            // With Event Hubs, it is the responsibility of an application consuming events to keep track of those that 
 53            // and to manage where in the partition the consumer begins reading events.  This is done by using the posit
 54            // state, commonly known as "creating a checkpoint."
 55            //
 56            // The goal is to preserve the position of an event in some form of durable state, such as writing it to a d
 57            // consuming application crashes or is otherwise restarted, it can retrieve that checkpoint information and 
 58            // begins reading at the position where it left off.
 59            //
 60            // It is important to note that there is potential for a consumer to process an event and be unable to prese
 61            // consumer must be able to deal with processing the same event multiple times without it causing data corru
 62            // Event Hubs, like most event streaming systems, guarantees "at least once" delivery; even in cases where t
 63            // there is a small possibility that the service will return an event multiple times.
 64            //
 65            // To demonstrate, we will publish a batch of events to be read by an initial consumer.  The third event tha
 66            // and another consumer will use its attributes to start reading the event that follows, reading the set of 
 67            // the first three.
 68
 69            string firstPartition;
 70            EventData thirdEvent;
 71
 072            int eventBatchSize = 50;
 73
 074            await using (var initialConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGr
 75            {
 76                // We will start by using the consumer client inspect the Event Hub and select the first partition to op
 77
 078                firstPartition = (await initialConsumerClient.GetPartitionIdsAsync()).First();
 79
 80                // Each time the consumer looks to read events, we'll ask that it waits only a short time before emittin
 81                // an empty event, so that our code has the chance to run without indefinite blocking.
 82
 083                ReadEventOptions readOptions = new ReadEventOptions
 084                {
 085                    MaximumWaitTime = TimeSpan.FromMilliseconds(150)
 086                };
 87
 88                // As a preventative measure, we'll also specify that cancellation should occur after 30 seconds, so tha
 89                // in the event of a service error where the events we've published cannot be read.
 90
 091                using CancellationTokenSource cancellationSource = new CancellationTokenSource();
 092                cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));
 93
 094                List<EventData> receivedEvents = new List<EventData>();
 095                bool wereEventsPublished = false;
 96
 097                await foreach (PartitionEvent currentEvent in initialConsumerClient.ReadEventsFromPartitionAsync(firstPa
 98                {
 099                    if (!wereEventsPublished)
 100                    {
 0101                        await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
 102                        {
 103                            // When we publish the event batch, we'll specify the partition to ensure that our consumer 
 104                            // operating against the same partition.
 105
 0106                            using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(new CreateBatchOptio
 107
 0108                            for (int index = 0; index < eventBatchSize; ++index)
 109                            {
 0110                                eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"I am event #{ index }")));
 111                            }
 112
 0113                            await producerClient.SendAsync(eventBatch);
 0114                            wereEventsPublished = true;
 115
 0116                            await Task.Delay(250);
 0117                            Console.WriteLine($"The event batch with { eventBatchSize } events has been published.");
 0118                        }
 119
 120                        // Since we know that there was no event to observe for this iteration,
 121                        // we'll just skip to the next one.
 122
 0123                        continue;
 124                    }
 125
 126                    // Because publishing and reading events is asynchronous, the events that we published may not
 127                    // be immediately available for our consumer to see, so we'll have to guard against an empty event b
 128                    // punctuation if our actual event is not available within the waiting time period.
 129
 0130                    if (currentEvent.Data != null)
 131                    {
 0132                        receivedEvents.Add(currentEvent.Data);
 133
 0134                        if (receivedEvents.Count >= eventBatchSize)
 135                        {
 136                            break;
 137                        }
 138                    }
 139                }
 140
 141                // Print out the events that we read, which will be the entire set that
 142                // we had published.
 143
 0144                Console.WriteLine();
 0145                Console.WriteLine($"The initial consumer processed { receivedEvents.Count } events of the { eventBatchSi
 146
 0147                foreach (EventData eventData in receivedEvents)
 148                {
 149                    // The body of our event was an encoded string; we'll recover the
 150                    // message by reversing the encoding process.
 151
 0152                    string message = Encoding.UTF8.GetString(eventData.Body.ToArray());
 0153                    Console.WriteLine($"\tEvent Message: \"{ message }\"");
 154                }
 155
 156                // Remember the third event that was consumed.
 157
 0158                thirdEvent = receivedEvents[2];
 0159            }
 160
 161            // At this point, our initial consumer client has passed its "using" scope and has been safely disposed of.
 162            //
 163            // Create a new consumer for the partition, specifying the sequence number of the third event as the locatio
 164            // by default, the consumer will read the next available event following that sequence number, allowing it t
 165            //
 166            // Because our second consumer will begin watching the partition at a specific event, there is no need to as
 167            // we begin iterating, the consumer will locate the proper place in the partition to read from.
 168
 0169            await using (var newConsumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupN
 170            {
 171                // We will consume the events using the new consumer until all of the published events have been receive
 172
 0173                using CancellationTokenSource cancellationSource = new CancellationTokenSource();
 0174                cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
 175
 0176                List<EventData> receivedEvents = new List<EventData>();
 0177                int expectedCount = (eventBatchSize - 3);
 0178                EventPosition startingPosition = EventPosition.FromSequenceNumber(thirdEvent.SequenceNumber);
 179
 0180                await foreach (PartitionEvent currentEvent in newConsumerClient.ReadEventsFromPartitionAsync(firstPartit
 181                {
 0182                    receivedEvents.Add(currentEvent.Data);
 183
 0184                    if (receivedEvents.Count >= expectedCount)
 185                    {
 186                        break;
 187                    }
 188                }
 189
 190                // Print out the events that we received.
 191
 0192                Console.WriteLine();
 0193                Console.WriteLine();
 0194                Console.WriteLine($"The new consumer processed { receivedEvents.Count } events of the { eventBatchSize }
 195
 0196                foreach (EventData eventData in receivedEvents)
 197                {
 198                    // The body of our event was an encoded string; we'll recover the
 199                    // message by reversing the encoding process.
 200
 0201                    string message = Encoding.UTF8.GetString(eventData.Body.ToArray());
 0202                    Console.WriteLine($"\tEvent Message: \"{ message }\"");
 203                }
 0204            }
 205
 206            // At this point, our clients and connection have passed their "using" scope and have safely been disposed o
 207            // have no further obligations.
 208
 0209            Console.WriteLine();
 0210        }
 211    }
 212}