SearchIndexingBufferedSenderOptions.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.search.documents;

import com.azure.core.util.logging.ClientLogger;
import com.azure.search.documents.models.IndexAction;

import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Configuration options used when constructing a {@link SearchIndexingBufferedSender} or {@link
 * SearchIndexingBufferedAsyncSender}.
 *
 * @see SearchIndexingBufferedSender
 * @see SearchIndexingBufferedAsyncSender
 */
public final class SearchIndexingBufferedSenderOptions<T> {
    private static final boolean DEFAULT_AUTO_FLUSH = true;
    private static final int DEFAULT_INITIAL_BATCH_ACTION_COUNT = 512;
    private static final Duration DEFAULT_FLUSH_WINDOW = Duration.ofSeconds(60);
    private static final int DEFAULT_MAX_RETRIES = 3;
    private static final Duration DEFAULT_RETRY_DELAY = Duration.ofMillis(800);
    private static final Duration DEFAULT_MAX_RETRY_DELAY = Duration.ofMinutes(1);

    private final ClientLogger logger = new ClientLogger(SearchIndexingBufferedSenderOptions.class);

    private boolean autoFlush = DEFAULT_AUTO_FLUSH;
    private Duration autoFlushWindow = DEFAULT_FLUSH_WINDOW;
    private int initialBatchActionCount = DEFAULT_INITIAL_BATCH_ACTION_COUNT;
    private int maxRetries = DEFAULT_MAX_RETRIES;
    private Duration retryDelay = DEFAULT_RETRY_DELAY;
    private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;

    private Consumer<IndexAction<T>> onActionAddedConsumer;
    private Consumer<IndexAction<T>> onActionSucceededConsumer;
    private BiConsumer<IndexAction<T>, Throwable> onActionErrorBiConsumer;
    private Consumer<IndexAction<T>> onActionSentConsumer;

    private Function<T, String> documentKeyRetriever;

    /**
     * Sets the flag determining whether a buffered sender will automatically flush its document batch based on the
     * configurations of {@link #setAutoFlushWindow(Duration)} and {@link #setInitialBatchActionCount(int)}.
     *
     * @param autoFlush Flag determining whether a buffered sender will automatically flush.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     */
    public SearchIndexingBufferedSenderOptions<T> setAutoFlush(boolean autoFlush) {
        this.autoFlush = autoFlush;
        return this;
    }

    /**
     * Gets the flag that indicates whether the buffered sender will be configured to automatically flush.
     *
     * @return Flag indicating if the buffered sender will automatically flush.
     */
    public boolean getAutoFlush() {
        return autoFlush;
    }

    /**
     * Sets the duration between a buffered sender sending documents to be indexed.
     * <p>
     * The buffered sender will reset the duration when documents are sent for indexing, either by reaching {@link
     * #setInitialBatchActionCount(int)} or by a manual trigger.
     * <p>
     * If {@code flushWindow} is negative or zero and {@link #setAutoFlush(boolean)} is enabled the buffered sender will
     * only flush when {@link #setInitialBatchActionCount(int)} is met.
     *
     * @param autoFlushWindow Duration between document batches being sent for indexing.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws NullPointerException If {@code autoFlushWindow} is null.
     */
    public SearchIndexingBufferedSenderOptions<T> setAutoFlushWindow(Duration autoFlushWindow) {
        Objects.requireNonNull(autoFlushWindow, "'autoFlushWindow' cannot be null.");

        this.autoFlushWindow = autoFlushWindow;
        return this;
    }

    /**
     * Gets the {@link Duration} that the buffered sender will wait between sending documents to be indexed.
     * <p>
     * The buffered sender will reset the duration when documents are sent for indexing, either by reaching {@link
     * #setInitialBatchActionCount(int)} or by a manual trigger.
     * <p>
     * If the duration is less than or equal to zero the buffered sender will only flush when {@link
     * #getInitialBatchActionCount()} is triggered.
     * <p>
     * This configuration is only taken into account if {@link #getAutoFlush()} is true.
     *
     * @return The {@link Duration} to wait after the last document has been added to the batch before the batch is
     * flushed.
     */
    public Duration getAutoFlushWindow() {
        return autoFlushWindow;
    }

    /**
     * Sets the number of documents before a buffered sender will send the batch to be indexed.
     * <p>
     * This will only trigger a batch to be sent automatically if {@link #autoFlushWindow} is configured. Default value
     * is {@code 512}.
     *
     * @param initialBatchActionCount The number of documents in a batch that will trigger it to be indexed.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws IllegalArgumentException If {@code batchSize} is less than one.
     */
    public SearchIndexingBufferedSenderOptions<T> setInitialBatchActionCount(int initialBatchActionCount) {
        if (initialBatchActionCount < 1) {
            throw logger.logExceptionAsError(new IllegalArgumentException("'batchSize' cannot be less than one."));
        }

        this.initialBatchActionCount = initialBatchActionCount;
        return this;
    }

    /**
     * Gets the number of documents required in a batch for it to be flushed.
     * <p>
     * This configuration is only taken into account if {@link #getAutoFlush()} is true.
     *
     * @return The number of documents required before a flush is triggered.
     */
    public int getInitialBatchActionCount() {
        return initialBatchActionCount;
    }

    /**
     * Sets the number of times a document will retry indexing before it is considered failed.
     * <p>
     * Documents are only retried on retryable status codes.
     * <p>
     * Default value is {@code 3}.
     *
     * @param maxRetries The number of times a document will retry indexing before it is considered failed.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws IllegalArgumentException If {@code documentTryLimit} is less than one.
     */
    public SearchIndexingBufferedSenderOptions<T> setMaxRetries(int maxRetries) {
        if (maxRetries < 1) {
            throw logger.logExceptionAsError(
                new IllegalArgumentException("'maxRetries' cannot be less than one."));
        }

        this.maxRetries = maxRetries;
        return this;
    }

    /**
     * Gets the number of times a document will retry indexing before it is considered failed.
     *
     * @return The number of times a document will attempt indexing.
     */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Sets the initial duration that requests will be delayed when the service is throttling.
     * <p>
     * Default value is {@code Duration.ofMillis(800)}.
     *
     * @param retryDelay The initial duration requests will delay when the service is throttling.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws IllegalArgumentException If {@code retryDelay.isNegative()} or {@code retryDelay.isZero()} is true.
     * @throws NullPointerException If {@code retryDelay} is null.
     */
    public SearchIndexingBufferedSenderOptions<T> setRetryDelay(Duration retryDelay) {
        Objects.requireNonNull(retryDelay, "'retryDelay' cannot be null.");

        if (retryDelay.isNegative() || retryDelay.isZero()) {
            throw logger.logExceptionAsError(new IllegalArgumentException("'retryDelay' cannot be negative or zero."));
        }

        this.retryDelay = retryDelay;
        return this;
    }

    /**
     * Gets the initial duration that requests will be delayed when the service is throttling.
     *
     * @return The initial duration requests will delay when the service is throttling.
     */
    public Duration getRetryDelay() {
        return retryDelay;
    }

    /**
     * Sets the maximum duration that requests will be delayed when the service is throttling.
     * <p>
     * If {@code maxRetryDelay} is less than {@link #getRetryDelay()} then {@link #getRetryDelay()} will be used as the
     * maximum delay.
     * <p>
     * Default value is {@code Duration.ofMinutes(1)}.
     *
     * @param maxRetryDelay The maximum duration requests will delay when the service is throttling.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws IllegalArgumentException If {@code maxRetryDelay.isNegative()} or {@code maxRetryDelay.isZero()} is
     * true.
     * @throws NullPointerException If {@code maxRetryDelay} is null.
     */
    public SearchIndexingBufferedSenderOptions<T> setMaxRetryDelay(Duration maxRetryDelay) {
        Objects.requireNonNull(maxRetryDelay, "'maxRetryDelay' cannot be null.");

        if (maxRetryDelay.isNegative() || maxRetryDelay.isZero()) {
            throw logger.logExceptionAsError(
                new IllegalArgumentException("'maxRetryDelay' cannot be negative or zero."));
        }

        this.maxRetryDelay = maxRetryDelay;
        return this;
    }

    /**
     * Gets the maximum duration that requests will delay when the service is throttling.
     *
     * @return The maximum duration requests will delay when the service is throttling.
     */
    public Duration getMaxRetryDelay() {
        return maxRetryDelay;
    }

    /**
     * Callback hook for when a document indexing action has been added to a batch queued.
     *
     * @param onActionAddedConsumer The {@link Consumer} that is called when a document has been added to a batch
     * queue.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     */
    public SearchIndexingBufferedSenderOptions<T> setOnActionAdded(Consumer<IndexAction<T>> onActionAddedConsumer) {
        this.onActionAddedConsumer = onActionAddedConsumer;
        return this;
    }

    /**
     * Gets the {@link Consumer} that will be called when a document is added to a batch.
     *
     * @return The {@link Consumer} called when a document is added to a batch.
     */
    public Consumer<IndexAction<T>> getOnActionAdded() {
        return onActionAddedConsumer;
    }

    /**
     * Sets the callback hook for when a document indexing action has successfully completed indexing.
     *
     * @param onActionSucceededConsumer The {@link Consumer} that is called when a document has been successfully
     * indexing.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     */
    public SearchIndexingBufferedSenderOptions<T> setOnActionSucceeded(
        Consumer<IndexAction<T>> onActionSucceededConsumer) {
        this.onActionSucceededConsumer = onActionSucceededConsumer;
        return this;
    }

    /**
     * Gets the {@link Consumer} that will be called when a document is successfully indexed.
     *
     * @return The {@link Consumer} called when a document is successfully indexed.
     */
    public Consumer<IndexAction<T>> getOnActionSucceeded() {
        return onActionSucceededConsumer;
    }

    /**
     * Sets the callback hook for when a document indexing action has failed to index and isn't retryable.
     *
     * @param onActionErrorBiConsumer The {@link BiConsumer} that is called when a document has failed to index and
     * isn't retryable.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     */
    public SearchIndexingBufferedSenderOptions<T> setOnActionError(
        BiConsumer<IndexAction<T>, Throwable> onActionErrorBiConsumer) {
        this.onActionErrorBiConsumer = onActionErrorBiConsumer;
        return this;
    }

    /**
     * Gets the {@link BiConsumer} that will be called when a document has failed to index.
     *
     * @return The {@link BiConsumer} called when a document has failed to index.
     */
    public BiConsumer<IndexAction<T>, Throwable> getOnActionError() {
        return onActionErrorBiConsumer;
    }

    /**
     * Sets the callback hook for when a document indexing has been sent in a batching request.
     *
     * @param onActionSentConsumer The {@link Consumer} that is called when a document has been sent in a batch
     * request.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     */
    public SearchIndexingBufferedSenderOptions<T> setOnActionSent(Consumer<IndexAction<T>> onActionSentConsumer) {
        this.onActionSentConsumer = onActionSentConsumer;
        return this;
    }

    /**
     * Gets the {@link Consumer} that will be called when a document is sent in a batch.
     *
     * @return The {@link Consumer} called when a document is sent in a batch.
     */
    public Consumer<IndexAction<T>> getOnActionSent() {
        return onActionSentConsumer;
    }

    /**
     * Sets the function that retrieves the key value from a document.
     * <p>
     * This function must be sent for a buffered sender to be properly constructed. It is used to correlate response
     * values to the originating document.
     *
     * @param documentKeyRetriever Function that retrieves the key from an {@link IndexAction}.
     * @return The updated SearchIndexingBufferedSenderOptions object.
     * @throws NullPointerException If {@code documentKeyRetriever} is null.
     */
    public SearchIndexingBufferedSenderOptions<T> setDocumentKeyRetriever(
        Function<T, String> documentKeyRetriever) {
        this.documentKeyRetriever = Objects.requireNonNull(documentKeyRetriever,
            "'documentKeyRetriever' cannot be null");
        return this;
    }

    /**
     * Gets the function that retrieves the key value from a document.
     *
     * @return The function that retrieves the key value from a document.
     */
    public Function<T, String> getDocumentKeyRetriever() {
        return documentKeyRetriever;
    }
}