< Summary

Class:Azure.Messaging.EventHubs.Samples.Sample09_ReadOnlyNewEvents
Assembly:Azure.Messaging.EventHubs.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\samples\Sample09_ReadOnlyNewEvents.cs
Covered lines:0
Uncovered lines:35
Coverable lines:35
Total lines:143
Line coverage:0% (0 of 35)
Covered branches:0
Total branches:40
Branch coverage:0% (0 of 40)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\samples\Sample09_ReadOnlyNewEvents.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Messaging.EventHubs.Consumer;
 10using Azure.Messaging.EventHubs.Producer;
 11using Azure.Messaging.EventHubs.Samples.Infrastructure;
 12
 13namespace Azure.Messaging.EventHubs.Samples
 14{
 15    /// <summary>
 16    ///   An example of reading events, beginning with only those newly available from an Event Hub.
 17    /// </summary>
 18    ///
 19    public class Sample09_ReadOnlyNewEvents : IEventHubsSample
 20    {
 21        /// <summary>
 22        ///   The name of the sample.
 23        /// </summary>
 24        ///
 025        public string Name => nameof(Sample09_ReadOnlyNewEvents);
 26
 27        /// <summary>
 28        ///   A short description of the sample.
 29        /// </summary>
 30        ///
 031        public string Description => "An example of reading events, beginning with only those newly available from an Ev
 32
 33        /// <summary>
 34        ///   Runs the sample using the specified Event Hubs connection information.
 35        /// </summary>
 36        ///
 37        /// <param name="connectionString">The connection string for the Event Hubs namespace that the sample should tar
 38        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that she sample should ru
 39        ///
 40        public async Task RunAsync(string connectionString,
 41                                   string eventHubName)
 42        {
 43            // In this example, our consumer will read from the latest position instead of the earliest.  As a result, i
 44            // have previously been published.  Before we can publish the events and have them observed, we will need to
 45            // to perform a read operation in order for it to begin observing the Event Hub partitions.
 46            //
 47            // Each partition of an Event Hub represents potentially infinite stream of events.  When a consumer is read
 48            // point where it can assess that all events have been read and no more will be available.  As a result, whe
 49            // the available events for a partition, it will continue to wait for new events to arrive so that it can su
 50            // time, the iterator will block.
 51            //
 52            // In order to prevent the consumer from waiting forever for events, and blocking other code, there are two 
 53            // control this behavior.  First, signaling the cancellation token passed when reading will cause the consum
 54            // immediately.  This is desirable when you have decided that you are done reading and do not wish to contin
 55            // you would like control returned to your code momentarily to perform some action and then to continue read
 56            //
 57            // In that scenario, you may specify a maximum wait time which is applied to each iteration of the enumerato
 58            // event being available to read, the enumerator will emit an empty event in order to return control to the 
 59            // such as sending a heartbeat, emitting telemetry, or simply exiting the loop.
 60            //
 61            // For our loop, we'll specify a small wait time when we begin reading, which will allow control to return t
 62            // the events after we ensure the consumer is observing the partition.
 63
 064            await using (var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName
 65            {
 066                bool wereEventsPublished = false;
 067                int eventBatchCount = 0;
 068                List<EventData> receivedEvents = new List<EventData>();
 69
 70                // Each time the consumer looks to read events, we'll ask that it waits only a short time before emittin
 71                // an empty event, so that our code has the chance to run without indefinite blocking.
 72
 073                ReadEventOptions readOptions = new ReadEventOptions
 074                {
 075                    MaximumWaitTime = TimeSpan.FromMilliseconds(150)
 076                };
 77
 78                // As a preventative measure, we'll also specify that cancellation should occur after 2 minutes, so that
 79                // in the event of a service error where the events we've published cannot be read.
 80
 081                using CancellationTokenSource cancellationSource = new CancellationTokenSource();
 082                cancellationSource.CancelAfter(TimeSpan.FromMinutes(2));
 83
 84                // The reading of all events will default to the earliest events available in each partition; in order t
 85                // latest event, we'll need to specify that reading should not start at earliest.
 86
 087                await foreach (PartitionEvent currentEvent in consumerClient.ReadEventsAsync(startReadingAtEarliestEvent
 88                {
 089                    if (!wereEventsPublished)
 90                    {
 091                        await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
 92                        {
 093                            using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
 094                            eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Hello, Event Hubs!")));
 095                            eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Goodbye, Event Hubs!")));
 96
 097                            await producerClient.SendAsync(eventBatch);
 098                            wereEventsPublished = true;
 099                            eventBatchCount = eventBatch.Count;
 100
 0101                            await Task.Delay(250);
 0102                            Console.WriteLine("The event batch has been published.");
 0103                        }
 104
 105                        // Since we know that there was no event to observe for this iteration,
 106                        // we'll just skip to the next one.
 107
 0108                        continue;
 109                    }
 110
 111                    // Because publishing and receiving events is asynchronous, the events that we published may not
 112                    // be immediately available for our consumer to see, so we'll have to guard against an empty event b
 113                    // if our wait time interval has elapsed before the consumer observed the events that we published.
 114
 0115                    if (currentEvent.Data != null)
 116                    {
 0117                        receivedEvents.Add(currentEvent.Data);
 118
 0119                        if (receivedEvents.Count >= eventBatchCount)
 120                        {
 121                            break;
 122                        }
 123                    }
 124                }
 125
 126                // Print out the events that we received; the body is an encoded string; we'll recover the message by re
 127
 0128                Console.WriteLine();
 129
 0130                foreach (EventData currentEvent in receivedEvents)
 131                {
 0132                    string message = Encoding.UTF8.GetString(currentEvent.Body.ToArray());
 0133                    Console.WriteLine($"\tEvent Message: \"{ message }\"");
 134                }
 0135            }
 136
 137            // At this point, our clients have passed their "using" scope and have safely been disposed of.  We
 138            // have no further obligations.
 139
 0140            Console.WriteLine();
 0141        }
 142    }
 143}