// PartitionSynchronizerImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.PartitionSynchronizer;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashSet;
import java.util.Set;
import static com.azure.cosmos.BridgeInternal.extractContainerSelfLink;
/**
* Implementation for the partition synchronizer.
*/
class PartitionSynchronizerImpl implements PartitionSynchronizer {
private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
private final ChangeFeedContextClient documentClient;
private final CosmosAsyncContainer collectionSelfLink;
private final LeaseContainer leaseContainer;
private final LeaseManager leaseManager;
private final int degreeOfParallelism;
private final int maxBatchSize;
private final String collectionResourceId;
public PartitionSynchronizerImpl(
ChangeFeedContextClient documentClient,
CosmosAsyncContainer collectionSelfLink,
LeaseContainer leaseContainer,
LeaseManager leaseManager,
int degreeOfParallelism,
int maxBatchSize,
String collectionResourceId) {
this.documentClient = documentClient;
this.collectionSelfLink = collectionSelfLink;
this.leaseContainer = leaseContainer;
this.leaseManager = leaseManager;
this.degreeOfParallelism = degreeOfParallelism;
this.maxBatchSize = maxBatchSize;
this.collectionResourceId = collectionResourceId;
}
@Override
public Mono<Void> createMissingLeases() {
// TODO: log the partition getKey ID found.
return this.enumPartitionKeyRanges()
.map(Resource::getId)
.collectList()
.flatMap( partitionKeyRangeIds -> {
Set<String> leaseTokens = new HashSet<>(partitionKeyRangeIds);
return this.createLeases(leaseTokens).then();
})
.onErrorResume( throwable -> {
// TODO: log the exception.
return Mono.empty();
});
}
@Override
public Flux<Lease> splitPartition(Lease lease) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
final String leaseToken = lease.getLeaseToken();
// TODO fabianm - this needs more elaborate processing in case the initial
// FeedRangeContinuation has continuation state for multiple feed Ranges
// and with merge multiple CompositeContinuationItems
// Means Split/Merge needs to be pushed into the FeedRangeContinuation
// Will be necessary for merge anyway
// but efficient testing only works if at least EPK filtering is available in Emulator
// or at least Service - this will be part of the next set of changes
// For now - no merge just simple V0 of lease contract
// this simplification will work
//
//ChangeFeedState lastContinuationState = lease.getContinuationState(
// this.collectionResourceId,
// new FeedRangePartitionKeyRangeImpl(leaseToken)
//);
//
//final String lastContinuationToken = lastContinuationState.getContinuation() != null ?
// lastContinuationState.getContinuation().getCurrentContinuationToken().getToken() :
// null;
// "Push" ChangeFeedProcessor is not merge-proof currently. For such cases we need a specific handler that can
// take multiple leases and "converge" them in a thread safe manner while also merging the various continuation
// tokens for each merged lease.
// We will directly reuse the original/parent continuation token as the seed for the new leases until then.
final String lastContinuationToken = lease.getContinuationToken();
logger.info("Partition {} is gone due to split; will attempt to resume using continuation token {}.", leaseToken, lastContinuationToken);
// After a split, the children are either all or none available
return this.enumPartitionKeyRanges()
.filter(range -> range != null && range.getParents() != null && range.getParents().contains(leaseToken))
.map(PartitionKeyRange::getId)
.collectList()
.flatMapMany(addedLeaseTokens -> {
if (addedLeaseTokens.size() == 0) {
logger.error("Partition {} had split but we failed to find at least one child partition", leaseToken);
throw new RuntimeException(String.format("Partition %s had split but we failed to find at least one child partition", leaseToken));
}
return Flux.fromIterable(addedLeaseTokens);
})
.flatMap(addedRangeId -> {
// Creating new lease.
return this.leaseManager.createLeaseIfNotExist(addedRangeId, lastContinuationToken);
}, this.degreeOfParallelism)
.map(newLease -> {
logger.info("Partition {} split into new partition with lease token {} and continuation token {}.", leaseToken, newLease.getLeaseToken(), lastContinuationToken);
return newLease;
});
}
private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
String partitionKeyRangesPath = extractContainerSelfLink(this.collectionSelfLink);
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, this.maxBatchSize);
return this.documentClient.readPartitionKeyRangeFeed(partitionKeyRangesPath, cosmosQueryRequestOptions)
.map(FeedResponse::getResults)
.flatMap(Flux::fromIterable)
.onErrorResume(throwable -> {
// TODO: Log the exception.
return Flux.empty();
});
}
/**
* Creates leases if they do not exist. This might happen on initial start or if some lease was unexpectedly lost.
* <p>
* Leases are created without the continuation token. It means partitions will be read according to
* 'From Beginning' or 'From current time'.
* Same applies also to split partitions. We do not search for parent lease and take continuation token since this
* might end up of reprocessing all the events since the split.
*
* @param leaseTokens a hash set of all the lease tokens.
* @return a deferred computation of this call.
*/
private Flux<Lease> createLeases(Set<String> leaseTokens)
{
Set<String> addedLeaseTokens = new HashSet<>(leaseTokens);
return this.leaseContainer.getAllLeases()
.map(lease -> {
if (lease != null) {
// Get leases after getting ranges, to make sure that no other hosts checked in continuation for
// split partition after we got leases.
addedLeaseTokens.remove(lease.getLeaseToken());
}
return lease;
})
.thenMany(Flux.fromIterable(addedLeaseTokens)
.flatMap( addedRangeId ->
this.leaseManager.createLeaseIfNotExist(addedRangeId, null), this.degreeOfParallelism)
.map( lease -> {
// TODO: log the lease info that was added.
return lease;
})
);
}
}