| | | 1 | | // Copyright (c) Microsoft. All rights reserved. |
| | | 2 | | // Licensed under the MIT license. See LICENSE file in the project root for full license information. |
| | | 3 | | |
| | | 4 | | namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor |
| | | 5 | | { |
| | | 6 | | using System.Threading; |
| | | 7 | | using System.Threading.Tasks; |
| | | 8 | | |
| | | 9 | | /// <summary> |
| | | 10 | | /// Passed to an event processor instance to describe the environment. |
| | | 11 | | /// </summary> |
| | | 12 | | public class PartitionContext |
| | | 13 | | { |
| | | 14 | | readonly private ICheckpointMananger checkpointMananger; |
| | | 15 | | |
| | | 16 | | /// <summary> |
| | | 17 | | /// Construct an instance. |
| | | 18 | | /// </summary> |
| | | 19 | | /// <param name="cancellationToken">CancellationToken that the event processor should respect. Same as token passed in to IEventProcessor methods.</param> |
| | | 20 | | /// <param name="partitionId">Id of the partition for which the event processor is handling events.</param> |
| | | 21 | | /// <param name="eventHubPath">Name of the event hub which is the source of events.</param> |
| | | 22 | | /// <param name="consumerGroupName">Name of the consumer group on the event hub.</param> |
| | | 23 | | /// <param name="checkpointMananger">The checkpoint manager instance to use.</param> |
| | 8 | 24 | | public PartitionContext(CancellationToken cancellationToken, string partitionId, string eventHubPath, string con |
| | | 25 | | { |
| | 8 | 26 | | this.CancellationToken = cancellationToken; |
| | 8 | 27 | | this.PartitionId = partitionId; |
| | 8 | 28 | | this.EventHubPath = eventHubPath; |
| | 8 | 29 | | this.ConsumerGroupName = consumerGroupName; |
| | | 30 | | |
| | 8 | 31 | | this.RuntimeInformation = new ReceiverRuntimeInformation(this.PartitionId); |
| | | 32 | | |
| | 8 | 33 | | this.checkpointMananger = checkpointMananger; |
| | 8 | 34 | | } |
| | | 35 | | |
| | | 36 | | /// <summary> |
| | | 37 | | /// The event processor implementation should respect this CancellationToken. It is the same as the token passed |
| | | 38 | | /// in to IEventProcessor methods. It is here primarily for compatibility with Event Processor Host. |
| | | 39 | | /// </summary> |
| | 0 | 40 | | public CancellationToken CancellationToken { get; private set; } |
| | | 41 | | |
| | | 42 | | /// <summary> |
| | | 43 | | /// Name of the consumer group on the event hub. |
| | | 44 | | /// </summary> |
| | 250 | 45 | | public string ConsumerGroupName { get; private set; } |
| | | 46 | | |
| | | 47 | | /// <summary> |
| | | 48 | | /// Name of the event hub. |
| | | 49 | | /// </summary> |
| | 250 | 50 | | public string EventHubPath { get; private set; } |
| | | 51 | | |
| | | 52 | | /// <summary> |
| | | 53 | | /// Id of the partition. |
| | | 54 | | /// </summary> |
| | 274 | 55 | | public string PartitionId { get; private set; } |
| | | 56 | | |
| | | 57 | | /// <summary> |
| | | 58 | | /// Gets the approximate receiver runtime information for a logical partition of an Event Hub. |
| | | 59 | | /// To enable the setting, refer to <see cref="EventProcessorOptions.EnableReceiverRuntimeMetric"/>. The setter is internal; the constructor initializes the value for <see cref="PartitionId"/>. |
| | | 60 | | /// </summary> |
| | | 61 | | public ReceiverRuntimeInformation RuntimeInformation |
| | | 62 | | { |
| | 0 | 63 | | get; |
| | 8 | 64 | | internal set; |
| | | 65 | | } |
| | | 66 | | |
| | 0 | 67 | | internal string Offset { get; set; } |
| | | 68 | | |
| | 0 | 69 | | internal long SequenceNumber { get; set; } |
| | | 70 | | |
| | | 71 | | internal void SetOffsetAndSequenceNumber(EventData eventData) |
| | | 72 | | { |
| | 226 | 73 | | this.Offset = eventData.SystemProperties.Offset; |
| | 226 | 74 | | this.SequenceNumber = eventData.SystemProperties.SequenceNumber; |
| | 226 | 75 | | } |
| | | 76 | | |
| | | 77 | | /// <summary> |
| | | 78 | | /// Mark the last event of the current batch and all previous events as processed. The checkpoint is built from the offset and sequence number currently stored on this context. |
| | | 79 | | /// </summary> |
| | | 80 | | /// <returns>A task that completes once the checkpoint update has finished.</returns> |
| | | 81 | | public async Task CheckpointAsync() |
| | | 82 | | { |
| | 0 | 83 | | await CheckpointAsync(new Checkpoint(this.Offset, this.SequenceNumber)).ConfigureAwait(false); |
| | 0 | 84 | | } |
| | | 85 | | |
| | | 86 | | /// <summary> |
| | | 87 | | /// Mark the given event and all previous events as processed. |
| | | 88 | | /// </summary> |
| | | 89 | | /// <param name="eventData">Highest-processed event.</param> |
| | | 90 | | /// <returns>A task that completes once the checkpoint update has finished.</returns> |
| | | 91 | | public async Task CheckpointAsync(EventData eventData) |
| | | 92 | | { |
| | 0 | 93 | | await CheckpointAsync(new Checkpoint(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceN |
| | 0 | 94 | | } |
| | | 95 | | |
| | | 96 | | private async Task CheckpointAsync(Checkpoint checkpoint) |
| | | 97 | | { |
| | 0 | 98 | | await this.checkpointMananger.UpdateCheckpointAsync(this.PartitionId, checkpoint, this.CancellationToken).Co |
| | 0 | 99 | | } |
| | | 100 | | } |
| | | 101 | | } |