LockRenewalOperation.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.MAX_RENEWAL_BUFFER_DURATION;

/**
 * Represents a renewal session or message lock renewal operation that.
 */
class LockRenewalOperation implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(LockRenewalOperation.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<OffsetDateTime> lockedUntil = new AtomicReference<>();
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();
    private final AtomicReference<LockRenewalStatus> status = new AtomicReference<>(LockRenewalStatus.RUNNING);
    private final MonoProcessor<Void> cancellationProcessor = MonoProcessor.create();
    private final Mono<Void> completionMono;

    private final String lockToken;
    private final boolean isSession;
    private final Function<String, Mono<OffsetDateTime>> renewalOperation;
    private final Disposable subscription;

    /**
     * Creates a new lock renewal operation. The lock is initially renewed.
     *
     * @param lockToken Message lock or session id to renew.
     * @param maxLockRenewalDuration The maximum duration this lock should be renewed.
     * @param isSession Whether the lock represents a session lock or message lock.
     * @param renewalOperation The renewal operation to call.
     */
    LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession,
        Function<String, Mono<OffsetDateTime>> renewalOperation) {
        this(lockToken, maxLockRenewalDuration, isSession, renewalOperation, OffsetDateTime.now());
    }

    /**
     * Creates a new lock renewal operation.
     *
     * @param lockToken Lock or session id to renew.
     * @param tokenLockedUntil The initial period the message or session is locked until.
     * @param maxLockRenewalDuration The maximum duration this lock should be renewed.
     * @param isSession Whether the lock represents a session lock or message lock.
     * @param renewalOperation The renewal operation to call.
     */
    LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession,
        Function<String, Mono<OffsetDateTime>> renewalOperation, OffsetDateTime tokenLockedUntil) {
        this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null.");
        this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null.");
        this.isSession = isSession;

        Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'");
        Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null.");

        if (maxLockRenewalDuration.isNegative()) {
            throw logger.logExceptionAsError(new IllegalArgumentException(
                "'maxLockRenewalDuration' cannot be negative."));
        }

        this.lockedUntil.set(tokenLockedUntil);

        final Flux<OffsetDateTime> renewLockOperation = getRenewLockOperation(tokenLockedUntil,
            maxLockRenewalDuration)
            .takeUntilOther(cancellationProcessor)
            .cache(Duration.ofMinutes(2));

        this.completionMono = renewLockOperation.then();
        this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until),
            error -> {
                logger.error("token[{}]. Error occurred while renewing lock token.", error);
                status.set(LockRenewalStatus.FAILED);
                throwable.set(error);
                cancellationProcessor.onComplete();
            }, () -> {
                if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
                    logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
                }

                cancellationProcessor.onComplete();
            });
    }

    /**
     * Gets a mono that completes when the operation does.
     *
     * @return A mono that completes when the renewal operation does.
     */
    Mono<Void> getCompletionOperation() {
        return completionMono;
    }

    /**
     * Gets the current datetime the message or session is locked until.
     *
     * @return the datetime the message or session is locked until.
     */
    OffsetDateTime getLockedUntil() {
        return lockedUntil.get();
    }

    /**
     * Gets the message lock token for the renewal operation.
     *
     * @return The message lock token or {@code null} if a session is being renewed instead.
     */
    String getLockToken() {
        return isSession ? null : lockToken;
    }

    /**
     * Gets the session id for this lock renewal operation.
     *
     * @return The session id or {@code null} if it is not a session renewal.
     */
    String getSessionId() {
        return isSession ? lockToken : null;
    }

    /**
     * Gets the current status of the renewal operation.
     *
     * @return The current status of the renewal operation.
     */
    LockRenewalStatus getStatus() {
        return status.get();
    }

    /**
     * Gets the exception if an error occurred whilst renewing the message or session lock.
     *
     * @return the exception if an error occurred whilst renewing the message or session lock, otherwise {@code null}.
     */
    Throwable getThrowable() {
        return throwable.get();
    }

    /**
     * Cancels the lock renewal operation.
     */
    @Override
    public void close() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.CANCELLED)) {
            logger.verbose("token[{}] Cancelled operation.", lockToken);
        }

        cancellationProcessor.onComplete();
        subscription.dispose();
    }

    /**
     * Gets the lock renewal operation. if the {@code maxLockRenewalDuration} is {@link Duration#isZero()}, then the
     * lock is never renewed.
     *
     * @param initialLockedUntil When the initial call is locked until.
     * @param maxLockRenewalDuration Duration to renew lock for.
     * @return The subscription for the operation.
     */
    private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedUntil,
        Duration maxLockRenewalDuration) {
        if (maxLockRenewalDuration.isZero()) {
            status.set(LockRenewalStatus.COMPLETE);
            return Flux.empty();
        }

        final EmitterProcessor<Duration> emitterProcessor = EmitterProcessor.create();
        final FluxSink<Duration> sink = emitterProcessor.sink();

        sink.next(calculateRenewalDelay(initialLockedUntil));

        final Flux<Object> cancellationSignals = Flux.first(cancellationProcessor, Mono.delay(maxLockRenewalDuration));
        return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval)
            .thenReturn(Flux.create(s -> s.next(interval)))))
            .takeUntilOther(cancellationSignals)
            .flatMap(delay -> {
                logger.info("token[{}]. now[{}]. Starting lock renewal.", lockToken, OffsetDateTime.now());
                return renewalOperation.apply(lockToken);
            })
            .map(offsetDateTime -> {
                final Duration next = Duration.between(OffsetDateTime.now(), offsetDateTime);
                logger.info("token[{}]. nextExpiration[{}]. next: [{}]. isSession[{}]", lockToken, offsetDateTime, next,
                    isSession);

                sink.next(calculateRenewalDelay(offsetDateTime));
                return offsetDateTime;
            });
    }

    private Duration calculateRenewalDelay(OffsetDateTime initialLockedUntil) {
        final OffsetDateTime now = OffsetDateTime.now();
        final Duration remainingTime = Duration.between(now, initialLockedUntil);

        // If remaining time to renew is less than 400ms, renew immediately
        // Lock renewal to follow the same behavior as C# SDK
        // C# class : github.com/azure-sdk-for-net/.../Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs#L367

        if (remainingTime.toMillis() < 400) {
            logger.info("Duration was less than 400ms. now[{}] lockedUntil[{}]", now, initialLockedUntil);
            return Duration.ZERO;
        } else {
            // Adjust the interval, so we can buffer time for the time it'll take to refresh.
            final long bufferInMilliSec = Math.min(remainingTime.toMillis() / 2,
                MAX_RENEWAL_BUFFER_DURATION.toMillis());
            final Duration renewAfter = Duration.ofMillis(remainingTime.toMillis() - bufferInMilliSec);
            if (renewAfter.isNegative()) {
                logger.info("Adjusted duration is negative. renewAfter: {}ms. Buffer: {}ms.",
                    remainingTime.toMillis(), bufferInMilliSec);
            }
            return renewAfter;
        }
    }
}