| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Threading.Tasks; |
| | 7 | | using Azure.Messaging.EventHubs.Producer; |
| | 8 | | using Azure.Messaging.EventHubs.Samples.Infrastructure; |
| | 9 | |
|
| | 10 | | namespace Azure.Messaging.EventHubs.Samples |
| | 11 | | { |
| | 12 | | /// <summary> |
| | 13 | | /// An example of publishing events using multiple batches. |
| | 14 | | /// </summary> |
| | 15 | | /// |
| | 16 | | public class Sample04_PublishMultipleEventBatches : IEventHubsSample |
| | 17 | | { |
| | 18 | | /// <summary> |
| | 19 | | /// The name of the sample. |
| | 20 | | /// </summary> |
| | 21 | | /// |
| 0 | 22 | | public string Name => nameof(Sample04_PublishMultipleEventBatches); |
| | 23 | |
|
| | 24 | | /// <summary> |
| | 25 | | /// A short description of the sample. |
| | 26 | | /// </summary> |
| | 27 | | /// |
| 0 | 28 | | public string Description => "An example of publishing events using multiple batches."; |
| | 29 | |
|
| | 30 | | /// <summary> |
| | 31 | | /// Runs the sample using the specified Event Hubs connection information. |
| | 32 | | /// </summary> |
| | 33 | | /// |
| | 34 | | /// <param name="connectionString">The connection string for the Event Hubs namespace that the sample should tar |
| | 35 | | /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru |
| | 36 | | /// |
| | 37 | | public async Task RunAsync(string connectionString, |
| | 38 | | string eventHubName) |
| | 39 | | { |
| | 40 | | // It is important to be aware that the EventDataBatch is responsible for unmanaged resources and should be |
| | 41 | | // after it has been published. A batch cannot be reused nor published multiple times. In the case where |
| | 42 | | // that you would like to publish do not fit into a single batch, a new batch should be created. |
| | 43 | | // |
| | 44 | | // In this example, we'll create a set of events that will need to span multiple batches to publish. As wi |
| | 45 | | // examples, we'll begin by creating an Event Hub producer client to create the batches and publish events |
| | 46 | |
|
| 0 | 47 | | await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName)) |
| | 48 | | { |
| | 49 | | // Because the maximum size of an event batch is dictated by the Event Hubs service and varies between t |
| | 50 | | // service plan levels, we'll create a batch to query for its maximum allowed size before we create our |
| | 51 | |
|
| | 52 | | long maximumBatchSize; |
| | 53 | |
|
| 0 | 54 | | using (EventDataBatch measureBatch = await producerClient.CreateBatchAsync()) |
| | 55 | | { |
| 0 | 56 | | maximumBatchSize = measureBatch.MaximumSizeInBytes; |
| 0 | 57 | | } |
| | 58 | |
|
| | 59 | | // Create our set of events such that we'll need three batches to publish them all. |
| | 60 | |
|
| 0 | 61 | | int eventsPerBatch = 4; |
| 0 | 62 | | int batchCount = 3; |
| 0 | 63 | | int eventCount = (batchCount * eventsPerBatch); |
| 0 | 64 | | long eventSize = (maximumBatchSize / eventsPerBatch); |
| | 65 | |
|
| 0 | 66 | | Queue<EventData> eventsToPublish = new Queue<EventData>(eventCount); |
| | 67 | |
|
| 0 | 68 | | for (int index = 0; index < eventCount; ++index) |
| | 69 | | { |
| | 70 | | // Because the content of the event is not interesting to us, we'll |
| | 71 | | // use an empty array of the correct size. |
| | 72 | |
|
| 0 | 73 | | eventsToPublish.Enqueue(new EventData(new byte[eventSize])); |
| | 74 | | } |
| | 75 | |
|
| | 76 | | // Now that our events are available, create the batches and send them. |
| | 77 | |
|
| 0 | 78 | | int currentBatch = 0; |
| | 79 | |
|
| 0 | 80 | | while (eventsToPublish.Count > 0) |
| | 81 | | { |
| 0 | 82 | | using (EventDataBatch eventBatch = await producerClient.CreateBatchAsync()) |
| | 83 | | { |
| 0 | 84 | | while ((TryDequeue(eventsToPublish, out EventData currentEvent)) && (eventBatch.TryAdd(currentEv |
| | 85 | | { |
| | 86 | | } |
| | 87 | |
|
| | 88 | | // When an event could not be dequeued or could not be added to the batch, then the batch is rea |
| | 89 | |
|
| 0 | 90 | | if (eventBatch.Count > 0) |
| | 91 | | { |
| 0 | 92 | | await producerClient.SendAsync(eventBatch); |
| | 93 | |
|
| 0 | 94 | | ++currentBatch; |
| 0 | 95 | | Console.WriteLine($"Batch: { currentBatch } containing { eventBatch.Count } events was publi |
| | 96 | | } |
| 0 | 97 | | } |
| | 98 | |
|
| | 99 | | // At this point, the batch has passed its "using" scope and has been safely disposed of. If there |
| | 100 | | // queue, a new batch will be created for them. |
| | 101 | | } |
| | 102 | |
|
| 0 | 103 | | Console.WriteLine(); |
| 0 | 104 | | Console.WriteLine("All events have been published."); |
| 0 | 105 | | } |
| | 106 | |
|
| | 107 | | // At this point, our client has passed its "using" scope and has safely been disposed of. We |
| | 108 | | // have no further obligations. |
| | 109 | |
|
| 0 | 110 | | Console.WriteLine(); |
| 0 | 111 | | } |
| | 112 | |
|
| | 113 | | /// <summary> |
| | 114 | | /// Attempts to dequeue an event from the specified <paramref name="queue"/>. |
| | 115 | | /// </summary> |
| | 116 | | /// |
| | 117 | | /// <param name="queue">The queue to attempt to dequeue from.</param> |
| | 118 | | /// <param name="currentEvent">The current event that was dequeued, or <c>null</c> if no event was available.</p |
| | 119 | | /// |
| | 120 | | /// <returns><c>true</c> if an event was dequeued; otherwise, <c>false</c>.</returns> |
| | 121 | | /// |
private static bool TryDequeue(Queue<EventData> queue,
                               out EventData currentEvent)
{
    // Decide up front whether the queue has anything to hand out; the out
    // parameter and the return value are both derived from that single check.

    bool hasEvent = (queue.Count > 0);

    // Only remove an item when one exists; otherwise report "nothing available"
    // by handing back null.

    currentEvent = hasEvent ? queue.Dequeue() : null;

    return hasEvent;
}
| | 134 | | } |
| | 135 | | } |