| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using Azure.Core; |
| | 7 | | using Azure.Messaging.EventHubs.Core; |
| | 8 | | using Azure.Messaging.EventHubs.Diagnostics; |
| | 9 | |
|
namespace Azure.Messaging.EventHubs.Producer
{
    /// <summary>
    ///   A set of <see cref="EventData" /> with size constraints known up-front,
    ///   intended to be sent to the Event Hubs service as a single batch.
    /// </summary>
    ///
    /// <remarks>
    ///   The operations for this class are thread-safe and will prevent changes when
    ///   actively being published to the Event Hubs service.
    /// </remarks>
    ///
    public sealed class EventDataBatch : IDisposable
    {
        /// <summary>An object instance to use as the synchronization root for ensuring the thread-safety of operations.</summary>
        private readonly object SyncGuard = new object();

        /// <summary>A flag indicating that the batch is locked, such as when in use during a publish operation.</summary>
        private bool _locked = false;

        /// <summary>
        ///   The maximum size allowed for the batch, in bytes.  This includes the events in the batch as
        ///   well as any overhead for the batch itself when sent to the Event Hubs service.
        /// </summary>
        ///
        public long MaximumSizeInBytes => InnerBatch.MaximumSizeInBytes;

        /// <summary>
        ///   The size of the batch, in bytes, as it will be sent to the Event Hubs
        ///   service.
        /// </summary>
        ///
        public long SizeInBytes => InnerBatch.SizeInBytes;

        /// <summary>
        ///   The count of events contained in the batch.
        /// </summary>
        ///
        public int Count => InnerBatch.Count;

        /// <summary>
        ///   The set of options that should be used when publishing the batch.
        /// </summary>
        ///
        internal SendEventOptions SendOptions { get; }

        /// <summary>
        ///   The transport-specific batch responsible for performing the batch operations
        ///   in a manner compatible with the associated <see cref="TransportProducer" />.
        /// </summary>
        ///
        private TransportEventBatch InnerBatch { get; }

        /// <summary>
        ///   The fully qualified Event Hubs namespace that the batch is associated with.  To be used
        ///   during instrumentation.
        /// </summary>
        ///
        private string FullyQualifiedNamespace { get; }

        /// <summary>
        ///   The name of the Event Hub that the batch is associated with, specific to the
        ///   Event Hubs namespace that contains it.  To be used during instrumentation.
        /// </summary>
        ///
        private string EventHubName { get; }

        /// <summary>
        ///   The list of diagnostic identifiers of events added to this batch.  To be used during
        ///   instrumentation.
        /// </summary>
        ///
        private List<string> EventDiagnosticIdentifiers { get; } = new List<string>();

        /// <summary>
        ///   Initializes a new instance of the <see cref="EventDataBatch"/> class.
        /// </summary>
        ///
        /// <param name="transportBatch">The transport-specific batch responsible for performing the batch operations.</param>
        /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace to use for instrumentation.</param>
        /// <param name="eventHubName">The name of the specific Event Hub to associate the events with during instrumentation.</param>
        /// <param name="sendOptions">The set of options that should be used when publishing the batch.</param>
        ///
        /// <remarks>
        ///   As an internal type, this class performs only basic sanity checks against its arguments.  It
        ///   is assumed that callers are trusted and have performed deep validation.
        ///
        ///   Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
        ///   creation of clones or otherwise protecting the parameters is assumed to be the purview of the
        ///   caller.
        /// </remarks>
        ///
        internal EventDataBatch(TransportEventBatch transportBatch,
                                string fullyQualifiedNamespace,
                                string eventHubName,
                                SendEventOptions sendOptions)
        {
            Argument.AssertNotNull(transportBatch, nameof(transportBatch));
            Argument.AssertNotNullOrEmpty(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
            Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
            Argument.AssertNotNull(sendOptions, nameof(sendOptions));

            InnerBatch = transportBatch;
            FullyQualifiedNamespace = fullyQualifiedNamespace;
            EventHubName = eventHubName;
            SendOptions = sendOptions;
        }

        /// <summary>
        ///   Attempts to add an event to the batch, ensuring that the size
        ///   of the batch does not exceed its maximum.
        /// </summary>
        ///
        /// <param name="eventData">The event to attempt to add to the batch.</param>
        ///
        /// <returns><c>true</c> if the event was added; otherwise, <c>false</c>.</returns>
        ///
        /// <exception cref="InvalidOperationException">Occurs when the batch is locked.</exception>
        /// <exception cref="ArgumentNullException">Occurs when <paramref name="eventData" /> is <c>null</c>.</exception>
        ///
        public bool TryAdd(EventData eventData)
        {
            lock (SyncGuard)
            {
                AssertNotLocked();

                // Validate after the lock-state check so that a locked batch keeps reporting
                // InvalidOperationException first; a null event is rejected explicitly rather
                // than failing with a NullReferenceException on the Clone call below.

                Argument.AssertNotNull(eventData, nameof(eventData));

                // The batch instruments and stores its own copy, leaving the caller's instance untouched.

                eventData = eventData.Clone();
                EventDataInstrumentation.InstrumentEvent(eventData, FullyQualifiedNamespace, EventHubName);

                var added = InnerBatch.TryAdd(eventData);

                // Only events that were actually accepted contribute a diagnostic identifier.

                if ((added) && (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out string diagnosticId)))
                {
                    EventDiagnosticIdentifiers.Add(diagnosticId);
                }

                return added;
            }
        }

        /// <summary>
        ///   Performs the task needed to clean up resources used by the <see cref="EventDataBatch" />.
        /// </summary>
        ///
        /// <exception cref="InvalidOperationException">Occurs when the batch is locked.</exception>
        ///
        public void Dispose()
        {
            lock (SyncGuard)
            {
                AssertNotLocked();
                InnerBatch.Dispose();
            }
        }

        /// <summary>
        ///   Clears the batch, removing all events and resetting the
        ///   available size.
        /// </summary>
        ///
        /// <exception cref="InvalidOperationException">Occurs when the batch is locked.</exception>
        ///
        internal void Clear()
        {
            lock (SyncGuard)
            {
                AssertNotLocked();
                InnerBatch.Clear();

                // The identifiers describe events that were just removed; discard them so that
                // instrumentation does not report events that are no longer part of the batch.

                EventDiagnosticIdentifiers.Clear();
            }
        }

        /// <summary>
        ///   Represents the batch as an enumerable set of specific representations of an event.
        /// </summary>
        ///
        /// <typeparam name="T">The specific event representation being requested.</typeparam>
        ///
        /// <returns>The set of events as an enumerable of the requested type.</returns>
        ///
        internal IEnumerable<T> AsEnumerable<T>() => InnerBatch.AsEnumerable<T>();

        /// <summary>
        ///   Gets the list of diagnostic identifiers of events added to this batch.
        /// </summary>
        ///
        /// <returns>A read-only list of diagnostic identifiers.</returns>
        ///
        internal IReadOnlyList<string> GetEventDiagnosticIdentifiers() => EventDiagnosticIdentifiers;

        /// <summary>
        ///   Locks the batch to prevent new events from being added while a service
        ///   operation is active.
        /// </summary>
        ///
        internal void Lock()
        {
            lock (SyncGuard)
            {
                _locked = true;
            }
        }

        /// <summary>
        ///   Unlocks the batch, allowing new events to be added.
        /// </summary>
        ///
        internal void Unlock()
        {
            lock (SyncGuard)
            {
                _locked = false;
            }
        }

        /// <summary>
        ///   Validates that the batch is not in a locked state, triggering an
        ///   invalid operation if it is.
        /// </summary>
        ///
        /// <exception cref="InvalidOperationException">Occurs when the batch is locked.</exception>
        ///
        private void AssertNotLocked()
        {
            if (_locked)
            {
                throw new InvalidOperationException(Resources.EventBatchIsLocked);
            }
        }
    }
}