EventContext.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs.models;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;
/**
* A class that contains {@link EventData} and the partition information the event belongs to. This is given to the
* {@link EventProcessorClientBuilder#processEvent(Consumer) processEvent} handler each time an event is received from
* the Event Hub. This class also includes methods to update checkpoint in {@link CheckpointStore} and retrieve the last
* enqueued event information.
*/
public class EventContext {
private final PartitionContext partitionContext;
private final EventData eventData;
private final CheckpointStore checkpointStore;
private final LastEnqueuedEventProperties lastEnqueuedEventProperties;
/**
* Creates an instance of {@link EventContext}.
*
* @param partitionContext The partition information associated with the received event.
* @param eventData The event received from Event Hub.
* @param checkpointStore The checkpoint store that is used for updating checkpoints.
* @param lastEnqueuedEventProperties The properties of the last enqueued event in this partition. If {@link
* EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is set to {@code false}, this will be
* {@code null}.
* @throws NullPointerException If {@code partitionContext}, {@code eventData} or {@code checkpointStore} is null.
*/
public EventContext(PartitionContext partitionContext, EventData eventData,
CheckpointStore checkpointStore, LastEnqueuedEventProperties lastEnqueuedEventProperties) {
this.partitionContext = Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null.");
this.eventData = eventData;
this.checkpointStore = Objects.requireNonNull(checkpointStore, "'checkpointStore' cannot be null.");
this.lastEnqueuedEventProperties = lastEnqueuedEventProperties;
}
/**
* Returns the partition information associated with the received event.
*
* @return The partition information of the received event.
*/
public PartitionContext getPartitionContext() {
return partitionContext;
}
/**
* Returns the event data received from Event Hub.
*
* @return The event data received from Event Hub.
*/
public EventData getEventData() {
return eventData;
}
/**
* Returns the properties of the last enqueued event in this partition. If {@link
* EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is set to {@code false}, this method will
* return {@code null}.
*
* @return The properties of the last enqueued event in this partition. If
* {@link EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is
* set to {@code false}, this method will return {@code null}.
*/
public LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
return lastEnqueuedEventProperties;
}
/**
* Updates the checkpoint asynchronously for this partition using the event data in this {@link EventContext}. This
* will serve as the last known successfully processed event in this partition if the update is successful.
*
* @return a representation of deferred execution of this call.
*/
public Mono<Void> updateCheckpointAsync() {
if (eventData == null) {
return Mono.empty();
}
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace())
.setEventHubName(partitionContext.getEventHubName())
.setConsumerGroup(partitionContext.getConsumerGroup())
.setPartitionId(partitionContext.getPartitionId())
.setSequenceNumber(eventData.getSequenceNumber())
.setOffset(eventData.getOffset());
return this.checkpointStore.updateCheckpoint(checkpoint);
}
/**
* Updates the checkpoint synchronously for this partition using the event data. This will serve as the last known
* successfully processed event in this partition if the update is successful.
*/
public void updateCheckpoint() {
this.updateCheckpointAsync().block();
}
}