|   |  | 1 |  | // Copyright (c) Microsoft Corporation. All rights reserved. | 
|   |  | 2 |  | // Licensed under the MIT License. | 
|   |  | 3 |  |  | 
|   |  | 4 |  | using System; | 
|   |  | 5 |  | using System.Collections.Generic; | 
|   |  | 6 |  | using System.Collections.Concurrent; | 
|   |  | 7 |  | using System.Threading; | 
|   |  | 8 |  | using System.Threading.Tasks; | 
|   |  | 9 |  |  | 
|   |  | 10 |  | namespace Azure.Messaging.ServiceBus.Primitives | 
|   |  | 11 |  | { | 
|   |  | 12 |  | #pragma warning disable CA1001 // Types that own disposable fields should be disposable | 
|   |  | 13 |  |     internal sealed class ConcurrentExpiringSet<TKey> | 
|   |  | 14 |  | #pragma warning restore CA1001 // Types that own disposable fields should be disposable | 
|   |  | 15 |  |     { | 
|   |  | 16 |  |         public readonly ConcurrentDictionary<TKey, DateTimeOffset> dictionary; | 
|   |  | 17 |  |  | 
|   |  | 18 |  |         public readonly ICollection<KeyValuePair<TKey, DateTimeOffset>> dictionaryAsCollection; | 
|   |  | 19 |  |  | 
|   | 26 | 20 |  |         public readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); | 
|   |  | 21 |  |  | 
|   | 26 | 22 |  |         public volatile TaskCompletionSource<bool> cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCrea | 
|   |  | 23 |  |  | 
|   |  | 24 |  |         public int closeSignaled; | 
|   |  | 25 |  |  | 
|   |  | 26 |  |         public bool closed; | 
|   |  | 27 |  |  | 
|   | 0 | 28 |  |         public static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); | 
|   |  | 29 |  |  | 
|   | 26 | 30 |  |         public ConcurrentExpiringSet() | 
|   |  | 31 |  |         { | 
|   | 26 | 32 |  |             dictionary = new ConcurrentDictionary<TKey, DateTimeOffset>(); | 
|   | 26 | 33 |  |             dictionaryAsCollection = dictionary; | 
|   | 26 | 34 |  |             _ = CollectExpiredEntriesAsync(tokenSource.Token); | 
|   | 26 | 35 |  |         } | 
|   |  | 36 |  |  | 
|   |  | 37 |  |         public void AddOrUpdate(TKey key, DateTimeOffset expiration) | 
|   |  | 38 |  |         { | 
|   | 0 | 39 |  |             ThrowIfClosed(); | 
|   |  | 40 |  |  | 
|   | 0 | 41 |  |             dictionary[key] = expiration; | 
|   | 0 | 42 |  |             cleanupTaskCompletionSource.TrySetResult(true); | 
|   | 0 | 43 |  |         } | 
|   |  | 44 |  |  | 
|   |  | 45 |  |         public bool Contains(TKey key) | 
|   |  | 46 |  |         { | 
|   | 0 | 47 |  |             ThrowIfClosed(); | 
|   |  | 48 |  |  | 
|   | 0 | 49 |  |             return dictionary.TryGetValue(key, out var expiration) && expiration > DateTimeOffset.UtcNow; | 
|   |  | 50 |  |         } | 
|   |  | 51 |  |  | 
|   |  | 52 |  |         public void Close() | 
|   |  | 53 |  |         { | 
|   | 0 | 54 |  |             if (Interlocked.Exchange(ref closeSignaled, 1) != 0) | 
|   |  | 55 |  |             { | 
|   | 0 | 56 |  |                 return; | 
|   |  | 57 |  |             } | 
|   |  | 58 |  |  | 
|   | 0 | 59 |  |             closed = true; | 
|   |  | 60 |  |  | 
|   | 0 | 61 |  |             tokenSource.Cancel(); | 
|   | 0 | 62 |  |             cleanupTaskCompletionSource.TrySetCanceled(); | 
|   | 0 | 63 |  |             dictionary.Clear(); | 
|   | 0 | 64 |  |             tokenSource.Dispose(); | 
|   | 0 | 65 |  |         } | 
|   |  | 66 |  |  | 
|   |  | 67 |  |         public async Task CollectExpiredEntriesAsync(CancellationToken token) | 
|   |  | 68 |  |         { | 
|   | 26 | 69 |  |             while (!token.IsCancellationRequested) | 
|   |  | 70 |  |             { | 
|   |  | 71 |  |                 try | 
|   |  | 72 |  |                 { | 
|   | 26 | 73 |  |                     await cleanupTaskCompletionSource.Task.ConfigureAwait(false); | 
|   | 0 | 74 |  |                     await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); | 
|   | 0 | 75 |  |                 } | 
|   | 0 | 76 |  |                 catch (OperationCanceledException) | 
|   |  | 77 |  |                 { | 
|   | 0 | 78 |  |                     return; | 
|   |  | 79 |  |                 } | 
|   |  | 80 |  |  | 
|   | 0 | 81 |  |                 var isEmpty = true; | 
|   | 0 | 82 |  |                 var utcNow = DateTimeOffset.UtcNow; | 
|   | 0 | 83 |  |                 foreach (var kvp in dictionary) | 
|   |  | 84 |  |                 { | 
|   | 0 | 85 |  |                     isEmpty = false; | 
|   | 0 | 86 |  |                     var expiration = kvp.Value; | 
|   | 0 | 87 |  |                     if (utcNow > expiration) | 
|   |  | 88 |  |                     { | 
|   | 0 | 89 |  |                         dictionaryAsCollection.Remove(kvp); | 
|   |  | 90 |  |                     } | 
|   |  | 91 |  |                 } | 
|   |  | 92 |  |  | 
|   | 0 | 93 |  |                 if (isEmpty) | 
|   |  | 94 |  |                 { | 
|   | 0 | 95 |  |                     cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsy | 
|   |  | 96 |  |                 } | 
|   |  | 97 |  |             } | 
|   | 0 | 98 |  |         } | 
|   |  | 99 |  |  | 
|   |  | 100 |  |         public void ThrowIfClosed() | 
|   |  | 101 |  |         { | 
|   | 0 | 102 |  |             if (closed) | 
|   |  | 103 |  |             { | 
|   | 0 | 104 |  |                 throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new s | 
|   |  | 105 |  |             } | 
|   | 0 | 106 |  |         } | 
|   |  | 107 |  |     } | 
|   |  | 108 |  | } |