| | 1 | | // Copyright (c) Microsoft. All rights reserved. |
| | 2 | | // Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; |
| | 3 | |
|
| | 4 | | namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor |
| | 5 | | { |
| | 6 | | using System; |
| | 7 | | using System.Collections.Generic; |
| | 8 | | using System.Reflection; |
| | 9 | | using System.Threading; |
| | 10 | | using System.Threading.Tasks; |
| | 11 | |
|
| | 12 | | /// <summary> |
| | 13 | | /// Mocks for the underlying event hub client. Using these instead of the regular wrappers allows unit testing witho |
| | 14 | | /// By default, EventProcessorService.EventHubClientFactory is a EventHubWrappers.EventHubClientFactory. |
| | 15 | | /// To use the mocks, change it to a EventHubMocks.EventHubClientFactoryMock. |
| | 16 | | /// </summary> |
| | 17 | | public class EventHubMocks |
| | 18 | | { |
| | 19 | | /// <summary> |
| | 20 | | /// Mock of an Event Hub partition receiver. |
| | 21 | | /// </summary> |
| | 22 | | public class PartitionReceiverMock : EventHubWrappers.IPartitionReceiver |
| | 23 | | { |
| | 24 | | /// <summary> |
| | 25 | | /// |
| | 26 | | /// </summary> |
| 0 | 27 | | public static Dictionary<string, PartitionReceiverMock> receivers = new Dictionary<string, PartitionReceiver |
| | 28 | |
|
| | 29 | | /// <summary> |
| | 30 | | /// |
| | 31 | | /// </summary> |
| | 32 | | protected readonly string partitionId; |
| | 33 | | /// <summary> |
| | 34 | | /// |
| | 35 | | /// </summary> |
| | 36 | | protected long sequenceNumber; |
| | 37 | | /// <summary> |
| | 38 | | /// |
| | 39 | | /// </summary> |
| | 40 | | protected volatile IPartitionReceiveHandler outerHandler; |
| | 41 | | /// <summary> |
| | 42 | | /// |
| | 43 | | /// </summary> |
| | 44 | | protected int handlerBatchSize; |
| | 45 | | /// <summary> |
| | 46 | | /// |
| | 47 | | /// </summary> |
| 0 | 48 | | public int HandlerBatchSize { get => this.handlerBatchSize; } |
| | 49 | | /// <summary> |
| | 50 | | /// |
| | 51 | | /// </summary> |
| | 52 | | protected bool invokeWhenNoEvents; |
| | 53 | | /// <summary> |
| | 54 | | /// |
| | 55 | | /// </summary> |
| | 56 | | protected TimeSpan pumpTimeout; |
| | 57 | | /// <summary> |
| | 58 | | /// |
| | 59 | | /// </summary> |
| 0 | 60 | | public TimeSpan ReceiveTimeout { get => this.pumpTimeout; } |
| | 61 | | /// <summary> |
| | 62 | | /// Not meaningful in this mock but exposed so that tests can verify. |
| | 63 | | /// </summary> |
| 0 | 64 | | public ReceiverOptions Options { get; private set; } |
| | 65 | | /// <summary> |
| | 66 | | /// |
| | 67 | | /// </summary> |
| | 68 | | protected readonly CancellationToken token; |
| | 69 | |
|
| | 70 | | /// <summary> |
| | 71 | | /// Construct the partition receiver mock. |
| | 72 | | /// </summary> |
| | 73 | | /// <param name="partitionId"></param> |
| | 74 | | /// <param name="sequenceNumber"></param> |
| | 75 | | /// <param name="token"></param> |
| | 76 | | /// <param name="pumpTimeout"></param> |
| | 77 | | /// <param name="options"></param> |
| | 78 | | /// <param name="tag"></param> |
| 8 | 79 | | public PartitionReceiverMock(string partitionId, long sequenceNumber, CancellationToken token, TimeSpan pump |
| 8 | 80 | | ReceiverOptions options, string tag) |
| | 81 | | { |
| 8 | 82 | | this.partitionId = partitionId; |
| 8 | 83 | | this.sequenceNumber = sequenceNumber; |
| 8 | 84 | | this.token = token; |
| 8 | 85 | | this.pumpTimeout = pumpTimeout; |
| 8 | 86 | | this.Options = options; |
| | 87 | |
|
| 8 | 88 | | if (tag != null) |
| | 89 | | { |
| 0 | 90 | | PartitionReceiverMock.receivers[tag] = this; |
| | 91 | | } |
| 8 | 92 | | } |
| | 93 | |
|
| | 94 | | /// <summary> |
| | 95 | | /// Not meaningful in this mock but exposed so that tests can verify. |
| | 96 | | /// </summary> |
| 0 | 97 | | public int PrefetchCount { get; set; } |
| | 98 | |
|
| | 99 | | /// <summary> |
| | 100 | | /// Receive mock events. |
| | 101 | | /// </summary> |
| | 102 | | /// <param name="maxEventCount"></param> |
| | 103 | | /// <param name="waitTime"></param> |
| | 104 | | /// <returns></returns> |
| | 105 | | public virtual Task<IEnumerable<EventData>> ReceiveAsync(int maxEventCount, TimeSpan waitTime) |
| | 106 | | { |
| 232 | 107 | | List<EventData> events = null; |
| 232 | 108 | | events = new List<EventData>(); |
| 232 | 109 | | long lastSeq = this.sequenceNumber + maxEventCount; |
| 5104 | 110 | | for (int i = 0; i < maxEventCount; i++) |
| | 111 | | { |
| 2320 | 112 | | this.sequenceNumber++; |
| 2320 | 113 | | byte[] body = new byte[] { 0x4D, 0x4F, 0x43, 0x4B, 0x42, 0x4F, 0x44, 0x59 }; // M O C K B O D Y |
| 2320 | 114 | | EventData e = new EventData(body); |
| 2320 | 115 | | PropertyInfo propertyInfo = e.GetType().GetProperty("LastSequenceNumber", BindingFlags.NonPublic | B |
| 2320 | 116 | | propertyInfo.SetValue(e, lastSeq); |
| 2320 | 117 | | e.SystemProperties = new EventData.SystemPropertiesCollection(this.sequenceNumber, DateTime.UtcNow, |
| 2320 | 118 | | e.Properties.Add("userkey", "uservalue"); |
| 2320 | 119 | | events.Add(e); |
| | 120 | | } |
| 232 | 121 | | Thread.Sleep(50); |
| 232 | 122 | | EventProcessorEventSource.Current.Message($"MOCK ReceiveAsync returning {maxEventCount} events for parti |
| 232 | 123 | | return Task.FromResult<IEnumerable<EventData>>(events); |
| | 124 | | } |
| | 125 | |
|
| | 126 | | /// <summary> |
| | 127 | | /// Set a mock receive handler. |
| | 128 | | /// </summary> |
| | 129 | | /// <param name="receiveHandler"></param> |
| | 130 | | /// <param name="invokeWhenNoEvents"></param> |
| | 131 | | public virtual void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = fal |
| | 132 | | { |
| 16 | 133 | | EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler"); |
| 16 | 134 | | this.outerHandler = receiveHandler; |
| 16 | 135 | | this.invokeWhenNoEvents = invokeWhenNoEvents; |
| 16 | 136 | | if (this.outerHandler != null) |
| | 137 | | { |
| 8 | 138 | | this.handlerBatchSize = this.outerHandler.MaxBatchSize; |
| 16 | 139 | | Task.Run(() => PumpLoop()); |
| | 140 | | } |
| | 141 | | else |
| | 142 | | { |
| 8 | 143 | | EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler with NULL handl |
| | 144 | | } |
| 8 | 145 | | } |
| | 146 | |
|
| | 147 | | /// <summary> |
| | 148 | | /// Close the mock receiver. |
| | 149 | | /// </summary> |
| | 150 | | /// <returns></returns> |
| | 151 | | public virtual Task CloseAsync() |
| | 152 | | { |
| 8 | 153 | | EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.CloseAsync"); |
| 8 | 154 | | return Task.CompletedTask; |
| | 155 | | } |
| | 156 | |
|
| | 157 | | private async void PumpLoop() |
| | 158 | | { |
| 400 | 159 | | while ((!this.token.IsCancellationRequested) && (this.outerHandler != null)) |
| | 160 | | { |
| | 161 | | // TODO random batch sizes |
| 392 | 162 | | IEnumerable<EventData> events = ReceiveAsync(this.handlerBatchSize, this.pumpTimeout).Result; |
| 392 | 163 | | IPartitionReceiveHandler capturedHandler = this.outerHandler; |
| 392 | 164 | | if (capturedHandler != null) |
| | 165 | | { |
| 384 | 166 | | if (events != null) |
| | 167 | | { |
| 226 | 168 | | EventProcessorEventSource.Current.Message("MOCK Sending messages to handler"); |
| 226 | 169 | | await capturedHandler.ProcessEventsAsync(events).ConfigureAwait(false); |
| | 170 | | } |
| 158 | 171 | | else if (this.invokeWhenNoEvents) |
| | 172 | | { |
| 0 | 173 | | EventProcessorEventSource.Current.Message("MOCK Sending empty batch to handler"); |
| 0 | 174 | | await capturedHandler.ProcessEventsAsync(events); |
| | 175 | | } |
| | 176 | | else |
| | 177 | | { |
| 158 | 178 | | EventProcessorEventSource.Current.Message("MOCK Suppressing empty batch"); |
| | 179 | | } |
| | 180 | | } |
| | 181 | | else |
| | 182 | | { |
| 8 | 183 | | EventProcessorEventSource.Current.Message("MOCK Handler has been detached"); |
| | 184 | | } |
| | 185 | | } |
| 8 | 186 | | EventProcessorEventSource.Current.Message("MOCK Message generation ending"); |
| 8 | 187 | | } |
| | 188 | | } |
| | 189 | |
|
| | 190 | | /// <summary> |
| | 191 | | /// Mock of EventHubClient class. |
| | 192 | | /// </summary> |
| | 193 | | public class EventHubClientMock : EventHubWrappers.IEventHubClient |
| | 194 | | { |
| | 195 | | /// <summary> |
| | 196 | | /// |
| | 197 | | /// </summary> |
| | 198 | | protected readonly int partitionCount; |
| | 199 | | /// <summary> |
| | 200 | | /// |
| | 201 | | /// </summary> |
| | 202 | | protected readonly EventHubsConnectionStringBuilder csb; |
| | 203 | | /// <summary> |
| | 204 | | /// |
| | 205 | | /// </summary> |
| | 206 | | protected readonly string tag; |
| | 207 | | /// <summary> |
| | 208 | | /// |
| | 209 | | /// </summary> |
| | 210 | | protected CancellationToken token = new CancellationToken(); |
| | 211 | |
|
| | 212 | | /// <summary> |
| | 213 | | /// Construct the mock. |
| | 214 | | /// </summary> |
| | 215 | | /// <param name="partitionCount"></param> |
| | 216 | | /// <param name="csb"></param> |
| | 217 | | /// <param name="tag"></param> |
| 12 | 218 | | public EventHubClientMock(int partitionCount, EventHubsConnectionStringBuilder csb, string tag) |
| | 219 | | { |
| 12 | 220 | | this.partitionCount = partitionCount; |
| 12 | 221 | | this.csb = csb; |
| 12 | 222 | | this.tag = tag; |
| 12 | 223 | | } |
| | 224 | |
|
| | 225 | | internal void SetCancellationToken(CancellationToken t) |
| | 226 | | { |
| 0 | 227 | | this.token = t; |
| 0 | 228 | | } |
| | 229 | |
|
| | 230 | | /// <summary> |
| | 231 | | /// Get runtime info of the fake event hub. |
| | 232 | | /// </summary> |
| | 233 | | /// <returns></returns> |
| | 234 | | public virtual Task<EventHubRuntimeInformation> GetRuntimeInformationAsync() |
| | 235 | | { |
| 12 | 236 | | EventHubRuntimeInformation ehri = new EventHubRuntimeInformation(); |
| 12 | 237 | | ehri.PartitionCount = this.partitionCount; |
| 12 | 238 | | ehri.PartitionIds = new string[this.partitionCount]; |
| 112 | 239 | | for (int i = 0; i < this.partitionCount; i++) |
| | 240 | | { |
| 44 | 241 | | ehri.PartitionIds[i] = i.ToString(); |
| | 242 | | } |
| 12 | 243 | | ehri.Path = this.csb.EntityPath; |
| 12 | 244 | | EventProcessorEventSource.Current.Message($"MOCK GetRuntimeInformationAsync for {ehri.Path}"); |
| 12 | 245 | | return Task.FromResult<EventHubRuntimeInformation>(ehri); |
| | 246 | | } |
| | 247 | |
|
| | 248 | | /// <summary> |
| | 249 | | /// Create a mock receiver on the fake event hub. |
| | 250 | | /// </summary> |
| | 251 | | /// <param name="consumerGroupName"></param> |
| | 252 | | /// <param name="partitionId"></param> |
| | 253 | | /// <param name="eventPosition"></param> |
| | 254 | | /// <param name="epoch"></param> |
| | 255 | | /// <param name="receiverOptions"></param> |
| | 256 | | /// <returns></returns> |
| | 257 | | public virtual EventHubWrappers.IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string part |
| | 258 | | { |
| 6 | 259 | | EventProcessorEventSource.Current.Message($"MOCK CreateEpochReceiver(CG {consumerGroupName}, part {parti |
| | 260 | | // TODO implement epoch semantics |
| 6 | 261 | | long startSeq = CalculateStartSeq(eventPosition); |
| 6 | 262 | | return new PartitionReceiverMock(partitionId, startSeq, this.token, this.csb.OperationTimeout, receiverO |
| | 263 | | } |
| | 264 | |
|
| | 265 | | /// <summary> |
| | 266 | | /// |
| | 267 | | /// </summary> |
| | 268 | | /// <param name="eventPosition"></param> |
| | 269 | | /// <returns></returns> |
| | 270 | | protected long CalculateStartSeq(EventPosition eventPosition) |
| | 271 | | { |
| 8 | 272 | | long startSeq = 0L; |
| 8 | 273 | | if (eventPosition.SequenceNumber.HasValue) |
| | 274 | | { |
| 2 | 275 | | startSeq = eventPosition.SequenceNumber.Value; |
| | 276 | | } |
| | 277 | | else |
| | 278 | | { |
| 6 | 279 | | PropertyInfo propertyInfo = eventPosition.GetType().GetProperty("Offset", BindingFlags.NonPublic | B |
| 6 | 280 | | string offset = (string)propertyInfo.GetValue(eventPosition); |
| 6 | 281 | | if (!string.IsNullOrEmpty(offset)) |
| | 282 | | { |
| 6 | 283 | | startSeq = (long.Parse(offset) / 100L); |
| | 284 | | } |
| | 285 | | } |
| 8 | 286 | | return startSeq; |
| | 287 | | } |
| | 288 | |
|
| | 289 | | /// <summary> |
| | 290 | | /// Close the mock EventHubClient. |
| | 291 | | /// </summary> |
| | 292 | | /// <returns></returns> |
| | 293 | | public virtual Task CloseAsync() |
| | 294 | | { |
| 12 | 295 | | EventProcessorEventSource.Current.Message("MOCK IEventHubClient.CloseAsync"); |
| 12 | 296 | | return Task.CompletedTask; |
| | 297 | | } |
| | 298 | | } |
| | 299 | |
|
| | 300 | | /// <summary> |
| | 301 | | /// An EventHubClient factory which dispenses mocks. |
| | 302 | | /// </summary> |
| | 303 | | public class EventHubClientFactoryMock : EventHubWrappers.IEventHubClientFactory |
| | 304 | | { |
| | 305 | | /// <summary> |
| | 306 | | /// |
| | 307 | | /// </summary> |
| | 308 | | protected readonly int partitionCount; |
| | 309 | | /// <summary> |
| | 310 | | /// |
| | 311 | | /// </summary> |
| | 312 | | protected readonly string tag; |
| | 313 | |
|
| | 314 | | /// <summary> |
| | 315 | | /// Construct the mock factory. |
| | 316 | | /// </summary> |
| | 317 | | /// <param name="partitionCount"></param> |
| | 318 | | /// <param name="tag"></param> |
| 10 | 319 | | public EventHubClientFactoryMock(int partitionCount, string tag = null) |
| | 320 | | { |
| 10 | 321 | | this.partitionCount = partitionCount; |
| 10 | 322 | | this.tag = tag; |
| 10 | 323 | | } |
| | 324 | |
|
| | 325 | | /// <summary> |
| | 326 | | /// Dispense a mock instance operating on a fake event hub with name taken from the connection string. |
| | 327 | | /// </summary> |
| | 328 | | /// <param name="connectionString"></param> |
| | 329 | | /// <param name="receiveTimeout"></param> |
| | 330 | | /// <returns></returns> |
| | 331 | | public virtual EventHubWrappers.IEventHubClient Create(string connectionString, TimeSpan receiveTimeout) |
| | 332 | | { |
| 10 | 333 | | EventProcessorEventSource.Current.Message($"MOCK Creating IEventHubClient {connectionString} with {this. |
| 10 | 334 | | EventHubsConnectionStringBuilder csb = new EventHubsConnectionStringBuilder(connectionString); |
| 10 | 335 | | csb.OperationTimeout = receiveTimeout; |
| 10 | 336 | | return new EventHubClientMock(this.partitionCount, csb, this.tag); |
| | 337 | | } |
| | 338 | | } |
| | 339 | | } |
| | 340 | | } |