FluxAutoLockRenew.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.LockContainer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import reactor.util.context.Context;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Receives messages from to upstream, subscribe lock renewal subscriber.
*/
final class FluxAutoLockRenew extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
private final ClientLogger logger = new ClientLogger(FluxAutoLockRenew.class);
private final Function<String, Mono<OffsetDateTime>> onRenewLock;
private final LockContainer<LockRenewalOperation> messageLockContainer;
private final ReceiverOptions receivingOptions;
/**
* Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher}
* @param source the {@link Publisher} to decorate
*
* @throws NullPointerException If {@code onRenewLock}, {@code messageLockContainer},
* {@code ReceiverOptions} or {@code maxAutoLockRenewDuration} is null.
*
* @throws IllegalArgumentException If eceiverOptions.maxLockRenewalDuration is zero or negative.
*/
FluxAutoLockRenew(
Flux<? extends ServiceBusMessageContext> source, ReceiverOptions receiverOptions,
LockContainer<LockRenewalOperation> messageLockContainer, Function<String,
Mono<OffsetDateTime>> onRenewLock) {
super(source);
this.receivingOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
this.messageLockContainer = Objects.requireNonNull(messageLockContainer,
"'messageLockContainer' cannot be null.");
Duration maxAutoLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
Objects.requireNonNull(maxAutoLockRenewDuration,
"'receivingOptions.maxAutoLockRenewDuration' cannot be null.");
if (maxAutoLockRenewDuration.isNegative() || maxAutoLockRenewDuration.isZero()) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'receivingOptions.maxLockRenewalDuration' should not be zero or negative."));
}
}
@Override
public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
final LockRenewSubscriber newLockRenewSubscriber = new LockRenewSubscriber(coreSubscriber,
receivingOptions.getMaxLockRenewDuration(), messageLockContainer, onRenewLock,
receivingOptions.isEnableAutoComplete());
source.subscribe(newLockRenewSubscriber);
}
/**
* Receives messages from to upstream, pushes them downstream and start lock renewal.
*/
static final class LockRenewSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
private static final Consumer<ServiceBusMessageContext> LOCK_RENEW_NO_OP = messageContext -> { };
private final ClientLogger logger = new ClientLogger(LockRenewSubscriber.class);
private final Function<String, Mono<OffsetDateTime>> onRenewLock;
private final Duration maxAutoLockRenewal;
private final LockContainer<LockRenewalOperation> messageLockContainer;
private final CoreSubscriber<? super ServiceBusMessageContext> actual;
private final boolean isAutoCompleteEnabled;
LockRenewSubscriber(CoreSubscriber<? super ServiceBusMessageContext> actual,
Duration maxAutoLockRenewDuration, LockContainer<LockRenewalOperation> messageLockContainer,
Function<String, Mono<OffsetDateTime>> onRenewLock, boolean isAutoCompleteEnabled) {
this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
this.actual = Objects.requireNonNull(actual, "'downstream' cannot be null.");
this.messageLockContainer = Objects.requireNonNull(messageLockContainer,
"'messageLockContainer' cannot be null.");
this.maxAutoLockRenewal = Objects.requireNonNull(maxAutoLockRenewDuration,
"'maxAutoLockRenewDuration' cannot be null.");
this.isAutoCompleteEnabled = isAutoCompleteEnabled;
}
/**
* On an initial subscription, will take the first work item, and request that amount of work for it.
* @param subscription Subscription for upstream.
*/
@Override
protected void hookOnSubscribe(Subscription subscription) {
Objects.requireNonNull(subscription, "'subscription' cannot be null.");
actual.onSubscribe(subscription);
}
/**
* When upstream has completed emitting messages.
*/
@Override
public void hookOnComplete() {
logger.verbose("Upstream has completed.");
actual.onComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
logger.error("Errors occurred upstream.", throwable);
actual.onError(throwable);
}
@Override
protected void hookOnNext(ServiceBusMessageContext messageContext) {
final ServiceBusReceivedMessage message = messageContext.getMessage();
final Consumer<ServiceBusMessageContext> lockCleanup;
if (message != null) {
final String lockToken = message.getLockToken();
final OffsetDateTime lockedUntil = message.getLockedUntil();
final LockRenewalOperation renewOperation;
if (Objects.isNull(lockToken)) {
logger.warning("Unexpected, LockToken is not present in message. sequenceNumber[{}].",
message.getSequenceNumber());
return;
} else if (Objects.isNull(lockedUntil)) {
logger.warning("Unexpected, lockedUntil is not present in message. sequenceNumber[{}].",
message.getSequenceNumber());
return;
}
final Function<String, Mono<OffsetDateTime>> onRenewLockUpdateMessage = onRenewLock.andThen(updated ->
updated.map(newLockedUntil -> {
message.setLockedUntil(newLockedUntil);
return newLockedUntil;
}));
renewOperation = new LockRenewalOperation(lockToken, maxAutoLockRenewal, false,
onRenewLockUpdateMessage, lockedUntil);
try {
messageLockContainer.addOrUpdate(lockToken, OffsetDateTime.now().plus(maxAutoLockRenewal),
renewOperation);
} catch (Exception e) {
logger.info("Exception occurred while updating lockContainer for token [{}].", lockToken, e);
}
lockCleanup = context -> {
renewOperation.close();
messageLockContainer.remove(context.getMessage().getLockToken());
};
} else {
lockCleanup = LOCK_RENEW_NO_OP;
}
try {
actual.onNext(messageContext);
} catch (Exception e) {
logger.info("Exception occurred while handling downstream onNext operation.", e);
} finally {
if (isAutoCompleteEnabled) {
lockCleanup.accept(messageContext);
}
}
}
@Override
public Context currentContext() {
return actual.currentContext();
}
}
}