< Summary

Class:Azure.Messaging.ServiceBus.Primitives.ConcurrentExpiringSet`1
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Primitives\ConcurrentExpiringSet.cs
Covered lines:9
Uncovered lines:32
Coverable lines:41
Total lines:108
Line coverage:21.9% (9 of 41)
Covered branches:2
Total branches:14
Branch coverage:14.2% (2 of 14)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor()-87.5%100%
.cctor()-0%100%
AddOrUpdate(...)-0%100%
Contains(...)-0%0%
Close()-0%0%
CollectExpiredEntriesAsync()-12.5%25%
ThrowIfClosed()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Primitives\ConcurrentExpiringSet.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Collections.Concurrent;
 7using System.Threading;
 8using System.Threading.Tasks;
 9
 10namespace Azure.Messaging.ServiceBus.Primitives
 11{
 12#pragma warning disable CA1001 // Types that own disposable fields should be disposable
 13    internal sealed class ConcurrentExpiringSet<TKey>
 14#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 15    {
 16        public readonly ConcurrentDictionary<TKey, DateTimeOffset> dictionary;
 17
 18        public readonly ICollection<KeyValuePair<TKey, DateTimeOffset>> dictionaryAsCollection;
 19
 2620        public readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
 21
 2622        public volatile TaskCompletionSource<bool> cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCrea
 23
 24        public int closeSignaled;
 25
 26        public bool closed;
 27
 028        public static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);
 29
 2630        public ConcurrentExpiringSet()
 31        {
 2632            dictionary = new ConcurrentDictionary<TKey, DateTimeOffset>();
 2633            dictionaryAsCollection = dictionary;
 2634            _ = CollectExpiredEntriesAsync(tokenSource.Token);
 2635        }
 36
 37        public void AddOrUpdate(TKey key, DateTimeOffset expiration)
 38        {
 039            ThrowIfClosed();
 40
 041            dictionary[key] = expiration;
 042            cleanupTaskCompletionSource.TrySetResult(true);
 043        }
 44
 45        public bool Contains(TKey key)
 46        {
 047            ThrowIfClosed();
 48
 049            return dictionary.TryGetValue(key, out var expiration) && expiration > DateTimeOffset.UtcNow;
 50        }
 51
 52        public void Close()
 53        {
 054            if (Interlocked.Exchange(ref closeSignaled, 1) != 0)
 55            {
 056                return;
 57            }
 58
 059            closed = true;
 60
 061            tokenSource.Cancel();
 062            cleanupTaskCompletionSource.TrySetCanceled();
 063            dictionary.Clear();
 064            tokenSource.Dispose();
 065        }
 66
 67        public async Task CollectExpiredEntriesAsync(CancellationToken token)
 68        {
 2669            while (!token.IsCancellationRequested)
 70            {
 71                try
 72                {
 2673                    await cleanupTaskCompletionSource.Task.ConfigureAwait(false);
 074                    await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false);
 075                }
 076                catch (OperationCanceledException)
 77                {
 078                    return;
 79                }
 80
 081                var isEmpty = true;
 082                var utcNow = DateTimeOffset.UtcNow;
 083                foreach (var kvp in dictionary)
 84                {
 085                    isEmpty = false;
 086                    var expiration = kvp.Value;
 087                    if (utcNow > expiration)
 88                    {
 089                        dictionaryAsCollection.Remove(kvp);
 90                    }
 91                }
 92
 093                if (isEmpty)
 94                {
 095                    cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsy
 96                }
 97            }
 098        }
 99
 100        public void ThrowIfClosed()
 101        {
 0102            if (closed)
 103            {
 0104                throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new s
 105            }
 0106        }
 107    }
 108}