Summary

Class: Microsoft.Azure.EventHubs.ServiceFabricProcessor.ReliableDictionaryCheckpointMananger
Assembly: Microsoft.Azure.EventHubs.ServiceFabricProcessor
File(s): C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\ReliableDictionaryCheckpointMananger.cs
Covered lines: 45
Uncovered lines: 19
Coverable lines: 64
Total lines: 159
Line coverage: 70.3% (45 of 64)
Covered branches: 12
Total branches: 14
Branch coverage: 85.7% (12 of 14)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 100% | 100%
CheckpointStoreExistsAsync() | - | 0% | 100%
CreateCheckpointStoreIfNotExistsAsync() | - | 100% | 100%
CreateCheckpointIfNotExistsAsync() | - | 100% | 100%
GetCheckpointAsync() | - | 0% | 100%
UpdateCheckpointAsync() | - | 0% | 100%
GetWithRetry() | - | 79.17% | 87.5%
PutWithRetry() | - | 70.59% | 75%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.ServiceFabricProcessor\src\ReliableDictionaryCheckpointMananger.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.using System;
 3
 4namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor
 5{
 6    using Microsoft.ServiceFabric.Data;
 7    using Microsoft.ServiceFabric.Data.Collections;
 8    using System;
 9    using System.Collections.Generic;
 10    using System.Threading;
 11    using System.Threading.Tasks;
 12
 13    class ReliableDictionaryCheckpointMananger : ICheckpointMananger
 14    {
 15        private IReliableStateManager reliableStateManager = null;
 16        private IReliableDictionary<string, Dictionary<string, object>> store = null;
 17
 1218        internal ReliableDictionaryCheckpointMananger(IReliableStateManager rsm)
 19        {
 1220            this.reliableStateManager = rsm;
 1221        }
 22
 23        public async Task<bool> CheckpointStoreExistsAsync(CancellationToken cancellationToken)
 24        {
 025            ConditionalValue<IReliableDictionary<string, Dictionary<string, object>>> tryStore = await
 026                this.reliableStateManager.TryGetAsync<IReliableDictionary<string, Dictionary<string, object>>>(Constants
 027            EventProcessorEventSource.Current.Message($"CheckpointStoreExistsAsync = {tryStore.HasValue}");
 028            return tryStore.HasValue;
 029        }
 30
 31        public async Task<bool> CreateCheckpointStoreIfNotExistsAsync(CancellationToken cancellationToken)
 32        {
 33            // Create or get access to the dictionary.
 834            this.store = await reliableStateManager.GetOrAddAsync<IReliableDictionary<string, Dictionary<string, object>
 835            EventProcessorEventSource.Current.Message("CreateCheckpointStoreIfNotExistsAsync OK");
 836            return true;
 837        }
 38
 39        public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId, CancellationToken cancellatio
 40        {
 841            Checkpoint existingCheckpoint = await GetWithRetry(partitionId, cancellationToken).ConfigureAwait(false);
 42
 843            if (existingCheckpoint == null)
 44            {
 645                existingCheckpoint = new Checkpoint(1);
 646                await PutWithRetry(partitionId, existingCheckpoint, cancellationToken).ConfigureAwait(false);
 47            }
 848            EventProcessorEventSource.Current.Message("CreateCheckpointIfNotExists OK");
 49
 850            return existingCheckpoint;
 851        }
 52
 53        public async Task<Checkpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken)
 54        {
 055            return await GetWithRetry(partitionId, cancellationToken).ConfigureAwait(false);
 056        }
 57
 58        public async Task UpdateCheckpointAsync(string partitionId, Checkpoint checkpoint, CancellationToken cancellatio
 59        {
 060            await PutWithRetry(partitionId, checkpoint, cancellationToken).ConfigureAwait(false);
 061        }
 62
 63        // Throws on error or if cancelled.
 64        // Returns null if there is no entry for the given partition.
 65        private async Task<Checkpoint> GetWithRetry(string partitionId, CancellationToken cancellationToken)
 66        {
 867            EventProcessorEventSource.Current.Message($"Getting checkpoint for {partitionId}");
 68
 869            Checkpoint result = null;
 870            Exception lastException = null;
 071            for (int i = 0; i < Constants.RetryCount; i++)
 72            {
 873                cancellationToken.ThrowIfCancellationRequested();
 874                lastException = null;
 75
 76                try
 77                {
 878                    using (ITransaction tx = this.reliableStateManager.CreateTransaction())
 79                    {
 880                        ConditionalValue<Dictionary<string, object>> rawCheckpoint = await
 881                            this.store.TryGetValueAsync(tx, partitionId, Constants.ReliableDictionaryTimeout, cancellati
 82
 883                        await tx.CommitAsync().ConfigureAwait(false);
 84
 85                        // Success! Save the result, if any, and break out of the retry loop.
 886                        if (rawCheckpoint.HasValue)
 87                        {
 288                            result = Checkpoint.CreateFromDictionary(rawCheckpoint.Value);
 89                        }
 90                        else
 91                        {
 692                            result = null;
 93                        }
 894                        break;
 95                    }
 96                }
 097                catch (TimeoutException e)
 98                {
 099                    lastException = e;
 0100                }
 101            }
 102
 8103            if (lastException != null)
 104            {
 105                // Ran out of retries, throw.
 0106                throw new Exception("Ran out of retries creating checkpoint", lastException);
 107            }
 108
 8109            if (result != null)
 110            {
 2111                EventProcessorEventSource.Current.Message($"Got checkpoint for {partitionId}: {result.Offset}//{result.S
 112            }
 113            else
 114            {
 6115                EventProcessorEventSource.Current.Message($"No checkpoint found for {partitionId}: returning null");
 116            }
 117
 8118            return result;
 8119        }
 120
 121        private async Task PutWithRetry(string partitionId, Checkpoint checkpoint, CancellationToken cancellationToken)
 122        {
 6123            EventProcessorEventSource.Current.Message($"Setting checkpoint for {partitionId}: {checkpoint.Offset}//{chec
 124
 6125            Exception lastException = null;
 0126            for (int i = 0; i < Constants.RetryCount; i++)
 127            {
 6128                cancellationToken.ThrowIfCancellationRequested();
 6129                lastException = null;
 130
 6131                Dictionary<string, object> putThis = checkpoint.ToDictionary();
 132
 133                try
 134                {
 6135                    using (ITransaction tx = this.reliableStateManager.CreateTransaction())
 136                    {
 6137                        await this.store.SetAsync(tx, partitionId, putThis, Constants.ReliableDictionaryTimeout, cancell
 6138                        await tx.CommitAsync().ConfigureAwait(false);
 139
 140                        // Success! Break out of the retry loop.
 6141                        break;
 142                    }
 143                }
 0144                catch (TimeoutException e)
 145                {
 0146                    lastException = e;
 0147                }
 148            }
 149
 6150            if (lastException != null)
 151            {
 152                // Ran out of retries, throw.
 0153                throw new Exception("Ran out of retries creating checkpoint", lastException);
 154            }
 155
 6156            EventProcessorEventSource.Current.Message($"Set checkpoint for {partitionId} OK");
 6157        }
 158    }
 159}