ServiceBusReceiveLinkProcessor.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static com.azure.core.util.FluxUtil.monoError;
/**
* Processes AMQP receive links into a stream of AMQP messages.
*
* This is almost a carbon copy of AmqpReceiveLinkProcessor. When we can abstract it from proton-j, it would be nice to
* unify this.
*/
public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusReceiveLink, Message>
implements Subscription {
private final ClientLogger logger = new ClientLogger(ServiceBusReceiveLinkProcessor.class);
private final Object lock = new Object();
private final Object queueLock = new Object();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final AtomicReference<String> linkName = new AtomicReference<>();
// Queue containing all the prefetched messages.
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
// size() on Deque is O(n) operation, so we use an integer to keep track. All reads and writes to this are gated by
// the `queueLock`.
private final AtomicInteger pendingMessages = new AtomicInteger();
private final int minimumNumberOfMessages;
private final int prefetch;
private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
private final AtomicInteger wip = new AtomicInteger();
private final AmqpRetryPolicy retryPolicy;
private volatile Throwable lastError;
private volatile boolean isCancelled;
private volatile ServiceBusReceiveLink currentLink;
private volatile Disposable currentLinkSubscriptions;
private volatile Disposable retrySubscription;
// Opting to use AtomicReferenceFieldUpdater because Project Reactor provides utility methods that calculates
// backpressure requests, sets the upstream correctly, and reports its state.
private volatile long requested;
private static final AtomicLongFieldUpdater<ServiceBusReceiveLinkProcessor> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ServiceBusReceiveLinkProcessor.class, "requested");
private volatile Subscription upstream;
private static final AtomicReferenceFieldUpdater<ServiceBusReceiveLinkProcessor, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(ServiceBusReceiveLinkProcessor.class, Subscription.class,
"upstream");
/**
* Creates an instance of {@link ServiceBusReceiveLinkProcessor}.
*
* @param prefetch The number if messages to initially fetch.
* @param retryPolicy Retry policy to apply when fetching a new AMQP channel.
*
* @throws NullPointerException if {@code retryPolicy} is null.
* @throws IllegalArgumentException if {@code prefetch} is less than 0.
*/
public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy) {
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
if (prefetch < 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'prefetch' cannot be less than 0."));
}
this.prefetch = prefetch;
// When the queue has this number of messages left, it's time to add more credits to refill the prefetch queue.
this.minimumNumberOfMessages = Math.floorDiv(prefetch, 3);
}
public String getLinkName() {
return linkName.get();
}
public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
if (isDisposed()) {
return monoError(logger, new IllegalStateException(String.format(
"lockToken[%s]. state[%s]. Cannot update disposition on closed processor.", lockToken, deliveryState)));
}
final ServiceBusReceiveLink link = currentLink;
if (link == null) {
return monoError(logger, new IllegalStateException(String.format(
"lockToken[%s]. state[%s]. Cannot update disposition with no link.", lockToken, deliveryState)));
}
return link.updateDisposition(lockToken, deliveryState).onErrorResume(error -> {
if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
if (AmqpErrorCondition.TIMEOUT_ERROR.equals(amqpException.getErrorCondition())) {
return link.closeAsync().then(Mono.error(error));
}
}
return Mono.error(error);
});
}
/**
* Gets the error associated with this processor.
*
* @return Error associated with this processor. {@code null} if there is no error.
*/
@Override
public Throwable getError() {
return lastError;
}
/**
* Gets whether or not the processor is terminated.
*
* @return {@code true} if the processor has terminated; false otherwise.
*/
@Override
public boolean isTerminated() {
return isTerminated.get() || isCancelled;
}
@Override
public int getPrefetch() {
return prefetch;
}
/**
* When a subscription is obtained from upstream publisher.
*
* @param subscription Subscription to upstream publisher.
*/
@Override
public void onSubscribe(Subscription subscription) {
Objects.requireNonNull(subscription, "'subscription' cannot be null");
if (!Operators.setOnce(UPSTREAM, this, subscription)) {
throw logger.logExceptionAsError(new IllegalStateException("Cannot set upstream twice."));
}
requestUpstream();
}
/**
* When the next AMQP link is fetched.
*
* @param next The next AMQP receive link.
*/
@Override
public void onNext(ServiceBusReceiveLink next) {
Objects.requireNonNull(next, "'next' cannot be null.");
if (isTerminated()) {
logger.warning("linkName[{}] entityPath[{}]. Got another link when we have already terminated processor.",
next.getLinkName(), next.getEntityPath());
Operators.onNextDropped(next, currentContext());
return;
}
final String linkName = next.getLinkName();
final String entityPath = next.getEntityPath();
logger.info("linkName[{}] entityPath[{}]. Setting next AMQP receive link.", linkName, entityPath);
final AmqpReceiveLink oldChannel;
final Disposable oldSubscription;
synchronized (lock) {
oldChannel = currentLink;
oldSubscription = currentLinkSubscriptions;
currentLink = next;
next.setEmptyCreditListener(() -> 0);
currentLinkSubscriptions = Disposables.composite(
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
synchronized (queueLock) {
messageQueue.add(message);
pendingMessages.incrementAndGet();
}
drain();
}),
next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(
state -> {
// Connection was successfully opened, we can reset the retry interval.
if (state == AmqpEndpointState.ACTIVE) {
retryAttempts.set(0);
}
},
error -> {
currentLink = null;
onError(error);
},
() -> {
if (isTerminated()) {
logger.info("Processor is terminated. Disposing of link processor.");
dispose();
} else if (upstream == Operators.cancelledSubscription()) {
logger.info("Upstream has completed. Disposing of link processor.");
dispose();
} else {
logger.info("Receive link endpoint states are closed. Requesting another.");
final AmqpReceiveLink existing = currentLink;
currentLink = null;
disposeReceiver(existing);
requestUpstream();
}
}));
}
checkAndAddCredits(next);
disposeReceiver(oldChannel);
if (oldSubscription != null) {
oldSubscription.dispose();
}
}
/**
* Sets up the downstream subscriber.
*
* @param actual The downstream subscriber.
*
* @throws IllegalStateException if there is already a downstream subscriber.
*/
@Override
public void subscribe(CoreSubscriber<? super Message> actual) {
Objects.requireNonNull(actual, "'actual' cannot be null.");
final boolean terminateSubscriber = isTerminated()
|| (currentLink == null && upstream == Operators.cancelledSubscription());
if (isTerminated()) {
final AmqpReceiveLink link = currentLink;
final String linkName = link != null ? link.getLinkName() : "n/a";
final String entityPath = link != null ? link.getEntityPath() : "n/a";
logger.info("linkName[{}] entityPath[{}]. AmqpReceiveLink is already terminated.", linkName, entityPath);
} else if (currentLink == null && upstream == Operators.cancelledSubscription()) {
logger.info("There is no current link and upstream is terminated.");
}
if (terminateSubscriber) {
actual.onSubscribe(Operators.emptySubscription());
if (hasError()) {
actual.onError(lastError);
} else {
actual.onComplete();
}
return;
}
if (downstream.compareAndSet(null, actual)) {
actual.onSubscribe(this);
drain();
} else {
Operators.error(actual, logger.logExceptionAsError(new IllegalStateException(
"There is already one downstream subscriber.'")));
}
}
/**
* When an error occurs from the upstream publisher. If the {@code throwable} is a transient failure, another AMQP
* element is requested if the {@link AmqpRetryPolicy} allows. Otherwise, the processor closes.
*
* @param throwable Error that occurred in upstream publisher.
*/
@Override
public void onError(Throwable throwable) {
Objects.requireNonNull(throwable, "'throwable' is required.");
if (isTerminated()) {
logger.info("AmqpReceiveLinkProcessor is terminated. Not reopening on error.");
return;
}
final int attempt = retryAttempts.incrementAndGet();
final Duration retryInterval = retryPolicy.calculateRetryDelay(throwable, attempt);
final AmqpReceiveLink link = currentLink;
final String linkName = link != null ? link.getLinkName() : "n/a";
final String entityPath = link != null ? link.getEntityPath() : "n/a";
if (retryInterval != null && upstream != Operators.cancelledSubscription()) {
logger.warning("linkName[{}] entityPath[{}]. Transient error occurred. Attempt: {}. Retrying after {} ms.",
linkName, entityPath, attempt, retryInterval.toMillis(), throwable);
retrySubscription = Mono.delay(retryInterval).subscribe(i -> requestUpstream());
return;
}
logger.warning("linkName[{}] entityPath[{}]. Non-retryable error occurred in AMQP receive link.",
linkName, entityPath, throwable);
lastError = throwable;
isTerminated.set(true);
final CoreSubscriber<? super Message> subscriber = downstream.get();
if (subscriber != null) {
subscriber.onError(throwable);
}
onDispose();
}
@Override
public void dispose() {
if (isTerminated.getAndSet(true)) {
return;
}
drain();
onDispose();
}
/**
* When upstream has completed emitting messages.
*/
@Override
public void onComplete() {
this.upstream = Operators.cancelledSubscription();
}
/**
* When downstream subscriber makes a back-pressure request.
*/
@Override
public void request(long request) {
if (!Operators.validate(request)) {
logger.warning("Invalid request: {}", request);
return;
}
Operators.addCap(REQUESTED, this, request);
final AmqpReceiveLink link = currentLink;
if (link == null) {
return;
}
checkAndAddCredits(link);
drain();
}
/**
* When downstream subscriber cancels their subscription.
*/
@Override
public void cancel() {
if (isCancelled) {
return;
}
isCancelled = true;
drain();
}
/**
* Requests another receive link from upstream.
*/
private void requestUpstream() {
if (isTerminated()) {
logger.info("Processor is terminated. Not requesting another link.");
return;
} else if (upstream == null) {
logger.info("There is no upstream. Not requesting another link.");
return;
} else if (upstream == Operators.cancelledSubscription()) {
logger.info("Upstream is cancelled or complete. Not requesting another link.");
return;
}
synchronized (lock) {
if (currentLink != null) {
logger.info("Current link exists. Not requesting another link.");
return;
}
}
logger.info("Requesting a new AmqpReceiveLink from upstream.");
upstream.request(1L);
}
private void onDispose() {
if (retrySubscription != null && !retrySubscription.isDisposed()) {
retrySubscription.dispose();
}
disposeReceiver(currentLink);
currentLink = null;
if (currentLinkSubscriptions != null) {
currentLinkSubscriptions.dispose();
}
}
private void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
while (missed != 0) {
drainQueue();
missed = wip.addAndGet(-missed);
}
}
private void drainQueue() {
final CoreSubscriber<? super Message> subscriber = downstream.get();
if (subscriber == null || checkAndSetTerminated()) {
return;
}
long numberRequested = REQUESTED.get(this);
boolean isEmpty = messageQueue.isEmpty();
while (numberRequested != 0L && !isEmpty) {
if (checkAndSetTerminated()) {
break;
}
long numberEmitted = 0L;
while (numberRequested != numberEmitted) {
if (isEmpty && checkAndSetTerminated()) {
break;
}
final Message message = messageQueue.poll();
if (message == null) {
break;
}
if (isCancelled) {
Operators.onDiscard(message, subscriber.currentContext());
synchronized (queueLock) {
Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);
pendingMessages.set(0);
}
return;
}
try {
subscriber.onNext(message);
// RECEIVE_DELETE Mode: No need to settle message because they're automatically settled by the link.
// PEEK_LOCK Mode: Consider message processed, as `onNext` is complete, So decrement the count.
pendingMessages.decrementAndGet();
if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured.
checkAndAddCredits(currentLink);
}
} catch (Exception e) {
logger.error("Exception occurred while handling downstream onNext operation.", e);
throw logger.logExceptionAsError(Exceptions.propagate(
Operators.onOperatorError(upstream, e, message, subscriber.currentContext())));
}
numberEmitted++;
isEmpty = messageQueue.isEmpty();
}
if (REQUESTED.get(this) != Long.MAX_VALUE) {
numberRequested = REQUESTED.addAndGet(this, -numberEmitted);
}
}
}
private boolean checkAndSetTerminated() {
if (!isTerminated()) {
return false;
}
final CoreSubscriber<? super Message> subscriber = downstream.get();
final Throwable error = lastError;
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onComplete();
}
disposeReceiver(currentLink);
synchronized (queueLock) {
messageQueue.clear();
pendingMessages.set(0);
}
return true;
}
private void checkAndAddCredits(AmqpReceiveLink link) {
if (link == null) {
return;
}
synchronized (lock) {
final int linkCredits = link.getCredits();
final int credits = getCreditsToAdd(linkCredits);
if (credits > 0) {
link.addCredits(credits).subscribe();
}
}
}
private int getCreditsToAdd(int linkCredits) {
final CoreSubscriber<? super Message> subscriber = downstream.get();
final long r = REQUESTED.get(this);
final boolean hasBackpressure = r != Long.MAX_VALUE;
if (subscriber == null || r == 0) {
logger.info("Not adding credits. No downstream subscribers or items requested.");
return 0;
}
final int creditsToAdd;
final int expectedTotalCredit;
if (prefetch == 0) {
if (r <= Integer.MAX_VALUE) {
expectedTotalCredit = (int) r;
} else {
//This won't really happen in reality.
//For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
//So it will request one by one from this link processor, even though the user's request has no
//back pressure.
//For sync client, the sync subscriber has back pressure.
//The request count uses the argument of method receiveMessages(int maxMessages).
//It's at most Integer.MAX_VALUE.
expectedTotalCredit = Integer.MAX_VALUE;
}
} else {
expectedTotalCredit = prefetch;
}
synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;
if (hasBackpressure) {
creditsToAdd = Math.max(expectedTotalCredit - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(expectedTotalCredit - pending, 0)
: 0;
}
logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:"
+ "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits,
expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size());
}
return creditsToAdd;
}
private void disposeReceiver(AmqpReceiveLink link) {
if (link == null) {
return;
}
try {
((AsyncCloseable) link).closeAsync().subscribe();
} catch (Exception error) {
logger.warning("linkName[{}] entityPath[{}] Unable to dispose of link.", link.getLinkName(),
link.getEntityPath(), error);
}
}
}