LeaseRenewerImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.LeaseRenewer;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.Instant;
/**
* Implementation for the {@link LeaseRenewer}.
*/
class LeaseRenewerImpl implements LeaseRenewer {
private static final Logger logger = LoggerFactory.getLogger(LeaseRenewerImpl.class);
private final LeaseManager leaseManager;
private final Duration leaseRenewInterval;
private Lease lease;
private RuntimeException resultException;
public LeaseRenewerImpl(Lease lease, LeaseManager leaseManager, Duration leaseRenewInterval) {
this.lease = lease;
this.leaseManager = leaseManager;
this.leaseRenewInterval = leaseRenewInterval;
}
@Override
public Mono<Void> run(CancellationToken cancellationToken) {
logger.info("Partition {}: renewer task started.", this.lease.getLeaseToken());
return Mono.just(this)
.flatMap(value -> {
if (cancellationToken.isCancellationRequested()) {
return Mono.empty();
}
Instant stopTimer = Instant.now().plus(this.leaseRenewInterval);
return Mono.just(value)
.delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL)
.repeat( () -> {
Instant currentTime = Instant.now();
return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
}).last();
})
.flatMap(value -> {
if (cancellationToken.isCancellationRequested()) {
return Mono.empty();
}
return this.renew(cancellationToken);
})
.repeat(() -> {
if (cancellationToken.isCancellationRequested()) {
logger.info("Partition {}: renewer task stopped.", this.lease.getLeaseToken());
}
return !cancellationToken.isCancellationRequested();
})
.then()
.doOnError(throwable -> {
if (throwable instanceof LeaseLostException) {
logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
} else {
logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), throwable);
}
});
}
@Override
public RuntimeException getResultException() {
return this.resultException;
}
private Mono<Lease> renew(CancellationToken cancellationToken) {
if (cancellationToken.isCancellationRequested()) {
return Mono.empty();
}
return this.leaseManager.renew(this.lease)
.map(renewedLease -> {
if (renewedLease != null) {
this.lease = renewedLease;
}
logger.info("Partition {}: renewed lease with result {}", this.lease.getLeaseToken(), renewedLease != null);
return renewedLease;
})
.onErrorResume(throwable -> {
if (throwable instanceof LeaseLostException) {
LeaseLostException lle = (LeaseLostException) throwable;
this.resultException = lle;
logger.error("Partition {}: lost lease on renew.", this.lease.getLeaseToken(), lle);
return Mono.error(lle);
}
logger.error("Partition {}: failed to renew lease.", this.lease.getLeaseToken(), throwable);
return Mono.empty();
});
}
}