ChangeFeedProcessorBuilderImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;
import com.azure.cosmos.implementation.changefeed.HealthMonitor;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.PartitionController;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancer;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.PartitionManager;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisorFactory;
import com.azure.cosmos.implementation.changefeed.RequestOptionsFactory;
import com.azure.cosmos.models.ChangeFeedProcessorState;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import static com.azure.cosmos.CosmosBridgeInternal.getContextClient;
/**
* Helper class to buildAsyncClient {@link ChangeFeedProcessor} instances
* as logical representation of the Azure Cosmos DB database service.
*
* <pre>
* {@code
* ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
* .hostName(hostName)
* .feedContainer(feedContainer)
* .leaseContainer(leaseContainer)
* .handleChanges(docs -> {
* for (JsonNode item : docs) {
* // Implementation for handling and processing of each JsonNode item goes here
* }
* })
* .buildChangeFeedProcessor();
* }
* </pre>
*/
public class ChangeFeedProcessorBuilderImpl implements ChangeFeedProcessor, AutoCloseable {
private static final String PK_RANGE_ID_SEPARATOR = ":";
private static final String SEGMENT_SEPARATOR = "#";
private static final String PROPERTY_NAME_LSN = "_lsn";
private final Logger logger = LoggerFactory.getLogger(ChangeFeedProcessorBuilderImpl.class);
private final Duration sleepTime = Duration.ofSeconds(15);
private final Duration lockTime = Duration.ofSeconds(30);
private static final int DEFAULT_QUERY_PARTITIONS_MAX_BATCH_SIZE = 100;
private final static int DEFAULT_DEGREE_OF_PARALLELISM = 25; // default
private String hostName;
private ChangeFeedContextClient feedContextClient;
private ChangeFeedProcessorOptions changeFeedProcessorOptions;
private ChangeFeedObserverFactory observerFactory;
private volatile String databaseResourceId;
private volatile String collectionResourceId;
private ChangeFeedContextClient leaseContextClient;
private PartitionLoadBalancingStrategy loadBalancingStrategy;
private LeaseStoreManager leaseStoreManager;
private HealthMonitor healthMonitor;
private volatile PartitionManager partitionManager;
private Scheduler scheduler;
/**
* Start listening for changes asynchronously.
*
* @return a representation of the deferred computation of this call.
*/
@Override
public Mono<Void> start() {
if (this.partitionManager == null) {
return this.initializeCollectionPropertiesForBuild()
.flatMap( value -> this.getLeaseStoreManager()
.flatMap(this::buildPartitionManager))
.flatMap(partitionManager1 -> {
this.partitionManager = partitionManager1;
return this.partitionManager.start();
});
} else {
return partitionManager.start();
}
}
/**
* Stops listening for changes asynchronously.
*
* @return a representation of the deferred computation of this call.
*/
@Override
public Mono<Void> stop() {
if (this.partitionManager == null || !this.partitionManager.isRunning()) {
throw new IllegalStateException("The ChangeFeedProcessor instance has not fully started");
}
return this.partitionManager.stop();
}
/**
* Returns the state of the change feed processor.
*
* @return true if the change feed processor is currently active and running.
*/
@Override
public boolean isStarted() {
return this.partitionManager != null && this.partitionManager.isRunning();
}
/**
* Returns the current owner (host) and an approximation of the difference between the last processed item (defined
* by the state of the feed container) and the latest change in the container for each partition (lease
* document).
* <p>
* An empty map will be returned if the processor was not started or no lease documents matching the current
* {@link ChangeFeedProcessor} instance's lease prefix could be found.
*
* @return a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated
* lag, asynchronously.
*/
@Override
public Mono<Map<String, Integer>> getEstimatedLag() {
Map<String, Integer> earlyResult = new ConcurrentHashMap<>();
if (this.leaseStoreManager == null || this.feedContextClient == null) {
return Mono.just(earlyResult);
}
return this.leaseStoreManager.getAllLeases()
.flatMap(lease -> {
final FeedRangeInternal feedRange = new FeedRangePartitionKeyRangeImpl(lease.getLeaseToken());
final CosmosChangeFeedRequestOptions options =
ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(
lease.getContinuationState(
this.collectionResourceId,
feedRange));
options.setMaxItemCount(1);
return this.feedContextClient.createDocumentChangeFeedQuery(
this.feedContextClient.getContainerClient(),
options)
.take(1)
.map(feedResponse -> {
String ownerValue = lease.getOwner();
String sessionTokenLsn = feedResponse.getSessionToken();
String parsedSessionToken = sessionTokenLsn.substring(
sessionTokenLsn.indexOf(PK_RANGE_ID_SEPARATOR));
String[] segments = StringUtils.split(parsedSessionToken, SEGMENT_SEPARATOR);
String latestLsn = segments[0];
if (segments.length >= 2) {
// default to Global LSN
latestLsn = segments[1];
}
if (ownerValue == null) {
ownerValue = "";
}
// An empty list of documents returned means that we are current (zero lag)
if (feedResponse.getResults() == null || feedResponse.getResults().size() == 0) {
return Pair.of(ownerValue + "_" + lease.getLeaseToken(), 0);
}
int currentLsn = 0;
int estimatedLag;
try {
currentLsn = Integer.parseInt(feedResponse.getResults().get(0).get(PROPERTY_NAME_LSN).asText("0"));
estimatedLag = Integer.parseInt(latestLsn);
estimatedLag = estimatedLag - currentLsn + 1;
} catch (NumberFormatException ex) {
logger.warn("Unexpected Cosmos LSN found", ex);
estimatedLag = -1;
}
return Pair.of(
ownerValue + "_" + lease.getLeaseToken() + "_" + currentLsn + "_" + latestLsn,
estimatedLag);
});
})
.collectList()
.map(valueList -> {
Map<String, Integer> result = new ConcurrentHashMap<>();
for (Pair<String, Integer> pair : valueList) {
result.put(pair.getKey(), pair.getValue());
}
return result;
});
}
/**
* Returns a list of states each representing one scoped worker item.
* <p>
* An empty list will be returned if the processor was not started or no lease items matching the current
* {@link ChangeFeedProcessor} instance's lease prefix could be found.
*
* @return a list of states each representing one scoped worker item.
*/
@Override
public Mono<List<ChangeFeedProcessorState>> getCurrentState() {
if (this.leaseStoreManager == null || this.feedContextClient == null) {
return Mono.just(Collections.unmodifiableList(new ArrayList<>()));
}
return this.leaseStoreManager.getAllLeases()
.flatMap(lease -> {
final FeedRangeInternal feedRange = new FeedRangePartitionKeyRangeImpl(lease.getLeaseToken());
final CosmosChangeFeedRequestOptions options =
ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(
lease.getContinuationState(
this.collectionResourceId,
feedRange));
options.setMaxItemCount(1);
return this.feedContextClient.createDocumentChangeFeedQuery(
this.feedContextClient.getContainerClient(),
options)
.take(1)
.map(feedResponse -> {
String sessionTokenLsn = feedResponse.getSessionToken();
String parsedSessionToken = sessionTokenLsn.substring(
sessionTokenLsn.indexOf(PK_RANGE_ID_SEPARATOR));
String[] segments = StringUtils.split(parsedSessionToken, SEGMENT_SEPARATOR);
String latestLsn = segments[0];
if (segments.length >= 2) {
// default to Global LSN
latestLsn = segments[1];
}
// lease.getId() - the ID of the lease item representing the persistent state of a
// change feed processor worker.
// latestLsn - a marker representing the latest item that will be processed.
ChangeFeedProcessorState changeFeedProcessorState = new ChangeFeedProcessorState()
.setHostName(lease.getOwner())
.setLeaseToken(lease.getLeaseToken());
// An empty list of documents returned means that we are current (zero lag)
if (feedResponse.getResults() == null || feedResponse.getResults().size() == 0) {
changeFeedProcessorState.setEstimatedLag(0)
.setContinuationToken(latestLsn);
return changeFeedProcessorState;
}
changeFeedProcessorState.setContinuationToken(
feedResponse.getResults().get(0).get(PROPERTY_NAME_LSN).asText(null));
int currentLsn;
int estimatedLag;
try {
currentLsn = Integer.parseInt(feedResponse.getResults().get(0).get(PROPERTY_NAME_LSN).asText("0"));
estimatedLag = Integer.parseInt(latestLsn);
estimatedLag = estimatedLag - currentLsn + 1;
changeFeedProcessorState.setEstimatedLag(estimatedLag);
} catch (NumberFormatException ex) {
logger.warn("Unexpected Cosmos LSN found", ex);
changeFeedProcessorState.setEstimatedLag(-1);
}
return changeFeedProcessorState;
});
})
.collectList()
.map(Collections::unmodifiableList);
}
/**
* Sets the host name.
*
* @param hostName the name to be used for the host. When using multiple hosts, each host must have a unique name.
* @return current Builder.
*/
public ChangeFeedProcessorBuilderImpl hostName(String hostName) {
this.hostName = hostName;
return this;
}
/**
* Sets and existing {@link CosmosAsyncContainer} to be used to read from the monitored collection.
*
* @param feedDocumentClient the instance of {@link CosmosAsyncContainer} to be used.
* @return current Builder.
*/
public ChangeFeedProcessorBuilderImpl feedContainer(CosmosAsyncContainer feedDocumentClient) {
if (feedDocumentClient == null) {
throw new IllegalArgumentException("feedContextClient");
}
this.feedContextClient = new ChangeFeedContextClientImpl(feedDocumentClient);
return this;
}
/**
* Sets the {@link ChangeFeedProcessorOptions} to be used.
*
* @param changeFeedProcessorOptions the change feed processor options to use.
* @return current Builder.
*/
public ChangeFeedProcessorBuilderImpl options(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
if (changeFeedProcessorOptions == null) {
throw new IllegalArgumentException("changeFeedProcessorOptions");
}
this.changeFeedProcessorOptions = changeFeedProcessorOptions;
return this;
}
/**
* Sets the {@link ChangeFeedObserverFactory} to be used to generate {@link ChangeFeedObserver}
*
* @param observerFactory The instance of {@link ChangeFeedObserverFactory} to use.
* @return current Builder.
*/
public ChangeFeedProcessorBuilderImpl observerFactory(ChangeFeedObserverFactory observerFactory) {
if (observerFactory == null) {
throw new IllegalArgumentException("observerFactory");
}
this.observerFactory = observerFactory;
return this;
}
public ChangeFeedProcessorBuilderImpl handleChanges(Consumer<List<JsonNode>> consumer) {
return this.observerFactory(new DefaultObserverFactory(consumer));
}
/**
* Sets an existing {@link CosmosAsyncContainer} to be used to read from the leases collection.
*
* @param leaseClient the instance of {@link CosmosAsyncContainer} to use.
* @return current Builder.
*/
public ChangeFeedProcessorBuilderImpl leaseContainer(CosmosAsyncContainer leaseClient) {
if (leaseClient == null) {
throw new IllegalArgumentException("leaseClient");
}
if (!getContextClient(leaseClient).isContentResponseOnWriteEnabled()) {
throw new IllegalArgumentException("leaseClient: content response on write setting must be enabled");
}
ConsistencyLevel consistencyLevel = getContextClient(leaseClient).getConsistencyLevel();
if (consistencyLevel == ConsistencyLevel.CONSISTENT_PREFIX || consistencyLevel == ConsistencyLevel.EVENTUAL) {
logger.warn("leaseClient consistency level setting are less then expected which is SESSION");
}
this.leaseContextClient = new ChangeFeedContextClientImpl(leaseClient);
return this;
}
/**
* Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration asynchronously.
*
* @return an instance of {@link ChangeFeedProcessor}.
*/
public ChangeFeedProcessor build() {
if (this.hostName == null) {
throw new IllegalArgumentException("Host name was not specified");
}
if (this.observerFactory == null) {
throw new IllegalArgumentException("Observer was not specified");
}
if (this.changeFeedProcessorOptions != null && this.changeFeedProcessorOptions.getLeaseAcquireInterval().compareTo(ChangeFeedProcessorOptions.DEFAULT_ACQUIRE_INTERVAL) < 0) {
logger.warn("Found lower than expected setting for leaseAcquireInterval");
}
if (this.scheduler == null) {
this.scheduler = Schedulers.boundedElastic();
}
return this;
}
public ChangeFeedProcessorBuilderImpl() {
}
private Mono<ChangeFeedProcessor> initializeCollectionPropertiesForBuild() {
if (this.changeFeedProcessorOptions == null) {
this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
}
return this.feedContextClient
.readDatabase(this.feedContextClient.getDatabaseClient(), null)
.map( databaseResourceResponse -> {
this.databaseResourceId = databaseResourceResponse.getProperties().getId();
return this.databaseResourceId;
})
.flatMap( id -> this.feedContextClient
.readContainer(this.feedContextClient.getContainerClient(), null)
.map(documentCollectionResourceResponse -> {
this.collectionResourceId = documentCollectionResourceResponse.getProperties().getId();
return this;
}));
}
private Mono<LeaseStoreManager> getLeaseStoreManager() {
if (this.leaseStoreManager == null) {
return this.leaseContextClient.readContainerSettings(this.leaseContextClient.getContainerClient(), null)
.flatMap( collectionSettings -> {
boolean isPartitioned =
collectionSettings.getPartitionKeyDefinition() != null &&
collectionSettings.getPartitionKeyDefinition().getPaths() != null &&
collectionSettings.getPartitionKeyDefinition().getPaths().size() > 0;
if (!isPartitioned || (collectionSettings.getPartitionKeyDefinition().getPaths().size() != 1 || !collectionSettings.getPartitionKeyDefinition().getPaths().get(0).equals("/id"))) {
// throw new IllegalArgumentException("The lease collection, if partitioned, must have partition key equal to id.");
return Mono.error(new IllegalArgumentException("The lease collection must have partition key equal to id."));
}
RequestOptionsFactory requestOptionsFactory = new PartitionedByIdCollectionRequestOptionsFactory();
String leasePrefix = this.getLeasePrefix();
return LeaseStoreManager.builder()
.leasePrefix(leasePrefix)
.leaseCollectionLink(this.leaseContextClient.getContainerClient())
.leaseContextClient(this.leaseContextClient)
.requestOptionsFactory(requestOptionsFactory)
.hostName(this.hostName)
.build()
.map(manager -> {
this.leaseStoreManager = manager;
return this.leaseStoreManager;
});
});
}
return Mono.just(this.leaseStoreManager);
}
private String getLeasePrefix() {
String optionsPrefix = this.changeFeedProcessorOptions.getLeasePrefix();
if (optionsPrefix == null) {
optionsPrefix = "";
}
URI uri = this.feedContextClient.getServiceEndpoint();
return String.format(
"%s%s_%s_%s",
optionsPrefix,
uri.getHost(),
this.databaseResourceId,
this.collectionResourceId);
}
private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStoreManager) {
CheckpointerObserverFactory factory = new CheckpointerObserverFactory(this.observerFactory, new CheckpointFrequency());
PartitionSynchronizerImpl synchronizer = new PartitionSynchronizerImpl(
this.feedContextClient,
this.feedContextClient.getContainerClient(),
leaseStoreManager,
leaseStoreManager,
DEFAULT_DEGREE_OF_PARALLELISM,
DEFAULT_QUERY_PARTITIONS_MAX_BATCH_SIZE,
this.collectionResourceId
);
Bootstrapper bootstrapper = new BootstrapperImpl(synchronizer, leaseStoreManager, this.lockTime, this.sleepTime);
PartitionSupervisorFactory partitionSupervisorFactory = new PartitionSupervisorFactoryImpl(
factory,
leaseStoreManager,
new PartitionProcessorFactoryImpl(
this.feedContextClient,
this.changeFeedProcessorOptions,
leaseStoreManager,
this.feedContextClient.getContainerClient(),
this.collectionResourceId),
this.changeFeedProcessorOptions,
this.scheduler
);
if (this.loadBalancingStrategy == null) {
this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy(
this.hostName,
this.changeFeedProcessorOptions.getMinScaleCount(),
this.changeFeedProcessorOptions.getMaxScaleCount(),
this.changeFeedProcessorOptions.getLeaseExpirationInterval());
}
PartitionController partitionController = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactory, synchronizer, scheduler);
if (this.healthMonitor == null) {
this.healthMonitor = new TraceHealthMonitor();
}
PartitionController partitionController2 = new HealthMonitoringPartitionControllerDecorator(partitionController, this.healthMonitor);
PartitionLoadBalancer partitionLoadBalancer = new PartitionLoadBalancerImpl(
partitionController2,
leaseStoreManager,
this.loadBalancingStrategy,
this.changeFeedProcessorOptions.getLeaseAcquireInterval(),
this.scheduler
);
PartitionManager partitionManager = new PartitionManagerImpl(bootstrapper, partitionController, partitionLoadBalancer);
return Mono.just(partitionManager);
}
@Override
public void close() {
this.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
}
}