| | 1 | | // Copyright (c) Microsoft. All rights reserved. |
| | 2 | | // Licensed under the MIT license. See LICENSE file in the project root for full license information.using System; |
| | 3 | |
|
| | 4 | | namespace Microsoft.Azure.EventHubs.ServiceFabricProcessor |
| | 5 | | { |
| | 6 | | using System.Threading; |
| | 7 | | using System.Threading.Tasks; |
| | 8 | |
|
| | 9 | | /// <summary> |
| | 10 | | /// Passed to an event processor instance to describe the environment. |
| | 11 | | /// </summary> |
| | 12 | | public class PartitionContext |
| | 13 | | { |
| | 14 | | readonly private ICheckpointMananger checkpointMananger; |
| | 15 | |
|
| | 16 | | /// <summary> |
| | 17 | | /// Construct an instance. |
| | 18 | | /// </summary> |
| | 19 | | /// <param name="cancellationToken">CancellationToken that the event processor should respect. Same as token pas |
| | 20 | | /// <param name="partitionId">Id of the partition for which the event processor is handling events.</param> |
| | 21 | | /// <param name="eventHubPath">Name of the event hub which is the source of events.</param> |
| | 22 | | /// <param name="consumerGroupName">Name of the consumer group on the event hub.</param> |
| | 23 | | /// <param name="checkpointMananger">The checkpoint manager instance to use.</param> |
| 8 | 24 | | public PartitionContext(CancellationToken cancellationToken, string partitionId, string eventHubPath, string con |
| | 25 | | { |
| 8 | 26 | | this.CancellationToken = cancellationToken; |
| 8 | 27 | | this.PartitionId = partitionId; |
| 8 | 28 | | this.EventHubPath = eventHubPath; |
| 8 | 29 | | this.ConsumerGroupName = consumerGroupName; |
| | 30 | |
|
| 8 | 31 | | this.RuntimeInformation = new ReceiverRuntimeInformation(this.PartitionId); |
| | 32 | |
|
| 8 | 33 | | this.checkpointMananger = checkpointMananger; |
| 8 | 34 | | } |
| | 35 | |
|
| | 36 | | /// <summary> |
| | 37 | | /// The event processor implementation should respect this CancellationToken. It is the same as the token passed |
| | 38 | | /// in to IEventProcessor methods. It is here primarily for compatibility with Event Processor Host. |
| | 39 | | /// </summary> |
| 0 | 40 | | public CancellationToken CancellationToken { get; private set; } |
| | 41 | |
|
| | 42 | | /// <summary> |
| | 43 | | /// Name of the consumer group on the event hub. |
| | 44 | | /// </summary> |
| 250 | 45 | | public string ConsumerGroupName { get; private set; } |
| | 46 | |
|
| | 47 | | /// <summary> |
| | 48 | | /// Name of the event hub. |
| | 49 | | /// </summary> |
| 250 | 50 | | public string EventHubPath { get; private set; } |
| | 51 | |
|
| | 52 | | /// <summary> |
| | 53 | | /// Id of the partition. |
| | 54 | | /// </summary> |
| 274 | 55 | | public string PartitionId { get; private set; } |
| | 56 | |
|
| | 57 | | /// <summary> |
| | 58 | | /// Gets the approximate receiver runtime information for a logical partition of an Event Hub. |
| | 59 | | /// To enable the setting, refer to <see cref="EventProcessorOptions.EnableReceiverRuntimeMetric"/> |
| | 60 | | /// </summary> |
| | 61 | | public ReceiverRuntimeInformation RuntimeInformation |
| | 62 | | { |
| 0 | 63 | | get; |
| 8 | 64 | | internal set; |
| | 65 | | } |
| | 66 | |
|
| 0 | 67 | | internal string Offset { get; set; } |
| | 68 | |
|
| 0 | 69 | | internal long SequenceNumber { get; set; } |
| | 70 | |
|
| | 71 | | internal void SetOffsetAndSequenceNumber(EventData eventData) |
| | 72 | | { |
| 226 | 73 | | this.Offset = eventData.SystemProperties.Offset; |
| 226 | 74 | | this.SequenceNumber = eventData.SystemProperties.SequenceNumber; |
| 226 | 75 | | } |
| | 76 | |
|
| | 77 | | /// <summary> |
| | 78 | | /// Mark the last event of the current batch and all previous events as processed. |
| | 79 | | /// </summary> |
| | 80 | | /// <returns></returns> |
| | 81 | | public async Task CheckpointAsync() |
| | 82 | | { |
| 0 | 83 | | await CheckpointAsync(new Checkpoint(this.Offset, this.SequenceNumber)).ConfigureAwait(false); |
| 0 | 84 | | } |
| | 85 | |
|
| | 86 | | /// <summary> |
| | 87 | | /// Mark the given event and all previous events as processed. |
| | 88 | | /// </summary> |
| | 89 | | /// <param name="eventData">Highest-processed event.</param> |
| | 90 | | /// <returns></returns> |
| | 91 | | public async Task CheckpointAsync(EventData eventData) |
| | 92 | | { |
| 0 | 93 | | await CheckpointAsync(new Checkpoint(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceN |
| 0 | 94 | | } |
| | 95 | |
|
| | 96 | | private async Task CheckpointAsync(Checkpoint checkpoint) |
| | 97 | | { |
| 0 | 98 | | await this.checkpointMananger.UpdateCheckpointAsync(this.PartitionId, checkpoint, this.CancellationToken).Co |
| 0 | 99 | | } |
| | 100 | | } |
| | 101 | | } |