LeaseStoreManagerImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManagerSettings;
import com.azure.cosmos.implementation.changefeed.RequestOptionsFactory;
import com.azure.cosmos.implementation.changefeed.ServiceItemLease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Collections;
/**
* Provides flexible way to buildAsyncClient lease manager constructor parameters.
* For the actual creation of lease manager instance, delegates to lease manager factory.
*/
public class LeaseStoreManagerImpl implements LeaseStoreManager, LeaseStoreManager.LeaseStoreManagerBuilderDefinition {
private final String LEASE_STORE_MANAGER_LEASE_SUFFIX = "..";
private final Logger logger = LoggerFactory.getLogger(LeaseStoreManagerImpl.class);
private LeaseStoreManagerSettings settings;
private ChangeFeedContextClient leaseDocumentClient;
private RequestOptionsFactory requestOptionsFactory;
private ServiceItemLeaseUpdater leaseUpdater;
private LeaseStore leaseStore;
public static LeaseStoreManagerBuilderDefinition builder() {
return new LeaseStoreManagerImpl();
}
public LeaseStoreManagerImpl() {
this.settings = new LeaseStoreManagerSettings();
}
@Override
public LeaseStoreManagerBuilderDefinition leaseContextClient(ChangeFeedContextClient leaseContextClient) {
if (leaseContextClient == null) {
throw new IllegalArgumentException("leaseContextClient");
}
this.leaseDocumentClient = leaseContextClient;
return this;
}
@Override
public LeaseStoreManagerBuilderDefinition leasePrefix(String leasePrefix) {
if (leasePrefix == null) {
throw new IllegalArgumentException("leasePrefix");
}
this.settings.withContainerNamePrefix(leasePrefix);
return this;
}
@Override
public LeaseStoreManagerBuilderDefinition leaseCollectionLink(CosmosAsyncContainer leaseCollectionLink) {
if (leaseCollectionLink == null) {
throw new IllegalArgumentException("leaseCollectionLink");
}
this.settings.withLeaseCollectionLink(leaseCollectionLink);
return this;
}
@Override
public LeaseStoreManagerBuilderDefinition requestOptionsFactory(RequestOptionsFactory requestOptionsFactory) {
if (requestOptionsFactory == null) {
throw new IllegalArgumentException("requestOptionsFactory");
}
this.requestOptionsFactory = requestOptionsFactory;
return this;
}
@Override
public LeaseStoreManagerBuilderDefinition hostName(String hostName) {
if (hostName == null) {
throw new IllegalArgumentException("hostName");
}
this.settings.withHostName(hostName);
return this;
}
@Override
public Mono<LeaseStoreManager> build() {
if (this.settings == null) {
throw new IllegalArgumentException("properties");
}
if (this.settings.getContainerNamePrefix() == null) {
throw new IllegalArgumentException("properties.containerNamePrefix");
}
if (this.settings.getLeaseCollectionLink() == null) {
throw new IllegalArgumentException("properties.leaseCollectionLink");
}
if (this.settings.getHostName() == null || this.settings.getHostName().isEmpty()) {
throw new IllegalArgumentException("properties.hostName");
}
if (this.leaseDocumentClient == null) {
throw new IllegalArgumentException("leaseDocumentClient");
}
if (this.requestOptionsFactory == null) {
throw new IllegalArgumentException("requestOptionsFactory");
}
if (this.leaseUpdater == null) {
this.leaseUpdater = new DocumentServiceLeaseUpdaterImpl(leaseDocumentClient);
}
this.leaseStore = new DocumentServiceLeaseStore(
this.leaseDocumentClient,
this.settings.getContainerNamePrefix(),
this.settings.getLeaseCollectionLink(),
this.requestOptionsFactory);
return Mono.just(this);
}
@Override
public Flux<Lease> getAllLeases() {
return this.listDocuments(this.getPartitionLeasePrefix())
.map(documentServiceLease -> documentServiceLease);
}
@Override
public Flux<Lease> getOwnedLeases() {
return this.getAllLeases()
.filter(lease -> lease.getOwner() != null && lease.getOwner().equalsIgnoreCase(this.settings.getHostName()));
}
@Override
public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken) {
if (leaseToken == null) {
throw new IllegalArgumentException("leaseToken");
}
String leaseDocId = this.getDocumentId(leaseToken);
ServiceItemLease documentServiceLease = new ServiceItemLease()
.withId(leaseDocId)
.withLeaseToken(leaseToken)
.withContinuationToken(continuationToken);
return this.leaseDocumentClient.createItem(this.settings.getLeaseCollectionLink(), documentServiceLease, null, false)
.onErrorResume( ex -> {
if (ex instanceof CosmosException) {
CosmosException e = (CosmosException) ex;
if (e.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_CONFLICT) {
logger.info("Some other host created lease for {}.", leaseToken);
return Mono.empty();
}
}
return Mono.error(ex);
})
.map(documentResourceResponse -> {
if (documentResourceResponse == null) {
return null;
}
InternalObjectNode document = BridgeInternal.getProperties(documentResourceResponse);
logger.info("Created lease for partition {}.", leaseToken);
return documentServiceLease
.withId(document.getId())
.withETag(document.getETag())
.withTs(ModelBridgeInternal.getStringFromJsonSerializable(document, Constants.Properties.LAST_MODIFIED));
});
}
@Override
public Mono<Void> delete(Lease lease) {
if (lease == null || lease.getId() == null) {
throw new IllegalArgumentException("lease");
}
return this.leaseDocumentClient
.deleteItem(lease.getId(), new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease))
.onErrorResume( ex -> {
if (ex instanceof CosmosException) {
CosmosException e = (CosmosException) ex;
if (e.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_NOT_FOUND) {
// Ignore - document was already deleted.
return Mono.empty();
}
}
return Mono.error(ex);
})
// return some add-hoc value since we don't actually care about the result.
.map( documentResourceResponse -> true)
.then();
}
@Override
public Mono<Lease> acquire(Lease lease) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
String oldOwner = lease.getOwner();
return this.leaseUpdater.updateLease(
lease,
lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(oldOwner)) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setOwner(this.settings.getHostName());
serverLease.setProperties(lease.getProperties());
return serverLease;
});
}
@Override
public Mono<Void> release(Lease lease) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
return this.leaseDocumentClient.readItem(lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
InternalObjectNode.class)
.onErrorResume( ex -> {
if (ex instanceof CosmosException) {
CosmosException e = (CosmosException) ex;
if (e.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_NOT_FOUND) {
logger.info("Partition {} failed to renew lease. The lease is gone already.", lease.getLeaseToken());
throw new LeaseLostException(lease);
}
}
return Mono.error(ex);
})
.map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse)))
.flatMap( refreshedLease -> this.leaseUpdater.updateLease(
refreshedLease,
lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease ->
{
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} no need to release lease. The lease was already taken by another host '{}'.", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setOwner(null);
return serverLease;
})
).then();
}
@Override
public Mono<Lease> renew(Lease lease) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
// Get fresh lease. The assumption here is that check-pointing is done with higher frequency than lease renewal so almost
// certainly the lease was updated in between.
return this.leaseDocumentClient.readItem(lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
InternalObjectNode.class)
.onErrorResume( ex -> {
if (ex instanceof CosmosException) {
CosmosException e = (CosmosException) ex;
if (e.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_NOT_FOUND) {
logger.info("Partition {} failed to renew lease. The lease is gone already.", lease.getLeaseToken());
throw new LeaseLostException(lease);
}
}
return Mono.error(ex);
})
.map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse)))
.flatMap( refreshedLease -> this.leaseUpdater.updateLease(
refreshedLease,
lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease ->
{
if (serverLease.getOwner() == null) {
logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
throw new LeaseLostException(lease);
}
else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
return serverLease;
})
);
}
@Override
public Mono<Lease> updateProperties(Lease lease) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
if (lease.getOwner() != null && !lease.getOwner().equalsIgnoreCase(this.settings.getHostName())) {
logger.info("Partition '{}' lease was taken over by owner '{}' before lease item update", lease.getLeaseToken(), lease.getOwner());
throw new LeaseLostException(lease);
}
return this.leaseUpdater.updateLease(
lease,
lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition '{}' lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setProperties(lease.getProperties());
return serverLease;
});
}
@Override
public Mono<Lease> checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
if (continuationToken == null || continuationToken.isEmpty()) {
throw new IllegalArgumentException("continuationToken must be a non-empty string");
}
if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException());
return this.leaseDocumentClient.readItem(lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
InternalObjectNode.class)
.map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse)))
.flatMap( refreshedLease -> {
if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException());
return this.leaseUpdater.updateLease(
refreshedLease,
lease.getId(), new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() == null) {
logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
throw new LeaseLostException(lease);
} else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setContinuationToken(continuationToken);
return serverLease;
});
})
.doOnError(throwable -> {
logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'",
lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken());
});
}
@Override
public Mono<Boolean> isInitialized() {
return this.leaseStore.isInitialized();
}
@Override
public Mono<Boolean> markInitialized() {
return this.leaseStore.markInitialized();
}
@Override
public Mono<Boolean> acquireInitializationLock(Duration lockExpirationTime) {
return this.leaseStore.acquireInitializationLock(lockExpirationTime);
}
@Override
public Mono<Boolean> releaseInitializationLock() {
return this.leaseStore.releaseInitializationLock();
}
private Flux<ServiceItemLease> listDocuments(String prefix) {
if (prefix == null || prefix.isEmpty()) {
throw new IllegalArgumentException("prefix");
}
SqlParameter param = new SqlParameter();
param.setName("@PartitionLeasePrefix");
param.setValue(prefix);
SqlQuerySpec querySpec = new SqlQuerySpec(
"SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix)",
Collections.singletonList(param));
Flux<FeedResponse<InternalObjectNode>> query = this.leaseDocumentClient.queryItems(
this.settings.getLeaseCollectionLink(),
querySpec,
this.requestOptionsFactory.createQueryRequestOptions(),
InternalObjectNode.class);
return query.flatMap( documentFeedResponse -> Flux.fromIterable(documentFeedResponse.getResults()))
.map(ServiceItemLease::fromDocument);
}
private String getDocumentId(String leaseToken)
{
return this.getPartitionLeasePrefix() + leaseToken;
}
private String getPartitionLeasePrefix() {
return this.settings.getContainerNamePrefix() + LEASE_STORE_MANAGER_LEASE_SUFFIX;
}
}