// PartitionProcessor.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs.implementation;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import java.util.function.Consumer;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
/**
 * Base class describing the callbacks invoked while an {@link EventProcessorClient} processes a partition.
 * Subclasses must implement the two abstract methods, {@link #processEvent(EventContext)} and
 * {@link #processError(ErrorContext)}. The remaining callbacks have default implementations that may be overridden:
 * <ul>
 * <li>{@link #initialize(InitializationContext)} - called at the beginning of processing a partition, before any
 * events are received. The default implementation only logs the partition id.</li>
 * <li>{@link #processEventBatch(EventBatchContext)} - called when events are delivered in batches. The default
 * implementation throws {@link UnsupportedOperationException}.</li>
 * <li>{@link #close(CloseContext)} - called at the end of processing a partition. The {@link CloseReason}
 * specifies why the processing of the partition stopped. The default implementation only logs the partition id
 * and the close reason.</li>
 * </ul>
 * <p>
 * An instance of partition processor will process events from a single partition only.
 * </p>
 * <p>Implementations of this abstract class also have the responsibility of updating checkpoints when appropriate.</p>
 */
public abstract class PartitionProcessor {
    private final ClientLogger logger = new ClientLogger(PartitionProcessor.class);

    /**
     * Called when this {@link EventProcessorClient} takes ownership of a new partition, before any events from that
     * partition are received. The default implementation writes an informational log entry tagged with the
     * partition id.
     *
     * @param initializationContext The initialization context before events from the partition are processed.
     */
    public void initialize(InitializationContext initializationContext) {
        final String partitionId = initializationContext.getPartitionContext().getPartitionId();

        logger.atInfo()
            .addKeyValue(PARTITION_ID_KEY, partitionId)
            .log("Initializing partition processor for partition");
    }

    /**
     * Called when a new event is received for this partition.
     *
     * @param eventContext The partition information and the next event data from this partition.
     */
    public abstract void processEvent(EventContext eventContext);

    /**
     * Called when a batch of events is received for this partition. To receive events in batches,
     * {@link EventProcessorClientBuilder#processEventBatch(Consumer, int) processEventBatch} has to be
     * set up when creating the {@link EventProcessorClient} instance.
     * <p>
     * The default implementation does not support batches: it logs and throws an
     * {@link UnsupportedOperationException}.
     * </p>
     *
     * @param eventBatchContext The event batch context containing the batch of events along with partition
     * information.
     * @throws UnsupportedOperationException always, unless this method is overridden.
     */
    public void processEventBatch(EventBatchContext eventBatchContext) {
        final UnsupportedOperationException notImplemented
            = new UnsupportedOperationException("Processing event batch not implemented");

        // The logger records the exception at error level and hands it back so it can be thrown.
        throw logger.logExceptionAsError(notImplemented);
    }

    /**
     * Called when an error occurs while receiving events from Event Hub. An error also marks the end of the event
     * data stream.
     *
     * @param errorContext The error details and partition information where the error occurred.
     */
    public abstract void processError(ErrorContext errorContext);

    /**
     * Called before the partition processor is closed. A partition processor can be closed for various reasons, and
     * subclasses can override this method to clean up before the partition processor is shut down. The default
     * implementation writes an informational log entry tagged with the partition id and containing the close
     * reason.
     *
     * @param closeContext Contains the reason for closing and the partition information for which the processing of
     * events is closed.
     */
    public void close(CloseContext closeContext) {
        final String partitionId = closeContext.getPartitionContext().getPartitionId();
        final CloseReason closeReason = closeContext.getCloseReason();

        logger.atInfo()
            .addKeyValue(PARTITION_ID_KEY, partitionId)
            .log("Closing partition processor with close reason {}", closeReason);
    }
}