CosmosChangeFeedRequestOptions.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.models;

import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.util.Beta;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * Encapsulates options that can be specified for an operation within a change feed request.
 */
@Beta(value = Beta.SinceVersion.V4_12_0, warningText =
    Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public final class CosmosChangeFeedRequestOptions {
    private static final int DEFAULT_MAX_ITEM_COUNT = 100;
    private static final int DEFAULT_MAX_PREFETCH_PAGE_COUNT = 1;
    private final ChangeFeedState continuationState;
    private final FeedRangeInternal feedRangeInternal;
    private final Map<String, Object> properties;
    private int maxItemCount;
    private int maxPrefetchPageCount;
    private ChangeFeedMode mode;
    private ChangeFeedStartFromInternal startFromInternal;
    private boolean isSplitHandlingDisabled;
    private boolean quotaInfoEnabled;
    private String throughputControlGroupName;
    private Map<String, String> customOptions;

    private CosmosChangeFeedRequestOptions(
        FeedRangeInternal feedRange,
        ChangeFeedStartFromInternal startFromInternal,
        ChangeFeedMode mode,
        ChangeFeedState continuationState) {
        super();

        if (feedRange == null) {
            throw new NullPointerException("feedRange");
        }

        if (startFromInternal == null) {
            throw new NullPointerException("startFromInternal");
        }

        this.maxItemCount = DEFAULT_MAX_ITEM_COUNT;
        this.maxPrefetchPageCount = DEFAULT_MAX_PREFETCH_PAGE_COUNT;
        this.feedRangeInternal = feedRange;
        this.startFromInternal = startFromInternal;
        this.continuationState = continuationState;

        if (mode != ChangeFeedMode.INCREMENTAL && mode != ChangeFeedMode.FULL_FIDELITY) {
            throw new IllegalArgumentException(
                String.format(
                    "Argument 'mode' has unsupported change feed mode %s",
                    mode.toString()));
        }

        this.mode = mode;
        this.properties = new HashMap<>();
        this.isSplitHandlingDisabled = false;
    }

    ChangeFeedState getContinuation() {
        return this.continuationState;
    }

    /**
     * Gets the feed range.
     *
     * @return the feed range.
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public FeedRange getFeedRange() {
        return this.feedRangeInternal;
    }

    /**
     * Gets the maximum number of items to be returned in the enumeration
     * operation.
     *
     * @return the max number of items.
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public int getMaxItemCount() {
        return this.maxItemCount;
    }

    /**
     * Sets the maximum number of items to be returned in the enumeration
     * operation.
     *
     * @param maxItemCount the max number of items.
     * @return the FeedOptionsBase.
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public CosmosChangeFeedRequestOptions setMaxItemCount(int maxItemCount) {
        this.maxItemCount = maxItemCount;
        return this;
    }

    /**
     * Gets the maximum number of pages that will be prefetched from the backend asynchronously
     * in the background. By pre-fetching these changes the throughput of processing the
     * change feed records can be increased because the processing doesn't have to stop while
     * waiting for the IO operations to retrieve a new page form the backend to complete. The
     * only scenario where it can be useful to disable prefetching pages (with
     * setMaxPrefetchPageCount(0))
     * would be when the caller only plans to retrieve just one page - so any prefetched pages
     * would not be used anyway.
     *
     * @return the modified change feed request options.
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public int getMaxPrefetchPageCount() {
        return this.maxPrefetchPageCount;
    }

    /**
     * Sets the maximum number of pages that will be prefetched from the backend asynchronously
     * in the background. By pre-fetching these changes the throughput of processing the
     * change feed records can be increased because the processing doesn't have to stop while
     * waiting for the IO operations to retrieve a new page form the backend to complete. The
     * only scenario where it can be useful to disable prefetching pages (with
     * setMaxPrefetchPageCount(0))
     * would be when the caller only plans to retrieve just one page - so any prefetched pages
     * would not be used anyway.
     *
     * @param maxPrefetchPageCount the max number of pages that will be prefetched from the backend
     *                             asynchronously in the background
     * @return the modified change feed request options.
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public CosmosChangeFeedRequestOptions setMaxPrefetchPageCount(int maxPrefetchPageCount) {
        checkArgument(
            maxPrefetchPageCount > 0,
            "Argument 'maxPrefetchCount' must be larger than 0.");
        this.maxPrefetchPageCount = maxPrefetchPageCount;

        return this;
    }

    /**
     * Gets the quotaInfoEnabled setting for change feed request in the Azure Cosmos DB database service.
     * quotaInfoEnabled is used to enable/disable getting quota related stats
     *
     * @return true if quotaInfoEnabled is enabled
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public boolean isQuotaInfoEnabled() {
        return quotaInfoEnabled;
    }

    /**
     * Gets the quotaInfoEnabled setting for change feed request in the Azure Cosmos DB database service.
     * quotaInfoEnabled is used to enable/disable getting quota related stats
     *
     * @param quotaInfoEnabled a boolean value indicating whether quotaInfoEnabled is enabled or not
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public void setQuotaInfoEnabled(boolean quotaInfoEnabled) {
        this.quotaInfoEnabled = quotaInfoEnabled;
    }

    boolean isSplitHandlingDisabled() {
        return this.isSplitHandlingDisabled;
    }

    CosmosChangeFeedRequestOptions disableSplitHandling() {
        this.isSplitHandlingDisabled = true;
        return this;
    }

    ChangeFeedMode getMode() {
        return this.mode;
    }

    /**
     * Gets the properties
     *
     * @return Map of request options properties
     */
    Map<String, Object> getProperties() {
        return properties;
    }

    ChangeFeedStartFromInternal getStartFromSettings() {
        return this.startFromInternal;
    }

    /**
     * Creates a new {@link CosmosChangeFeedRequestOptions} instance to start processing
     * change feed items from the beginning of the change feed
     *
     * @param feedRange The {@link FeedRange} that is used to define the scope (entire container,
     *                  logical partition or subset of a container)
     * @return a new {@link CosmosChangeFeedRequestOptions} instance
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public static CosmosChangeFeedRequestOptions createForProcessingFromBeginning(FeedRange feedRange) {
        checkNotNull(feedRange, "Argument 'feedRange' must not be null.");

        return new CosmosChangeFeedRequestOptions(
            FeedRangeInternal.convert(feedRange),
            ChangeFeedStartFromInternal.createFromBeginning(),
            ChangeFeedMode.INCREMENTAL,
            null);
    }

    /**
     * Creates a new {@link CosmosChangeFeedRequestOptions} instance to start processing
     * change feed items from a previous continuation
     *
     * @param continuation The continuation that was retrieved from a previously retrieved
     *                     FeedResponse
     * @return a new {@link CosmosChangeFeedRequestOptions} instance
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(
        String continuation) {

        final ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuation);

        return createForProcessingFromContinuation(changeFeedState);
    }

    static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(
        ChangeFeedState changeFeedState) {

        FeedRangeInternal feedRange = changeFeedState.getFeedRange();
        FeedRangeContinuation continuation = changeFeedState.getContinuation();
        ChangeFeedMode mode = changeFeedState.getMode();

        if (continuation != null) {
            CompositeContinuationToken continuationToken =
                continuation.getCurrentContinuationToken();
            if (continuationToken != null) {
                String etag = continuationToken.getToken();
                return new CosmosChangeFeedRequestOptions(
                    feedRange,
                    ChangeFeedStartFromInternal.createFromETagAndFeedRange(etag, feedRange),
                    mode,
                    changeFeedState);
            }

            return new CosmosChangeFeedRequestOptions(
                feedRange,
                ChangeFeedStartFromInternal.createFromBeginning(),
                mode,
                changeFeedState);
        }

        return new CosmosChangeFeedRequestOptions(
            feedRange,
            changeFeedState.getStartFromSettings(),
            mode,
            changeFeedState);
    }

    static CosmosChangeFeedRequestOptions createForProcessingFromEtagAndFeedRange(
        String etag,
        FeedRange feedRange) {

        if (etag != null) {
            return new CosmosChangeFeedRequestOptions(
                FeedRangeInternal.convert(feedRange),
                ChangeFeedStartFromInternal.createFromETagAndFeedRange(etag,
                    FeedRangeInternal.convert(feedRange)),
                ChangeFeedMode.INCREMENTAL,
                null);
        }

        return new CosmosChangeFeedRequestOptions(
            FeedRangeInternal.convert(feedRange),
            ChangeFeedStartFromInternal.createFromBeginning(),
            ChangeFeedMode.INCREMENTAL,
            null);
    }

    /**
     * Creates a new {@link CosmosChangeFeedRequestOptions} instance to start processing
     * change feed items from the current time - so only events for all future changes will be
     * retrieved
     *
     * @param feedRange The {@link FeedRange} that is used to define the scope (entire container,
     *                  logical partition or subset of a container)
     * @return a new {@link CosmosChangeFeedRequestOptions} instance
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public static CosmosChangeFeedRequestOptions createForProcessingFromNow(FeedRange feedRange) {
        if (feedRange == null) {
            throw new NullPointerException("feedRange");
        }

        return new CosmosChangeFeedRequestOptions(
            FeedRangeInternal.convert(feedRange),
            ChangeFeedStartFromInternal.createFromNow(),
            ChangeFeedMode.INCREMENTAL,
            null);
    }

    /**
     * Creates a new {@link CosmosChangeFeedRequestOptions} instance to start processing
     * change feed items from a certain point in time
     *
     * @param pointInTime The point in time from which processing of change feed events should start
     * @param feedRange   The {@link FeedRange} that is used to define the scope (entire container,
     *                    logical partition or subset of a container)
     * @return a new {@link CosmosChangeFeedRequestOptions} instance
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public static CosmosChangeFeedRequestOptions createForProcessingFromPointInTime(
        Instant pointInTime,
        FeedRange feedRange) {

        if (pointInTime == null) {
            throw new NullPointerException("pointInTime");
        }

        if (feedRange == null) {
            throw new NullPointerException("feedRange");
        }

        return new CosmosChangeFeedRequestOptions(
            FeedRangeInternal.convert(feedRange),
            ChangeFeedStartFromInternal.createFromPointInTime(pointInTime),
            ChangeFeedMode.INCREMENTAL,
            null);
    }

    void setRequestContinuation(String etag) {
        this.startFromInternal = ChangeFeedStartFromInternal.createFromETagAndFeedRange(
            etag,
            this.feedRangeInternal);
    }

    CosmosChangeFeedRequestOptions withCosmosPagedFluxOptions(
        CosmosPagedFluxOptions pagedFluxOptions) {

        if (pagedFluxOptions == null) {
            return this;
        }

        CosmosChangeFeedRequestOptions effectiveRequestOptions = this;

        if (pagedFluxOptions.getRequestContinuation() != null) {
            effectiveRequestOptions =
                CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(
                    pagedFluxOptions.getRequestContinuation());
            effectiveRequestOptions.setMaxPrefetchPageCount(this.getMaxPrefetchPageCount());
            effectiveRequestOptions.setThroughputControlGroupName(this.getThroughputControlGroupName());
        }

        if (pagedFluxOptions.getMaxItemCount() != null) {
            effectiveRequestOptions.setMaxItemCount(pagedFluxOptions.getMaxItemCount());
        }

        return effectiveRequestOptions;
    }

    /**
     * Changes the change feed mode so that the change feed will contain events for creations,
     * deletes as well as all intermediary snapshots for updates. Enabling full fidelity change feed
     * mode requires configuring a retention duration in the change feed policy of the
     * container. {@link ChangeFeedPolicy}
     * <p>
     * Intermediary snapshots of changes as well as deleted documents would be
     * available for processing for 8 minutes before they vanish.
     * When enabling full fidelity mode you will only be able to process change feed events
     * within the retention window configured in the change feed policy of the container.
     * If you attempt to process a change feed after more than the retention window
     * an error (Status Code 400) will be returned because the events for intermediary
     * updates and deletes have vanished.
     * It would still be possible to process changes using Incremental mode even when
     * configuring a full fidelity change feed policy with retention window on the container
     * and when using Incremental mode it doesn't matter whether your are out of the retention
     * window or not - but no events for deletes or intermediary updates would be included.
     * When events are not getting processed within the retention window it is also possible
     * to continue processing future events in full fidelity mode by querying the change feed
     * with a new CosmosChangeFeedRequestOptions instance.
     * </p>
     *
     * @return a {@link CosmosChangeFeedRequestOptions} instance with full fidelity mode enabled
     */
    @Beta(value = Beta.SinceVersion.V4_12_0, warningText =
        Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public CosmosChangeFeedRequestOptions fullFidelity() {

        if (!this.startFromInternal.supportsFullFidelityRetention()) {
            throw new IllegalStateException(
                "Full fidelity retention is not supported for the chosen change feed start from " +
                    "option. Use CosmosChangeFeedRequestOptions.createForProcessingFromNow or " +
                    "CosmosChangeFeedRequestOptions.createFromContinuation instead."
            );
        }

        this.mode = ChangeFeedMode.FULL_FIDELITY;
        return this;
    }

    /**
     * Get the throughput control group name.
     *
     * @return The throughput control group name.
     */
    @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public String getThroughputControlGroupName() {
        return this.throughputControlGroupName;
    }

    /**
     * Set the throughput control group name.
     *
     * @param throughputControlGroupName The throughput control group name.
     * @return A {@link CosmosChangeFeedRequestOptions}.
     */
    @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public CosmosChangeFeedRequestOptions setThroughputControlGroupName(String throughputControlGroupName) {
        this.throughputControlGroupName = throughputControlGroupName;
        return this;
    }

    /**
     * Sets the custom change feed request option value by key
     *
     * @param name  a string representing the custom option's name
     * @param value a string representing the custom option's value
     *
     * @return the CosmosChangeFeedRequestOptions.
     */
    CosmosChangeFeedRequestOptions setHeader(String name, String value) {
        if (this.customOptions == null) {
            this.customOptions = new HashMap<>();
        }
        this.customOptions.put(name, value);
        return this;
    }

    /**
     * Gets the custom change feed request options
     *
     * @return Map of custom request options
     */
    Map<String, String> getHeaders() {
        return this.customOptions;
    }

    ///////////////////////////////////////////////////////////////////////////////////////////
    // the following helper/accessor only helps to access this class outside of this package.//
    ///////////////////////////////////////////////////////////////////////////////////////////

    static {
        ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.setCosmosChangeFeedRequestOptionsAccessor(
            new ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.CosmosChangeFeedRequestOptionsAccessor() {

                @Override
                public CosmosChangeFeedRequestOptions setHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions, String name, String value) {
                    return changeFeedRequestOptions.setHeader(name, value);
                }

                @Override
                public Map<String, String> getHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions) {
                    return changeFeedRequestOptions.getHeaders();
                }
            });
    }
}