// EventBatchContext.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.models;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;

/**
 * A class that contains a batch of {@link EventData} and the partition information the event batch belongs to. This is
 * given to the {@link EventProcessorClientBuilder#processEventBatch(Consumer, int) processEventBatch} handler each
 * time an event batch is received from the Event Hub. This class also includes methods to update checkpoint in
 * {@link CheckpointStore} and retrieve the last enqueued event information.
 *
 * @see EventProcessorClientBuilder#processEventBatch(Consumer, int)
 * @see EventProcessorClientBuilder#processEventBatch(Consumer, int, Duration)
 */
public class EventBatchContext {

    private final PartitionContext partitionContext;
    private final List<EventData> events;
    private final CheckpointStore checkpointStore;
    private final LastEnqueuedEventProperties lastEnqueuedEventProperties;

    /**
     * Creates an instance of {@link EventBatchContext}.
     *
     * @param partitionContext The partition information associated with the received events.
     * @param events The list of events received from Event Hub.
     * @param checkpointStore The checkpoint store that is used for updating checkpoints.
     * @param lastEnqueuedEventProperties The properties of the last enqueued event in this partition. If {@link
     * EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is set to {@code false}, this will be
     * {@code null}.
     * @throws NullPointerException If {@code partitionContext}, {@code events} or {@code checkpointStore} is null.
     */
    public EventBatchContext(PartitionContext partitionContext, List<EventData> events,
        CheckpointStore checkpointStore, LastEnqueuedEventProperties lastEnqueuedEventProperties) {
        // Validate every required argument before assigning any field. The checks run in the order
        // checkpointStore, events, partitionContext, which decides the message when several are null.
        Objects.requireNonNull(checkpointStore, "'checkpointStore' cannot be null.");
        Objects.requireNonNull(events, "'events' cannot be null.");
        Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null.");

        this.partitionContext = partitionContext;
        this.events = events;
        this.checkpointStore = checkpointStore;
        // Optional: no null check is applied to the last enqueued event properties.
        this.lastEnqueuedEventProperties = lastEnqueuedEventProperties;
    }

    /**
     * Gets the partition information associated with the received event batch.
     *
     * @return The partition information of the received event batch.
     */
    public PartitionContext getPartitionContext() {
        return partitionContext;
    }

    /**
     * Gets the list of event data received from Event Hub.
     *
     * @return The list of event data received from Event Hub. This is the same list instance that was passed to
     * the constructor.
     */
    public List<EventData> getEvents() {
        return events;
    }

    /**
     * Gets the properties of the last enqueued event in this partition. If {@link
     * EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is set to {@code false} or if
     * {@link #getEvents()} is empty, this method will return {@code null}.
     *
     * @return The properties of the last enqueued event in this partition, or {@code null} if
     * {@link EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is set to {@code false} or if
     * {@link #getEvents()} is empty.
     */
    public LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
        return lastEnqueuedEventProperties;
    }

    /**
     * Updates the checkpoint asynchronously for this partition using the last event in the list provided by
     * {@link #getEvents()}. This will serve as the last known successfully processed event in this partition
     * if the update is successful. If {@link #getEvents()} returns an empty list, no update to the checkpoint
     * will be done.
     *
     * @return A {@link Mono} that completes when the checkpoint is updated.
     * @throws AmqpException if an error occurs when updating the checkpoint.
     */
    public Mono<Void> updateCheckpointAsync() {
        // Nothing was received, so there is no position to record.
        if (events.isEmpty()) {
            return Mono.empty();
        }

        // The checkpoint always tracks the final event of the batch.
        final EventData lastEvent = events.get(events.size() - 1);
        return checkpointStore.updateCheckpoint(createCheckpoint(lastEvent));
    }

    /**
     * Updates the checkpoint synchronously for this partition using the last event in the list provided by
     * {@link #getEvents()}. This will serve as the last known successfully processed event in this partition
     * if the update is successful. If {@link #getEvents()} returns an empty list, no update to the checkpoint
     * will be done.
     *
     * @throws AmqpException if an error occurs while updating the checkpoint.
     */
    public void updateCheckpoint() {
        final Mono<Void> pendingUpdate = updateCheckpointAsync();
        // Wait for the asynchronous update to finish before returning to the caller.
        pendingUpdate.block();
    }

    /**
     * Builds the checkpoint describing the given event's position in this context's partition.
     *
     * @param event The event whose sequence number and offset are recorded in the checkpoint.
     * @return A checkpoint populated from the partition context and the given event.
     */
    private Checkpoint createCheckpoint(EventData event) {
        return new Checkpoint()
            .setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace())
            .setEventHubName(partitionContext.getEventHubName())
            .setConsumerGroup(partitionContext.getConsumerGroup())
            .setPartitionId(partitionContext.getPartitionId())
            .setSequenceNumber(event.getSequenceNumber())
            .setOffset(event.getOffset());
    }
}