< Summary

Class:Azure.Messaging.EventHubs.Core.TransportProducerPool
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Core\TransportProducerPool.cs
Covered lines:117
Uncovered lines:7
Coverable lines:124
Total lines:355
Line coverage:94.3% (117 of 124)
Covered branches:22
Total branches:26
Branch coverage:84.6% (22 of 26)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.cctor()-100%100%
get_Pool()-100%100%
get_EventHubProducer()-100%100%
get_Connection()-100%100%
get_RetryPolicy()-100%100%
get_ExpirationTimer()-100%100%
.ctor()-100%100%
.ctor(...)-100%100%
GetPooledProducer(...)-88.57%66.67%
CloseAsync()-81.82%100%
CreateExpirationTimerCallback()-100%100%
.cctor()-100%100%
get_PartitionProducer()-100%100%
get_PartitionId()-0%100%
get_ActiveInstances()-100%100%
get_RemoveAfter()-100%100%
ExtendRemoveAfter(...)-100%100%
.ctor(...)-100%100%
get_CleanUp()-100%100%
get_TransportProducer()-100%100%
.ctor(...)-100%100%
DisposeAsync()-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Core\TransportProducerPool.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Concurrent;
 6using System.Collections.Generic;
 7using System.Linq;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Azure.Core;
 11
 12namespace Azure.Messaging.EventHubs.Core
 13{
 14    /// <summary>
 15    ///   A pool of <see cref="TransportProducer" /> instances that automatically expire after a period of inactivity.
 16    /// </summary>
 17    ///
 18    internal class TransportProducerPool
 19    {
 20        /// <summary>The period after which <see cref="CreateExpirationTimerCallback" /> is run.</summary>
 221        private static readonly TimeSpan DefaultPerformExpirationPeriod = TimeSpan.FromMinutes(10);
 22
 23        /// <summary>
 24        ///   The set of active Event Hub transport-specific producers specific to a given partition;
 25        ///   intended to perform delegated operations.
 26        /// </summary>
 27        ///
 8028        private ConcurrentDictionary<string, PoolItem> Pool { get; }
 29
 30        /// <summary>
 31        ///   An abstracted Event Hub transport-specific producer that is associated with the
 32        ///   Event Hub gateway rather than a specific partition; intended to perform delegated operations.
 33        /// </summary>
 34        ///
 5035        public TransportProducer EventHubProducer { get; }
 36
 37        /// <summary>
 38        ///   The active connection to the Azure Event Hubs service, enabling client communications for metadata
 39        ///   about the associated Event Hub and access to a transport-aware producer.
 40        /// </summary>
 41        ///
 1042        private EventHubConnection Connection { get; }
 43
 44        /// <summary>
 45        ///   The policy to use for determining retry behavior for when an operation fails.
 46        /// </summary>
 47        ///
 1048        private EventHubsRetryPolicy RetryPolicy { get; }
 49
 50        /// <summary>
 51        ///   A reference to a <see cref="Timer" /> periodically checking every <see cref="DefaultPerformExpirationPerio
 52        ///   the <see cref="TransportProducer" /> that are in use and those that can be closed.
 53        /// </summary>
 54        ///
 1055        private Timer ExpirationTimer { get; }
 56
 57        /// <summary>
 58        ///   Initializes a new instance of the <see cref="TransportProducerPool" /> class.
 59        /// </summary>
 60        ///
 261        internal TransportProducerPool()
 62        {
 263        }
 64
 65        /// <summary>
 66        ///   Initializes a new instance of the <see cref="TransportProducerPool" /> class.
 67        /// </summary>
 68        ///
 69        /// <param name="connection">The <see cref="EventHubConnection" /> connection to use for communication with the 
 70        /// <param name="retryPolicy">The policy to use for determining retry behavior for when an operation fails.</par
 71        /// <param name="pool">The pool of <see cref="PoolItem" /> that is going to be used to store the partition speci
 72        /// <param name="performExpirationPeriod">The period after which <see cref="CreateExpirationTimerCallback" /> is
 73        /// <param name="eventHubProducer">An abstracted Event Hub transport-specific producer that is associated with t
 74        ///
 13075        public TransportProducerPool(EventHubConnection connection,
 13076                                     EventHubsRetryPolicy retryPolicy,
 13077                                     ConcurrentDictionary<string, PoolItem> pool = default,
 13078                                     TimeSpan? performExpirationPeriod = default,
 13079                                     TransportProducer eventHubProducer = default)
 80        {
 13081            Connection = connection;
 13082            RetryPolicy = retryPolicy;
 13083            Pool = pool ?? new ConcurrentDictionary<string, PoolItem>();
 13084            performExpirationPeriod ??= DefaultPerformExpirationPeriod;
 13085            EventHubProducer = eventHubProducer ?? connection.CreateTransportProducer(null, retryPolicy);
 86
 13087            ExpirationTimer = new Timer(CreateExpirationTimerCallback(),
 13088                                        null,
 13089                                        performExpirationPeriod.Value,
 13090                                        performExpirationPeriod.Value);
 13091        }
 92
 93        /// <summary>
 94        ///   Retrieves a <see cref="PooledProducer" /> for the requested partition,
 95        ///   creating one if needed or extending the expiration for an existing instance.
 96        /// </summary>
 97        ///
 98        /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 99        /// <param name="removeAfterDuration">The period of inactivity after which a <see cref="PoolItem" /> will become
 100        ///
 101        /// <returns>A <see cref="PooledProducer" /> mapping to the partition id passed in as parameter.</returns>
 102        ///
 103        /// <remarks>
 104        ///   There is a slight probability that the returned producer may be closed at any time
 105        ///   after it is returned and the caller may want to handle that scenario.
 106        /// </remarks>
 107        ///
 108        public virtual PooledProducer GetPooledProducer(string partitionId,
 109                                                        TimeSpan? removeAfterDuration = default)
 110        {
 44111            if (string.IsNullOrEmpty(partitionId))
 112            {
 28113                return new PooledProducer(EventHubProducer);
 114            }
 115
 16116            var identifier = Guid.NewGuid().ToString();
 117
 26118            var item = Pool.GetOrAdd(partitionId, id => new PoolItem(partitionId, Connection.CreateTransportProducer(id,
 119
 120            // A race condition at this point may end with CloseAsync called on
 121            // the returned PoolItem if it had expired. The probability is very low and
 122            // possible exceptions should be handled by the invoking methods.
 123
 16124            if (item.PartitionProducer.IsClosed || !item.ActiveInstances.TryAdd(identifier, 0))
 125            {
 0126                identifier = Guid.NewGuid().ToString();
 0127                item = Pool.GetOrAdd(partitionId, id => new PoolItem(partitionId, Connection.CreateTransportProducer(id,
 0128                item.ActiveInstances.TryAdd(identifier, 0);
 129            }
 130
 16131            item.ExtendRemoveAfter(removeAfterDuration);
 132
 16133            return new PooledProducer(item.PartitionProducer, cleanUp: producer =>
 16134            {
 28135                Argument.AssertNotNull(producer, nameof(producer));
 16136
 28137                if (Pool.TryGetValue(partitionId, out PoolItem pooledItem))
 16138                {
 28139                    pooledItem.ExtendRemoveAfter(removeAfterDuration);
 16140                }
 16141
 16142                // If TryRemove returned false the PoolItem would not be closed deterministically
 16143                // and the ExpirationTimer callback would eventually remove it from the
 16144                // Pool leaving to the Garbage Collector the responsability of closing
 16145                // the TransportProducer and the AMQP link.
 16146
 28147                item.ActiveInstances.TryRemove(identifier, out _);
 16148
 16149                // The second TryGetValue runs after the extension would have been seen, so it
 16150                // is intended to be sure that the item wasn't removed in the meantime.
 16151
 28152                if (!Pool.TryGetValue(partitionId, out _) && !item.ActiveInstances.Any())
 16153                {
 0154                    return producer.CloseAsync(CancellationToken.None);
 16155                }
 16156
 28157                return Task.CompletedTask;
 16158            });
 159        }
 160
 161        /// <summary>
 162        ///   Closes the producers in the pool and performs any cleanup necessary
 163        ///   for resources used by the <see cref="TransportProducerPool" />.
 164        /// </summary>
 165        ///
 166        /// <returns>A task to be resolved on when the operation has completed.</returns>
 167        ///
 168        public virtual async Task CloseAsync(CancellationToken cancellationToken = default)
 169        {
 170            try
 171            {
 10172                ExpirationTimer.Dispose();
 10173            }
 0174            catch (Exception)
 175            {
 0176            }
 177
 10178            var pendingCloses = new List<Task>();
 179
 10180            pendingCloses.Add(EventHubProducer.CloseAsync(cancellationToken));
 181
 32182            foreach (var poolItem in Pool.Values)
 183            {
 6184                pendingCloses.Add(poolItem.PartitionProducer.CloseAsync(cancellationToken));
 185            }
 186
 10187            Pool.Clear();
 188
 10189            await Task.WhenAll(pendingCloses).ConfigureAwait(false);
 6190        }
 191
 192        /// <summary>
 193        ///   Returns a <see cref="TimerCallback" /> that will search for all the expired <see cref="PoolItem" />
 194        ///   in the <see cref="Pool" /> and will try to close those that have expired.
 195        /// </summary>
 196        ///
 197        /// <returns>A <see cref="TimerCallback" /> that is periodically run every <see cref="DefaultPerformExpirationPe
 198        ///
 199        private TimerCallback CreateExpirationTimerCallback()
 200        {
 136201            return _ =>
 136202            {
 136203                // Capture the timestamp to use a consistent value.
 142204                var now = DateTimeOffset.UtcNow;
 136205
 168206                foreach (var key in Pool.Keys.ToList())
 136207                {
 146208                    if (Pool.TryGetValue(key, out var poolItem))
 136209                    {
 146210                        if (poolItem.RemoveAfter <= now)
 136211                        {
 140212                            if (Pool.TryRemove(key, out var _) && !poolItem.ActiveInstances.Any())
 136213                            {
 136214                                // At this point the pool item may have been closed already
 136215                                // if there was a context switch between the if conditions
 136216                                // and the pool item clean up kicked in.
 136217
 136218#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 140219                                poolItem.PartitionProducer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
 136220#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi
 136221                            }
 136222                        }
 136223                    }
 136224                }
 142225            };
 226        }
 227
 228        /// <summary>
 229        ///   An item of the pool, adding tracking information to a <see cref="TransportProducer" />.
 230        /// </summary>
 231        ///
 232        internal class PoolItem
 233        {
 234            /// <summary>The period of inactivity after which an item would be removed from the pool.</summary>
 2235            internal static readonly TimeSpan DefaultRemoveAfterDuration = TimeSpan.FromMinutes(10);
 236
 237            /// <summary>
 238            ///   An abstracted Event Hub transport-specific <see cref="TransportProducer" /> that is associated with a 
 239            /// </summary>
 240            ///
 74241            public TransportProducer PartitionProducer { get; private set; }
 242
 243            /// <summary>
 244            ///   The unique identifier of a partition associated with the Event Hub.
 245            /// </summary>
 246            ///
 0247            public string PartitionId { get; private set; }
 248
 249            /// <summary>
 250            ///   A set of unique identifiers used to track which instances of a <see cref="PoolItem" /> are active.
 251            /// </summary>
 252            ///
 68253            public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>(
 254
 255            /// <summary>
 256            ///   The UTC date and time when a <see cref="PoolItem" /> will become eligible for eviction.
 257            /// </summary>
 258            ///
 72259            public DateTimeOffset RemoveAfter { get; set; }
 260
 261            /// <summary>
 262            ///   Extends the UTC date and time when <see cref="PoolItem" /> will become eligible for eviction.
 263            /// </summary>
 264            ///
 265            /// <param name="removeAfterDuration">The period of inactivity after which a <see cref="PoolItem" /> will be
 266            ///
 267            public void ExtendRemoveAfter(TimeSpan? removeAfterDuration)
 268            {
 54269                RemoveAfter = DateTimeOffset.UtcNow.Add(removeAfterDuration ?? DefaultRemoveAfterDuration);
 54270            }
 271
 272            /// <summary>
 273            ///   Initializes a new instance of the <see cref="PoolItem" /> class with a default timespan of <see cref="
 274            /// </summary>
 275            ///
 276            /// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
 277            /// <param name="partitionProducer">An Event Hub transport-specific producer specific to a given partition.<
 278            /// <param name="removeAfterDuration">The interval after which a <see cref="PoolItem" /> will become eligibl
 279            /// <param name="removeAfter">The UTC date and time when a <see cref="PoolItem" /> will become eligible for 
 280            ///
 32281            public PoolItem(string partitionId,
 32282                            TransportProducer partitionProducer,
 32283                            TimeSpan? removeAfterDuration = default,
 32284                            DateTimeOffset? removeAfter = default)
 285            {
 32286                Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
 32287                Argument.AssertNotNull(partitionProducer, nameof(partitionProducer));
 288
 32289                PartitionProducer = partitionProducer;
 32290                PartitionId = partitionId;
 291
 32292                if (removeAfter == default)
 293                {
 26294                    ExtendRemoveAfter(removeAfterDuration);
 295                }
 296                else
 297                {
 6298                    RemoveAfter = removeAfter.Value;
 299                }
 6300            }
 301        }
 302
 303        /// <summary>
 304        ///   A class wrapping a <see cref="Core.TransportProducer" />, triggering a clean-up when the object is dispose
 305        /// </summary>
 306        ///
 307        internal class PooledProducer: IAsyncDisposable
 308        {
 309            /// <summary>
 310            ///   A function responsible of cleaning up the resources in use.
 311            /// </summary>
 312            ///
 48313            private Func<TransportProducer, Task> CleanUp { get; }
 314
 315            /// <summary>
 316            ///   An abstracted Event Hub transport-specific producer that is associated with the
 317            ///   Event Hub gateway or a specific partition.
 318            /// </summary>
 319            ///
 110320            public TransportProducer TransportProducer { get; }
 321
 322            /// <summary>
 323            ///   Initializes a new instance of the <see cref="PooledProducer" /> class.
 324            /// </summary>
 325            ///
 326            /// <param name="transportProducer">An abstracted Event Hub transport-specific producer that is associated w
 327            /// <param name="cleanUp">The function responsible of cleaning up the resources in use.</param>
 328            ///
 80329            public PooledProducer(TransportProducer transportProducer,
 80330                                  Func<TransportProducer, Task> cleanUp = default)
 331            {
 80332                Argument.AssertNotNull(transportProducer, nameof(transportProducer));
 333
 80334                TransportProducer = transportProducer;
 80335                CleanUp = cleanUp;
 80336            }
 337
 338            /// <summary>
 339            ///   Performs the task needed to clean up resources used by the <see cref="PooledProducer" />.
 340            /// </summary>
 341            ///
 342            /// <returns>A task to be resolved on when the operation has completed.</returns>
 343            ///
 344            public virtual ValueTask DisposeAsync()
 345            {
 36346                if (CleanUp != default)
 347                {
 12348                    return new ValueTask(CleanUp(TransportProducer));
 349                }
 350
 24351                return new ValueTask(Task.CompletedTask);
 352            }
 353        }
 354    }
 355}