// ChangeFeedProcessorOptions.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.models;
import com.azure.cosmos.ChangeFeedProcessor;
import java.time.Duration;
import java.time.Instant;
/**
 * Specifies the options associated with {@link ChangeFeedProcessor}.
 * <p>
 * This is a plain mutable options holder with fluent setters: every setter stores the supplied value as-is
 * (no validation is performed in this class) and returns the same instance so calls can be chained.
 * <p>
 * Defaults applied by the no-argument constructor:
 * <ul>
 *   <li>lease renew interval: {@link #DEFAULT_RENEW_INTERVAL} (17 seconds)</li>
 *   <li>lease acquire interval: {@link #DEFAULT_ACQUIRE_INTERVAL} (13 seconds)</li>
 *   <li>lease expiration interval: {@link #DEFAULT_EXPIRATION_INTERVAL} (60 seconds)</li>
 *   <li>feed poll delay: {@link #DEFAULT_FEED_POLL_DELAY} (5 seconds)</li>
 *   <li>max item count: 100</li>
 *   <li>start from beginning: {@code false}</li>
 *   <li>min scale count: 0</li>
 *   <li>max scale count: 0 (unlimited)</li>
 *   <li>lease prefix, start continuation and start time: {@code null}</li>
 * </ul>
 */
public final class ChangeFeedProcessorOptions {
    /**
     * Default lease renew interval (17 seconds).
     */
    public static final Duration DEFAULT_RENEW_INTERVAL = Duration.ofSeconds(17);

    /**
     * Default lease acquire interval (13 seconds).
     */
    public static final Duration DEFAULT_ACQUIRE_INTERVAL = Duration.ofSeconds(13);

    /**
     * Default lease expiration interval (60 seconds).
     */
    public static final Duration DEFAULT_EXPIRATION_INTERVAL = Duration.ofSeconds(60);

    /**
     * Default feed poll delay (5 seconds).
     */
    public static final Duration DEFAULT_FEED_POLL_DELAY = Duration.ofSeconds(5);

    private Duration leaseRenewInterval;
    private Duration leaseAcquireInterval;
    private Duration leaseExpirationInterval;
    private Duration feedPollDelay;

    private String leasePrefix;
    private int maxItemCount;
    private String startContinuation;
    private Instant startTime;
    private boolean startFromBeginning;
    private int minScaleCount;
    private int maxScaleCount;

    /**
     * Instantiates a new Change feed processor options with the default values.
     * <p>
     * The lease prefix, start continuation and start time are left unset ({@code null}).
     */
    public ChangeFeedProcessorOptions() {
        this.maxItemCount = 100;
        this.startFromBeginning = false;
        this.leaseRenewInterval = DEFAULT_RENEW_INTERVAL;
        this.leaseAcquireInterval = DEFAULT_ACQUIRE_INTERVAL;
        this.leaseExpirationInterval = DEFAULT_EXPIRATION_INTERVAL;
        this.feedPollDelay = DEFAULT_FEED_POLL_DELAY;
        this.minScaleCount = 0; // same as the implicit field default, spelled out for clarity
        this.maxScaleCount = 0; // unlimited
    }

    /**
     * Gets the renew interval for all leases for partitions currently held by {@link ChangeFeedProcessor} instance.
     *
     * @return the renew interval for all leases for partitions.
     */
    public Duration getLeaseRenewInterval() {
        return this.leaseRenewInterval;
    }

    /**
     * Sets the renew interval for all leases for partitions currently held by {@link ChangeFeedProcessor} instance.
     *
     * @param leaseRenewInterval the renew interval for all leases for partitions currently held by
     * {@link ChangeFeedProcessor} instance.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setLeaseRenewInterval(Duration leaseRenewInterval) {
        this.leaseRenewInterval = leaseRenewInterval;
        return this;
    }

    /**
     * Gets the interval to kick off a task to compute if partitions are distributed evenly among known host instances.
     *
     * @return the interval to kick off a task to compute if partitions are distributed evenly among known host
     * instances.
     */
    public Duration getLeaseAcquireInterval() {
        return this.leaseAcquireInterval;
    }

    /**
     * Sets the interval to kick off a task to compute if partitions are distributed evenly among known host instances.
     *
     * @param leaseAcquireInterval the interval to kick off a task to compute if partitions are distributed evenly
     * among known host instances.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setLeaseAcquireInterval(Duration leaseAcquireInterval) {
        this.leaseAcquireInterval = leaseAcquireInterval;
        return this;
    }

    /**
     * Gets the interval for which the lease is taken on a lease representing a partition.
     *
     * <p>
     * If the lease is not renewed within this interval, it will cause it to expire and ownership of the partition will
     * move to another {@link ChangeFeedProcessor} instance.
     *
     * @return the interval for which the lease is taken on a lease representing a partition.
     */
    public Duration getLeaseExpirationInterval() {
        return this.leaseExpirationInterval;
    }

    /**
     * Sets the interval for which the lease is taken on a lease representing a partition.
     *
     * <p>
     * If the lease is not renewed within this interval, it will cause it to expire and ownership of the partition will
     * move to another {@link ChangeFeedProcessor} instance.
     *
     * @param leaseExpirationInterval the interval for which the lease is taken on a lease representing a partition.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setLeaseExpirationInterval(Duration leaseExpirationInterval) {
        this.leaseExpirationInterval = leaseExpirationInterval;
        return this;
    }

    /**
     * Gets the delay in between polling a partition for new changes on the feed, after all current changes are drained.
     *
     * @return the delay in between polling a partition for new changes on the feed.
     */
    public Duration getFeedPollDelay() {
        return this.feedPollDelay;
    }

    /**
     * Sets the delay in between polling a partition for new changes on the feed, after all current changes are drained.
     *
     * @param feedPollDelay the delay in between polling a partition for new changes on the feed, after all current
     * changes are drained.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setFeedPollDelay(Duration feedPollDelay) {
        this.feedPollDelay = feedPollDelay;
        return this;
    }

    /**
     * Gets a prefix to be used as part of the lease ID.
     * <p>
     * This can be used to support multiple {@link ChangeFeedProcessor} instances pointing at the same
     * feed while using the same auxiliary container.
     *
     * @return a prefix to be used as part of the lease ID, or {@code null} if none has been set.
     */
    public String getLeasePrefix() {
        return this.leasePrefix;
    }

    /**
     * Sets a prefix to be used as part of the lease ID.
     * <p>
     * This can be used to support multiple {@link ChangeFeedProcessor} instances pointing at the same
     * feed while using the same auxiliary container.
     *
     * @param leasePrefix a prefix to be used as part of the lease ID.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setLeasePrefix(String leasePrefix) {
        this.leasePrefix = leasePrefix;
        return this;
    }

    /**
     * Gets the maximum number of items to be returned in the enumeration operation in the Azure Cosmos DB service.
     *
     * @return the maximum number of items to be returned in the enumeration operation in the Azure Cosmos DB service
     * (100 unless changed).
     */
    public int getMaxItemCount() {
        return this.maxItemCount;
    }

    /**
     * Sets the maximum number of items to be returned in the enumeration operation.
     *
     * @param maxItemCount the maximum number of items to be returned in the enumeration operation.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setMaxItemCount(int maxItemCount) {
        this.maxItemCount = maxItemCount;
        return this;
    }

    /**
     * Gets the start request continuation token to start looking for changes after.
     * <p>
     * This option can be used when lease store is not initialized and it is ignored if a lease item exists and
     * has continuation token that is not null. If this is specified, both StartTime and StartFromBeginning are ignored.
     *
     * @return the string representing a continuation token that will be used to get item feeds starting with,
     * or {@code null} if none has been set.
     */
    public String getStartContinuation() {
        return this.startContinuation;
    }

    /**
     * Sets the start request continuation token to start looking for changes after.
     * <p>
     * This option can be used when lease store is not initialized and it is ignored if a lease item exists and
     * has continuation token that is not null. If this is specified, both StartTime and StartFromBeginning are ignored.
     *
     * @param startContinuation the start request continuation token to start looking for changes after.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setStartContinuation(String startContinuation) {
        this.startContinuation = startContinuation;
        return this;
    }

    /**
     * Gets the time (exclusive) to start looking for changes after.
     * <p>
     * This option can be used when:
     * (1) Lease items are not initialized; this setting will be ignored if the lease items exist and have a
     * valid continuation token.
     * (2) Start continuation token option is not specified.
     * If this option is specified, "start from beginning" option is ignored.
     *
     * @return the time (exclusive) to start looking for changes after, or {@code null} if none has been set.
     */
    public Instant getStartTime() {
        return this.startTime;
    }

    /**
     * Sets the time (exclusive) to start looking for changes after (UTC time).
     * <p>
     * This option can be used when:
     * (1) Lease items are not initialized; this setting will be ignored if the lease items exist and have a
     * valid continuation token.
     * (2) Start continuation token option is not specified.
     * If this option is specified, "start from beginning" option is ignored.
     *
     * @param startTime the time (exclusive) to start looking for changes after.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setStartTime(Instant startTime) {
        this.startTime = startTime;
        return this;
    }

    /**
     * Gets a value indicating whether change feed in the Azure Cosmos DB service should start from beginning (true)
     * or from current (false). By default it starts from current (false).
     * <p>
     * This option can be used when:
     * (1) Lease items are not initialized; this setting will be ignored if the lease items exist and have a
     * valid continuation token.
     * (2) Start continuation token option is not specified.
     * (3) Start time option is not specified.
     *
     * @return {@code true} if the change feed should start from the beginning, {@code false} if it should start
     * from current.
     */
    public boolean isStartFromBeginning() {
        return this.startFromBeginning;
    }

    /**
     * Sets a value indicating whether change feed in the Azure Cosmos DB service should start from beginning.
     * <p>
     * This option can be used when:
     * (1) Lease items are not initialized; this setting will be ignored if the lease items exist and have a
     * valid continuation token.
     * (2) Start continuation token option is not specified.
     * (3) Start time option is not specified.
     *
     * @param startFromBeginning Indicates to start from beginning if true
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setStartFromBeginning(boolean startFromBeginning) {
        this.startFromBeginning = startFromBeginning;
        return this;
    }

    /**
     * Gets the minimum partition count (parallel workers) for the current host.
     * <p>
     * This option can be used to increase the number of partitions (parallel workers) for the host and thus override
     * the default equal distribution of leases between multiple hosts.
     *
     * @return the minimum scale count for the host (0 unless changed).
     */
    public int getMinScaleCount() {
        return this.minScaleCount;
    }

    /**
     * Sets the minimum partition count (parallel workers) for the current host.
     * <p>
     * This option can be used to increase the number of partitions (parallel workers) for the host and thus override
     * the default equal distribution of leases between multiple hosts.
     *
     * @param minScaleCount the minimum partition count for the host.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setMinScaleCount(int minScaleCount) {
        this.minScaleCount = minScaleCount;
        return this;
    }

    /**
     * Gets the maximum number of partitions (parallel workers) the host can run.
     * <p>
     * This option can be used to limit the number of partitions (parallel workers) for the host and thus override
     * the default equal distribution of leases between multiple hosts. Default setting is "0", unlimited.
     *
     * @return the maximum number of partitions (parallel workers) the host can run.
     */
    public int getMaxScaleCount() {
        return this.maxScaleCount;
    }

    /**
     * Sets the maximum number of partitions (parallel workers) the host can run.
     * <p>
     * This option can be used to limit the number of partitions (parallel workers) for the host and thus override
     * the default equal distribution of leases between multiple hosts. Default setting is "0", unlimited.
     *
     * @param maxScaleCount the maximum number of partitions (parallel workers) the host can run.
     * @return the current ChangeFeedProcessorOptions instance.
     */
    public ChangeFeedProcessorOptions setMaxScaleCount(int maxScaleCount) {
        this.maxScaleCount = maxScaleCount;
        return this;
    }
}