Summary

Class:Microsoft.Azure.EventHubs.Processor.PartitionContext
Assembly:Microsoft.Azure.EventHubs.Processor
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionContext.cs
Covered lines:0
Uncovered lines:74
Coverable lines:74
Total lines:213
Line coverage:0% (0 of 74)
Covered branches:0
Total branches:12
Branch coverage:0% (0 of 12)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.ctor(...) | - | 0% | 100%
get_CancellationToken() | - | 0% | 100%
get_ConsumerGroupName() | - | 0% | 100%
get_EventHubPath() | - | 0% | 100%
get_PartitionId() | - | 0% | 100%
get_Owner() | - | 0% | 100%
get_RuntimeInformation() | - | 0% | 100%
get_Offset() | - | 0% | 100%
get_SequenceNumber() | - | 0% | 100%
get_Lease() | - | 0% | 100%
get_ThisLock() | - | 0% | 100%
SetOffsetAndSequenceNumber(...) | - | 0% | 100%
GetInitialOffsetAsync() | - | 0% | 0%
CheckpointAsync() | - | 0% | 0%
CheckpointAsync(...) | - | 0% | 0%
CheckpointAsync(...) | - | 0% | 100%
ToString() | - | 0% | 100%
PersistCheckpointAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Microsoft.Azure.EventHubs.Processor\src\PartitionContext.cs

#  Line  Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.EventHubs.Processor
 5{
 6    using System;
 7    using System.Threading;
 8    using System.Threading.Tasks;
 9    using Microsoft.Azure.EventHubs.Primitives;
 10
 11    /// <summary>
 12    /// Encapsulates information related to an Event Hubs partition used by <see cref="IEventProcessor"/>.
 13    /// </summary>
 14    public class PartitionContext
 15    {
 16        readonly EventProcessorHost host;
 17
 18        /// <summary>Creates a new <see cref="PartitionContext"/> object.</summary>
 019        public PartitionContext(EventProcessorHost host, string partitionId, string eventHubPath, string consumerGroupNa
 20        {
 021            this.host = host;
 022            this.PartitionId = partitionId;
 023            this.EventHubPath = eventHubPath;
 024            this.ConsumerGroupName = consumerGroupName;
 025            this.CancellationToken = cancellationToken;
 026            this.ThisLock = new object();
 027            this.Offset = EventPosition.FromStart().Offset;
 028            this.SequenceNumber = 0;
 029            this.RuntimeInformation = new ReceiverRuntimeInformation(partitionId);
 030        }
 31
 32        /// <summary>
 33        /// Gets triggered when the partition gets closed.
 34        /// </summary>
 035        public CancellationToken CancellationToken { get; }
 36
 37        /// <summary>
 38        /// Gets the name of the consumer group.
 39        /// </summary>
 040        public string ConsumerGroupName { get; }
 41
 42        /// <summary>
 43        /// Gets the path of the event hub.
 44        /// </summary>
 045        public string EventHubPath { get; }
 46
 47        /// <summary>
 48        /// Gets the partition ID for the context.
 49        /// </summary>
 050        public string PartitionId { get; }
 51
 52        /// <summary>
 53        /// Gets the host owner for the partition.
 54        /// </summary>
 055        public string Owner => this.Lease.Owner;
 56
 57        /// <summary>
 58        /// Gets the approximate receiver runtime information for a logical partition of an Event Hub.
 59        /// To enable the setting, refer to <see cref="EventProcessorOptions.EnableReceiverRuntimeMetric"/>
 60        /// </summary>
 061        public ReceiverRuntimeInformation RuntimeInformation { get; }
 62
 063        internal string Offset { get; set; }
 64
 065        internal long SequenceNumber { get; set; }
 66
 67        /// <summary>
 68        /// Gets the most recent checkpointed lease.
 69        /// </summary>
 70        // Unlike other properties which are immutable after creation, the lease is updated dynamically and needs a sett
 071        public Lease Lease { get; internal set; }
 72
 073        object ThisLock { get; }
 74
 75        internal void SetOffsetAndSequenceNumber(EventData eventData)
 76        {
 077            Guard.ArgumentNotNull(nameof(eventData), eventData);
 78
 079            lock (this.ThisLock)
 80            {
 081                this.Offset = eventData.SystemProperties.Offset;
 082                this.SequenceNumber = eventData.SystemProperties.SequenceNumber;
 083            }
 084        }
 85
 86        internal async Task<EventPosition> GetInitialOffsetAsync() // throws InterruptedException, ExecutionException
 87        {
 088            Checkpoint startingCheckpoint = await this.host.CheckpointManager.GetCheckpointAsync(this.PartitionId).Confi
 89            EventPosition eventPosition;
 90
 091            if (startingCheckpoint == null)
 92            {
 93                // No checkpoint was ever stored. Use the initialOffsetProvider instead.
 094                ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, this.PartitionId, "Calling user-provided 
 095                eventPosition = this.host.EventProcessorOptions.InitialOffsetProvider(this.PartitionId);
 096                ProcessorEventSource.Log.PartitionPumpInfo(
 097                    this.host.HostName,
 098                    this.PartitionId,
 099                    $"Initial Position Provider. Offset:{eventPosition.Offset}, SequenceNumber:{eventPosition.SequenceNu
 100            }
 101            else
 102            {
 0103                this.Offset = startingCheckpoint.Offset;
 0104                this.SequenceNumber = startingCheckpoint.SequenceNumber;
 0105                ProcessorEventSource.Log.PartitionPumpInfo(
 0106                    this.host.HostName,
 0107                    this.PartitionId, $"Retrieved starting offset/sequenceNumber: {this.Offset}/{this.SequenceNumber}");
 0108                eventPosition = EventPosition.FromOffset(this.Offset);
 109            }
 110
 0111            return eventPosition;
 0112        }
 113
 114        /// <summary>
 115        /// Writes the current offset and sequenceNumber to the checkpoint store via the checkpoint manager.
 116        /// </summary>
 117        public Task CheckpointAsync()
 118        {
 119            // Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair
 120            // instead of catching an update halfway through. Do the capturing here because by the time the checkpoint
 121            // task runs, the fields in this object may have changed, but we should only write to store what the user
 122            // has directed us to write.
 123            Checkpoint capturedCheckpoint;
 0124            lock(this.ThisLock)
 125            {
 126                // Don't checkpoint for start of stream so that we can still respect initial offset provider.
 0127                if (this.Offset == "-1")
 128                {
 0129                    return Task.CompletedTask;
 130                }
 131
 0132                capturedCheckpoint = new Checkpoint(this.PartitionId, this.Offset, this.SequenceNumber);
 0133            }
 134
 0135            return this.PersistCheckpointAsync(capturedCheckpoint);
 0136        }
 137
 138        /// <summary>
 139        /// Stores the offset and sequenceNumber from the provided received EventData instance, then writes those
 140        /// values to the checkpoint store via the checkpoint manager.
 141        /// </summary>
 142        /// <param name="eventData">A received EventData with valid offset and sequenceNumber</param>
 143        /// <exception cref="ArgumentNullException">If supplied eventData is null</exception>
 144        /// <exception cref="ArgumentOutOfRangeException">If the sequenceNumber is less than the last checkpointed value
 145        public Task CheckpointAsync(EventData eventData)
 146        {
 0147            Guard.ArgumentNotNull(nameof(eventData), eventData);
 148
 149            // We have never seen this sequence number yet
 0150            if (eventData.SystemProperties.SequenceNumber > this.SequenceNumber)
 151            {
 0152                throw new ArgumentOutOfRangeException("eventData.SystemProperties.SequenceNumber");
 153            }
 154
 0155            return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, event
 156        }
 157
 158    /// <summary>
 159    /// Writes the offset and sequenceNumber of the supplied checkpoint to the checkpoint store via the checkpoint manager.
 160    /// </summary>
 161    public Task CheckpointAsync(Checkpoint checkPoint)
 162    {
 0163      return this.PersistCheckpointAsync(checkPoint);
 164    }
 165
 166    /// <summary>
 167    /// Provides the partition context in the following format:"PartitionContext({EventHubPath}/{ConsumerGroupName}/{Part
 168    /// </summary>
 169    /// <returns></returns>
 170    public override string ToString()
 171        {
 0172            return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumb
 173        }
 174
 175        async Task PersistCheckpointAsync(Checkpoint checkpoint)
 176        {
 0177            ProcessorEventSource.Log.PartitionPumpCheckpointStart(this.host.HostName, checkpoint.PartitionId, checkpoint
 178            try
 179            {
 0180                Checkpoint inStoreCheckpoint = await this.host.CheckpointManager.GetCheckpointAsync(checkpoint.Partition
 0181                if (inStoreCheckpoint == null || checkpoint.SequenceNumber >= inStoreCheckpoint.SequenceNumber)
 182                {
 0183                    if (inStoreCheckpoint == null)
 184                    {
 0185                        await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).Confi
 186                    }
 187
 0188                    await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint).ConfigureAwait(false
 189
 190                    // Update internal lease if update call above is successful.
 0191                    this.Lease.Offset = checkpoint.Offset;
 0192                    this.Lease.SequenceNumber = checkpoint.SequenceNumber;
 193                }
 194                else
 195                {
 0196                    string msg = $"Ignoring out of date checkpoint with offset {checkpoint.Offset}/sequence number {chec
 0197                            $" because current persisted checkpoint has higher offset {inStoreCheckpoint.Offset}/sequenc
 0198                    ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, checkpoint.PartitionId, msg);
 0199                    throw new ArgumentOutOfRangeException("offset/sequenceNumber", msg);
 200                }
 0201            }
 0202            catch (Exception e)
 203            {
 0204                ProcessorEventSource.Log.PartitionPumpCheckpointError(this.host.HostName, checkpoint.PartitionId, e.ToSt
 0205                throw;
 206            }
 207            finally
 208            {
 0209                ProcessorEventSource.Log.PartitionPumpCheckpointStop(this.host.HostName, checkpoint.PartitionId);
 210            }
 0211        }
 212    }
 213}