Summary

Class: Microsoft.Azure.EventHubs.ServiceFabricProcessor.EventHubMocks
Assembly: Microsoft.Azure.EventHubs.ServiceFabricProcessor
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\EventHubMocks.cs
Covered lines: 80
Uncovered lines: 10
Coverable lines: 90
Total lines: 340
Line coverage: 88.8% (80 of 90)
Covered branches: 20
Total branches: 22
Branch coverage: 90.9% (20 of 22)

Metrics

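| Method | Cyclomatic complexity | Line coverage | Branch coverage |
| --- | --- | --- | --- |
| .cctor() | - | 0% | 100% |
| get_HandlerBatchSize() | - | 0% | 100% |
| get_ReceiveTimeout() | - | 0% | 100% |
| get_Options() | - | 0% | 100% |
| .ctor(...) | - | 90% | 50% |
| get_PrefetchCount() | - | 0% | 100% |
| ReceiveAsync(...) | - | 100% | 100% |
| SetReceiveHandler(...) | - | 100% | 100% |
| CloseAsync() | - | 100% | 100% |
| PumpLoop() | - | 85.71% | 90% |
| .ctor(...) | - | 100% | 100% |
| SetCancellationToken(...) | - | 0% | 100% |
| GetRuntimeInformationAsync() | - | 100% | 100% |
| CreateEpochReceiver(...) | - | 100% | 100% |
| CalculateStartSeq(...) | - | 100% | 100% |
| CloseAsync() | - | 100% | 100% |
| .ctor(...) | - | 100% | 100% |
| Create(...) | - | 100% | 100% |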

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\EventHubMocks.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
 3
 4namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Reflection;
 9    using System.Threading;
 10    using System.Threading.Tasks;
 11
 12    /// <summary>
 13    /// Mocks for the underlying event hub client. Using these instead of the regular wrappers allows unit testing witho
 14    /// By default, EventProcessorService.EventHubClientFactory is a EventHubWrappers.EventHubClientFactory.
 15    /// To use the mocks, change it to a EventHubMocks.EventHubClientFactoryMock.
 16    /// </summary>
 17    public class EventHubMocks
 18    {
 19        /// <summary>
 20        /// Mock of an Event Hub partition receiver.
 21        /// </summary>
 22        public class PartitionReceiverMock : EventHubWrappers.IPartitionReceiver
 23        {
 24            /// <summary>
 25            ///
 26            /// </summary>
 027            public static Dictionary<string, PartitionReceiverMock> receivers = new Dictionary<string, PartitionReceiver
 28
 29            /// <summary>
 30            ///
 31            /// </summary>
 32            protected readonly string partitionId;
 33            /// <summary>
 34            ///
 35            /// </summary>
 36            protected long sequenceNumber;
 37            /// <summary>
 38            ///
 39            /// </summary>
 40            protected volatile IPartitionReceiveHandler outerHandler;
 41            /// <summary>
 42            ///
 43            /// </summary>
 44            protected int handlerBatchSize;
 45            /// <summary>
 46            ///
 47            /// </summary>
 048            public int HandlerBatchSize { get => this.handlerBatchSize; }
 49            /// <summary>
 50            ///
 51            /// </summary>
 52            protected bool invokeWhenNoEvents;
 53            /// <summary>
 54            ///
 55            /// </summary>
 56            protected TimeSpan pumpTimeout;
 57            /// <summary>
 58            ///
 59            /// </summary>
 060            public TimeSpan ReceiveTimeout { get => this.pumpTimeout;  }
 61            /// <summary>
 62            /// Not meaningful in this mock but exposed so that tests can verify.
 63            /// </summary>
 064            public ReceiverOptions Options { get; private set; }
 65            /// <summary>
 66            ///
 67            /// </summary>
 68            protected readonly CancellationToken token;
 69
 70            /// <summary>
 71            /// Construct the partition receiver mock.
 72            /// </summary>
 73            /// <param name="partitionId"></param>
 74            /// <param name="sequenceNumber"></param>
 75            /// <param name="token"></param>
 76            /// <param name="pumpTimeout"></param>
 77            /// <param name="options"></param>
 78            /// <param name="tag"></param>
 879            public PartitionReceiverMock(string partitionId, long sequenceNumber, CancellationToken token, TimeSpan pump
 880                ReceiverOptions options, string tag)
 81            {
 882                this.partitionId = partitionId;
 883                this.sequenceNumber = sequenceNumber;
 884                this.token = token;
 885                this.pumpTimeout = pumpTimeout;
 886                this.Options = options;
 87
 888                if (tag != null)
 89                {
 090                    PartitionReceiverMock.receivers[tag] = this;
 91                }
 892            }
 93
 94            /// <summary>
 95            /// Not meaningful in this mock but exposed so that tests can verify.
 96            /// </summary>
 097            public int PrefetchCount { get; set; }
 98
 99            /// <summary>
 100            /// Receive mock events.
 101            /// </summary>
 102            /// <param name="maxEventCount"></param>
 103            /// <param name="waitTime"></param>
 104            /// <returns></returns>
 105            public virtual Task<IEnumerable<EventData>> ReceiveAsync(int maxEventCount, TimeSpan waitTime)
 106            {
 232107                List<EventData> events = null;
 232108                events = new List<EventData>();
 232109                long lastSeq = this.sequenceNumber + maxEventCount;
 5104110                for (int i = 0; i < maxEventCount; i++)
 111                {
 2320112                    this.sequenceNumber++;
 2320113                    byte[] body = new byte[] { 0x4D, 0x4F, 0x43, 0x4B, 0x42, 0x4F, 0x44, 0x59 }; // M O C K B O D Y
 2320114                    EventData e = new EventData(body);
 2320115                    PropertyInfo propertyInfo = e.GetType().GetProperty("LastSequenceNumber", BindingFlags.NonPublic | B
 2320116                    propertyInfo.SetValue(e, lastSeq);
 2320117                    e.SystemProperties = new EventData.SystemPropertiesCollection(this.sequenceNumber, DateTime.UtcNow, 
 2320118                    e.Properties.Add("userkey", "uservalue");
 2320119                    events.Add(e);
 120                }
 232121                Thread.Sleep(50);
 232122                EventProcessorEventSource.Current.Message($"MOCK ReceiveAsync returning {maxEventCount} events for parti
 232123                return Task.FromResult<IEnumerable<EventData>>(events);
 124            }
 125
 126            /// <summary>
 127            /// Set a mock receive handler.
 128            /// </summary>
 129            /// <param name="receiveHandler"></param>
 130            /// <param name="invokeWhenNoEvents"></param>
 131            public virtual void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invokeWhenNoEvents = fal
 132            {
 16133                EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler");
 16134                this.outerHandler = receiveHandler;
 16135                this.invokeWhenNoEvents = invokeWhenNoEvents;
 16136                if (this.outerHandler != null)
 137                {
 8138                    this.handlerBatchSize = this.outerHandler.MaxBatchSize;
 16139                    Task.Run(() => PumpLoop());
 140                }
 141                else
 142                {
 8143                    EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.SetReceiveHandler with NULL handl
 144                }
 8145            }
 146
 147            /// <summary>
 148            /// Close the mock receiver.
 149            /// </summary>
 150            /// <returns></returns>
 151            public virtual Task CloseAsync()
 152            {
 8153                EventProcessorEventSource.Current.Message("MOCK IPartitionReceiver.CloseAsync");
 8154                return Task.CompletedTask;
 155            }
 156
 157            private async void PumpLoop()
 158            {
 400159                while ((!this.token.IsCancellationRequested) && (this.outerHandler != null))
 160                {
 161                    // TODO random batch sizes
 392162                    IEnumerable<EventData> events = ReceiveAsync(this.handlerBatchSize, this.pumpTimeout).Result;
 392163                    IPartitionReceiveHandler capturedHandler = this.outerHandler;
 392164                    if (capturedHandler != null)
 165                    {
 384166                        if (events != null)
 167                        {
 226168                            EventProcessorEventSource.Current.Message("MOCK Sending messages to handler");
 226169                            await capturedHandler.ProcessEventsAsync(events).ConfigureAwait(false);
 170                        }
 158171                        else if (this.invokeWhenNoEvents)
 172                        {
 0173                            EventProcessorEventSource.Current.Message("MOCK Sending empty batch to handler");
 0174                            await capturedHandler.ProcessEventsAsync(events);
 175                        }
 176                        else
 177                        {
 158178                            EventProcessorEventSource.Current.Message("MOCK Suppressing empty batch");
 179                        }
 180                    }
 181                    else
 182                    {
 8183                        EventProcessorEventSource.Current.Message("MOCK Handler has been detached");
 184                    }
 185                }
 8186                EventProcessorEventSource.Current.Message("MOCK Message generation ending");
 8187            }
 188        }
 189
 190        /// <summary>
 191        /// Mock of EventHubClient class.
 192        /// </summary>
 193        public class EventHubClientMock : EventHubWrappers.IEventHubClient
 194        {
 195            /// <summary>
 196            ///
 197            /// </summary>
 198            protected readonly int partitionCount;
 199            /// <summary>
 200            ///
 201            /// </summary>
 202            protected readonly EventHubsConnectionStringBuilder csb;
 203            /// <summary>
 204            ///
 205            /// </summary>
 206            protected readonly string tag;
 207            /// <summary>
 208            ///
 209            /// </summary>
 210            protected CancellationToken token = new CancellationToken();
 211
 212            /// <summary>
 213            /// Construct the mock.
 214            /// </summary>
 215            /// <param name="partitionCount"></param>
 216            /// <param name="csb"></param>
 217            /// <param name="tag"></param>
 12218            public EventHubClientMock(int partitionCount, EventHubsConnectionStringBuilder csb, string tag)
 219            {
 12220                this.partitionCount = partitionCount;
 12221                this.csb = csb;
 12222                this.tag = tag;
 12223            }
 224
 225            internal void SetCancellationToken(CancellationToken t)
 226            {
 0227                this.token = t;
 0228            }
 229
 230            /// <summary>
 231            /// Get runtime info of the fake event hub.
 232            /// </summary>
 233            /// <returns></returns>
 234            public virtual Task<EventHubRuntimeInformation> GetRuntimeInformationAsync()
 235            {
 12236                EventHubRuntimeInformation ehri = new EventHubRuntimeInformation();
 12237                ehri.PartitionCount = this.partitionCount;
 12238                ehri.PartitionIds = new string[this.partitionCount];
 112239                for (int i = 0; i < this.partitionCount; i++)
 240                {
 44241                    ehri.PartitionIds[i] = i.ToString();
 242                }
 12243                ehri.Path = this.csb.EntityPath;
 12244                EventProcessorEventSource.Current.Message($"MOCK GetRuntimeInformationAsync for {ehri.Path}");
 12245                return Task.FromResult<EventHubRuntimeInformation>(ehri);
 246            }
 247
 248            /// <summary>
 249            /// Create a mock receiver on the fake event hub.
 250            /// </summary>
 251            /// <param name="consumerGroupName"></param>
 252            /// <param name="partitionId"></param>
 253            /// <param name="eventPosition"></param>
 254            /// <param name="epoch"></param>
 255            /// <param name="receiverOptions"></param>
 256            /// <returns></returns>
 257            public virtual EventHubWrappers.IPartitionReceiver CreateEpochReceiver(string consumerGroupName, string part
 258            {
 6259                EventProcessorEventSource.Current.Message($"MOCK CreateEpochReceiver(CG {consumerGroupName}, part {parti
 260                // TODO implement epoch semantics
 6261                long startSeq = CalculateStartSeq(eventPosition);
 6262                return new PartitionReceiverMock(partitionId, startSeq, this.token, this.csb.OperationTimeout, receiverO
 263            }
 264
 265            /// <summary>
 266            ///
 267            /// </summary>
 268            /// <param name="eventPosition"></param>
 269            /// <returns></returns>
 270            protected long CalculateStartSeq(EventPosition eventPosition)
 271            {
 8272                long startSeq = 0L;
 8273                if (eventPosition.SequenceNumber.HasValue)
 274                {
 2275                    startSeq = eventPosition.SequenceNumber.Value;
 276                }
 277                else
 278                {
 6279                    PropertyInfo propertyInfo = eventPosition.GetType().GetProperty("Offset", BindingFlags.NonPublic | B
 6280                    string offset = (string)propertyInfo.GetValue(eventPosition);
 6281                    if (!string.IsNullOrEmpty(offset))
 282                    {
 6283                        startSeq = (long.Parse(offset) / 100L);
 284                    }
 285                }
 8286                return startSeq;
 287            }
 288
 289            /// <summary>
 290            /// Close the mock EventHubClient.
 291            /// </summary>
 292            /// <returns></returns>
 293            public virtual Task CloseAsync()
 294            {
 12295                EventProcessorEventSource.Current.Message("MOCK IEventHubClient.CloseAsync");
 12296                return Task.CompletedTask;
 297            }
 298        }
 299
 300        /// <summary>
 301        /// An EventHubClient factory which dispenses mocks.
 302        /// </summary>
 303        public class EventHubClientFactoryMock : EventHubWrappers.IEventHubClientFactory
 304        {
 305            /// <summary>
 306            ///
 307            /// </summary>
 308            protected readonly int partitionCount;
 309            /// <summary>
 310            ///
 311            /// </summary>
 312            protected readonly string tag;
 313
 314            /// <summary>
 315            /// Construct the mock factory.
 316            /// </summary>
 317            /// <param name="partitionCount"></param>
 318            /// <param name="tag"></param>
 10319            public EventHubClientFactoryMock(int partitionCount, string tag = null)
 320            {
 10321                this.partitionCount = partitionCount;
 10322                this.tag = tag;
 10323            }
 324
 325            /// <summary>
 326            /// Dispense a mock instance operating on a fake event hub with name taken from the connection string.
 327            /// </summary>
 328            /// <param name="connectionString"></param>
 329            /// <param name="receiveTimeout"></param>
 330            /// <returns></returns>
 331            public virtual EventHubWrappers.IEventHubClient Create(string connectionString, TimeSpan receiveTimeout)
 332            {
 10333                EventProcessorEventSource.Current.Message($"MOCK Creating IEventHubClient {connectionString} with {this.
 10334                EventHubsConnectionStringBuilder csb = new EventHubsConnectionStringBuilder(connectionString);
 10335                csb.OperationTimeout = receiveTimeout;
 10336                return new EventHubClientMock(this.partitionCount, csb, this.tag);
 337            }
 338        }
 339    }
 340}