AmqpReceiveLinkProcessor.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;

import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.CREDITS_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.TRACKING_ID_KEY;

/**
 * Processes AMQP receive links into a stream of AMQP messages.
 */
public class AmqpReceiveLinkProcessor extends FluxProcessor<AmqpReceiveLink, Message> implements Subscription {
    private final ClientLogger logger = new ClientLogger(AmqpReceiveLinkProcessor.class);
    private final Object lock = new Object();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
    private final AtomicBoolean linkHasNoCredits = new AtomicBoolean();
    private final Object creditsAdded = new Object();

    private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
    private final AtomicInteger wip = new AtomicInteger();

    private final int prefetch;
    private final String entityPath;
    private final Disposable parentConnection;
    private final int maxQueueSize;

    private volatile Throwable lastError;
    private volatile boolean isCancelled;
    private volatile AmqpReceiveLink currentLink;
    private volatile String currentLinkName;
    private volatile Disposable currentLinkSubscriptions;

    // Opting to use AtomicReferenceFieldUpdater because Project Reactor provides utility methods that calculates
    // backpressure requests, sets the upstream correctly, and reports its state.
    private volatile Subscription upstream;
    private static final AtomicReferenceFieldUpdater<AmqpReceiveLinkProcessor, Subscription> UPSTREAM =
        AtomicReferenceFieldUpdater.newUpdater(AmqpReceiveLinkProcessor.class, Subscription.class,
            "upstream");

    /**
     * The number of requested messages.
     */
    private volatile long requested;
    private static final AtomicLongFieldUpdater<AmqpReceiveLinkProcessor> REQUESTED =
        AtomicLongFieldUpdater.newUpdater(AmqpReceiveLinkProcessor.class, "requested");

    /**
     * Creates an instance of {@link AmqpReceiveLinkProcessor}.
     *
     * @param prefetch The number if messages to initially fetch.
     * @param parentConnection Represents the parent connection.
     *
     * @throws NullPointerException if {@code retryPolicy} is null.
     * @throws IllegalArgumentException if {@code prefetch} is less than 0.
     */
    public AmqpReceiveLinkProcessor(String entityPath, int prefetch, Disposable parentConnection) {
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.parentConnection = Objects.requireNonNull(parentConnection, "'parentConnection' cannot be null.");

        if (prefetch < 0) {
            throw logger.logExceptionAsError(new IllegalArgumentException("'prefetch' cannot be less than 0."));
        }

        this.prefetch = prefetch;
        this.maxQueueSize = prefetch * 2;
    }

    /**
     * Gets the error associated with this processor.
     *
     * @return Error associated with this processor. {@code null} if there is no error.
     */
    @Override
    public Throwable getError() {
        return lastError;
    }

    /**
     * Gets whether or not the processor is terminated.
     *
     * @return {@code true} if the processor has terminated; false otherwise.
     */
    @Override
    public boolean isTerminated() {
        return isTerminated.get() || isCancelled;
    }

    /**
     * When a subscription is obtained from upstream publisher.
     *
     * @param subscription Subscription to upstream publisher.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null");
        logger.info("Setting new subscription for receive link processor");

        if (!Operators.setOnce(UPSTREAM, this, subscription)) {
            throw logger.logExceptionAsError(new IllegalStateException("Cannot set upstream twice."));
        }

        requestUpstream();
    }

    @Override
    public int getPrefetch() {
        return prefetch;
    }

    /**
     * When the next AMQP link is fetched.
     *
     * @param next The next AMQP receive link.
     */
    @Override
    public void onNext(AmqpReceiveLink next) {
        Objects.requireNonNull(next, "'next' cannot be null.");

        if (isTerminated()) {
            logger.atWarning()
                .addKeyValue(LINK_NAME_KEY, next.getLinkName())
                .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath())
                .log("Got another link when we have already terminated processor.");
            Operators.onNextDropped(next, currentContext());
            return;
        }

        final String linkName = next.getLinkName();

        logger.atInfo()
            .addKeyValue(LINK_NAME_KEY, linkName)
            .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath())
            .log("Setting next AMQP receive link.");

        final AmqpReceiveLink oldChannel;
        final Disposable oldSubscription;
        synchronized (lock) {
            oldChannel = currentLink;
            oldSubscription = currentLinkSubscriptions;

            currentLink = next;
            currentLinkName = next.getLinkName();

            // Empty credit listener is invoked when there are no credits left on the underlying link.
            next.setEmptyCreditListener(() -> {
                final int credits;
                synchronized (creditsAdded) {
                    credits = getCreditsToAdd();

                    // This means that considering the downstream request and current size of the message queue, we
                    // have enough messages to satisfy them.
                    // Thus, there are no credits on the link AND we are not going to add anymore.
                    // We'll wait until the next time downstream calls request(long) to get more events.
                    if (credits < 1) {
                        linkHasNoCredits.compareAndSet(false, true);
                    } else {
                        logger.atInfo()
                            .addKeyValue(LINK_NAME_KEY, linkName)
                            .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath())
                            .addKeyValue(CREDITS_KEY, credits)
                            .log("Link is empty. Adding more credits.");
                    }
                }

                return credits;
            });

            currentLinkSubscriptions = Disposables.composite(
                // For a new link, add the prefetch as credits.
                next.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next()
                    .flatMap(state -> {
                        // If there was already a subscriber downstream who made a request, see if that is more than
                        // the prefetch. If it is, then add the difference. (ie. if they requested 500, but our
                        // prefetch is 100, we'll add 500 credits rather than 100.
                        final Mono<Void> operation;
                        synchronized (creditsAdded) {
                            final int creditsToAdd = getCreditsToAdd();
                            final int total = Math.max(prefetch, creditsToAdd);

                            logger.atVerbose()
                                .addKeyValue(LINK_NAME_KEY, linkName)
                                .addKeyValue("prefetch", prefetch)
                                .addKeyValue(CREDITS_KEY, creditsToAdd)
                                .log("Adding initial credits.");

                            operation = next.addCredits(total);
                        }

                        return operation;
                    })
                    .subscribe(noop -> {
                    }, error -> logger.atInfo().addKeyValue(LINK_NAME_KEY, linkName).log("Link was already closed. Could not add credits.")),
                next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(
                    state -> {
                        // Connection was successfully opened, we can reset the retry interval.
                        if (state == AmqpEndpointState.ACTIVE) {
                            logger.atInfo()
                                .addKeyValue(LINK_NAME_KEY, linkName)
                                .addKeyValue(CREDITS_KEY, next.getCredits())
                                .log("Link is active.");
                            retryAttempts.set(0);
                        }
                    },
                    error -> {
                        if (error instanceof AmqpException) {
                            AmqpException amqpException = (AmqpException) error;
                            if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN
                                && amqpException.getContext() != null
                                && amqpException.getContext() instanceof LinkErrorContext) {
                                LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext();
                                if (currentLink != null
                                    && !currentLink.getLinkName().equals(errorContext.getTrackingId())) {
                                    logger.atInfo()
                                        .addKeyValue(LINK_NAME_KEY, linkName)
                                        .addKeyValue(ENTITY_PATH_KEY, entityPath)
                                        .addKeyValue(TRACKING_ID_KEY, errorContext.getTrackingId())
                                        .log("Link lost signal received for a link that is not current. Ignoring the error.");
                                    return;
                                }
                            }
                        }

                        currentLink = null;
                        onError(error);
                    },
                    () -> {
                        if (parentConnection.isDisposed() || isTerminated()
                            || UPSTREAM.get(this) == Operators.cancelledSubscription()) {

                            logger.atInfo()
                                .addKeyValue(LINK_NAME_KEY, linkName)
                                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                                .log("Terminal state reached. Disposing of link processor.");

                            dispose();
                        } else {
                            logger.atInfo()
                                .addKeyValue(LINK_NAME_KEY, linkName)
                                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                                .log("Receive link endpoint states are closed. Requesting another.");

                            final AmqpReceiveLink existing = currentLink;
                            currentLink = null;
                            currentLinkName = null;

                            disposeReceiver(existing);
                            requestUpstream();
                        }
                    }),
                next.receive()
                    .onBackpressureBuffer(maxQueueSize, BufferOverflowStrategy.ERROR)
                    .subscribe(message -> {
                        messageQueue.add(message);
                        drain();
                    }));
        }

        disposeReceiver(oldChannel);

        if (oldSubscription != null) {
            oldSubscription.dispose();
        }
    }

    /**
     * Sets up the downstream subscriber.
     *
     * @param actual The downstream subscriber.
     *
     * @throws IllegalStateException if there is already a downstream subscriber.
     */
    @Override
    public void subscribe(CoreSubscriber<? super Message> actual) {
        Objects.requireNonNull(actual, "'actual' cannot be null.");

        final boolean terminateSubscriber = isTerminated()
            || (currentLink == null && upstream == Operators.cancelledSubscription());
        if (isTerminated()) {
            logger.atInfo()
                .addKeyValue(LINK_NAME_KEY, currentLinkName)
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .log("AmqpReceiveLink is already terminated.");
        } else if (currentLink == null && upstream == Operators.cancelledSubscription()) {
            logger.info("There is no current link and upstream is terminated.");
        }

        if (terminateSubscriber) {
            actual.onSubscribe(Operators.emptySubscription());
            if (hasError()) {
                actual.onError(lastError);
            } else {
                actual.onComplete();
            }

            return;
        }

        if (downstream.compareAndSet(null, actual)) {
            actual.onSubscribe(this);
            drain();
        } else {
            Operators.error(actual, logger.logExceptionAsError(new IllegalStateException(
                "There is already one downstream subscriber.'")));
        }
    }

    /**
     * When an error occurs from the upstream publisher. If the {@code throwable} is a transient failure, another AMQP
     * element is requested if the {@link AmqpRetryPolicy} allows. Otherwise, the processor closes.
     *
     * @param throwable Error that occurred in upstream publisher.
     */
    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "'throwable' is required.");

        logger.atInfo()
            .addKeyValue(LINK_NAME_KEY, currentLinkName)
            .log("Error on receive link.", throwable);

        if (isTerminated() || isCancelled) {
            logger.atInfo()
                .addKeyValue(LINK_NAME_KEY, currentLinkName)
                .log("AmqpReceiveLinkProcessor is terminated. Cannot process another error.", throwable);
            Operators.onErrorDropped(throwable, currentContext());
            return;
        }

        if (parentConnection.isDisposed()) {
            logger.atInfo()
                .addKeyValue(LINK_NAME_KEY, currentLinkName)
                .log("Parent connection is disposed. Not reopening on error.");
        }

        lastError = throwable;
        isTerminated.set(true);

        final CoreSubscriber<? super Message> subscriber = downstream.get();
        if (subscriber != null) {
            subscriber.onError(throwable);
        }

        onDispose();
    }

    /**
     * When the upstream publisher has no more items to emit.
     */
    @Override
    public void onComplete() {
        logger.atInfo()
            .addKeyValue(LINK_NAME_KEY, currentLinkName)
            .log("Receive link completed from upstream.");

        UPSTREAM.set(this, Operators.cancelledSubscription());
    }

    @Override
    public void dispose() {
        if (isTerminated.getAndSet(true)) {
            return;
        }

        logger.atInfo()
            .addKeyValue(LINK_NAME_KEY, currentLinkName)
            .log("Disposing receive link.");

        drain();
        onDispose();
    }

    /**
     * When downstream subscriber makes a back-pressure request.
     */
    @Override
    public void request(long request) {
        if (!Operators.validate(request)) {
            logger.warning("Invalid request: {}", request);
            return;
        }

        Operators.addCap(REQUESTED, this, request);

        addCreditsToLink("Backpressure request from downstream. Request: " + request);
        drain();
    }

    /**
     * When downstream subscriber cancels their subscription.
     */
    @Override
    public void cancel() {
        if (isCancelled) {
            return;
        }

        isCancelled = true;
        drain();
    }

    /**
     * Requests another receive link from upstream.
     */
    private void requestUpstream() {
        if (isTerminated()) {
            logger.info("Processor is terminated. Not requesting another link.");
            return;
        } else if (UPSTREAM.get(this) == null) {
            logger.info("There is no upstream. Not requesting another link.");
            return;
        } else if (UPSTREAM.get(this) == Operators.cancelledSubscription()) {
            logger.info("Upstream is cancelled or complete. Not requesting another link.");
            return;
        }

        synchronized (lock) {
            if (currentLink != null) {
                logger.info("Current link exists. Not requesting another link.");
                return;
            }
        }

        logger.info("Requesting a new AmqpReceiveLink from upstream.");
        UPSTREAM.get(this).request(1L);
    }

    private void onDispose() {
        disposeReceiver(currentLink);

        currentLink = null;
        currentLinkName = null;

        if (currentLinkSubscriptions != null) {
            currentLinkSubscriptions.dispose();
        }

        Operators.onDiscardQueueWithClear(messageQueue, currentContext(), null);
    }

    private void drain() {
        // If there are multiple threads that enter this, they'll have incremented the wip number, and we'll know
        // how many were 'missed'.
        if (wip.getAndIncrement() != 0) {
            return;
        }

        int missed = 1;

        while (missed != 0) {
            drainQueue();

            // If there are multiple threads that tried to enter this, we would have missed some, so we'll go back
            // through the loop until we have not missed any other work.
            missed = wip.addAndGet(-missed);
        }
    }

    private void drainQueue() {
        final CoreSubscriber<? super Message> subscriber = downstream.get();
        if (subscriber == null || checkAndSetTerminated()) {
            return;
        }

        long numberRequested = REQUESTED.get(this);
        boolean isEmpty = messageQueue.isEmpty();
        while (numberRequested != 0L && !isEmpty) {
            if (checkAndSetTerminated()) {
                break;
            }

            long numberEmitted = 0L;
            while (numberRequested != numberEmitted) {
                if (isEmpty && checkAndSetTerminated()) {
                    break;
                }

                Message message = messageQueue.poll();
                if (message == null) {
                    break;
                }

                if (isCancelled) {
                    Operators.onDiscard(message, subscriber.currentContext());
                    Operators.onDiscardQueueWithClear(messageQueue, subscriber.currentContext(), null);
                    return;
                }

                try {
                    subscriber.onNext(message);
                } catch (Exception e) {
                    logger.atError()
                        .addKeyValue(LINK_NAME_KEY, currentLinkName)
                        .addKeyValue(ENTITY_PATH_KEY, entityPath)
                        .log("Exception occurred while handling downstream onNext operation.", e);

                    throw logger.logExceptionAsError(Exceptions.propagate(
                        Operators.onOperatorError(upstream, e, message, subscriber.currentContext())));
                }

                numberEmitted++;
                isEmpty = messageQueue.isEmpty();
            }

            final long requestedMessages = REQUESTED.get(this);
            if (requestedMessages != Long.MAX_VALUE) {
                numberRequested = REQUESTED.addAndGet(this, -numberEmitted);
            }
        }

        // In the drain loop, add credits only when the queue is empty, link has no credits and there are still
        // outstanding requests to be fulfilled.
        AmqpReceiveLink link = this.currentLink;
        if (numberRequested > 0L && isEmpty && link != null && link.getCredits() <= 0) {
            addCreditsToLink("Adding more credits in drain loop.");
        }
    }

    private boolean checkAndSetTerminated() {
        if (!isTerminated()) {
            return false;
        }

        final CoreSubscriber<? super Message> subscriber = downstream.get();
        final Throwable error = lastError;
        if (error != null) {
            subscriber.onError(error);
        } else {
            subscriber.onComplete();
        }

        disposeReceiver(currentLink);

        messageQueue.clear();
        return true;
    }

    /**
     * Consolidates all credits calculation when checking to see if more should be added. This is invoked in
     * {@link #drainQueue()} and {@link #request(long)}.
     *
     * Calculates if there are enough credits to satisfy the downstream subscriber. If there is not AND the link has no
     * more credits, we will add them onto the link.
     *
     * In the case that the link has some credits, but _not_ enough to satisfy the request, when the link is empty, it
     * will call {@link AmqpReceiveLink#setEmptyCreditListener(Supplier)} to get how much is remaining.
     *
     * @param message Additional message for context.
     */
    private void addCreditsToLink(String message) {
        synchronized (creditsAdded) {
            final AmqpReceiveLink link = currentLink;
            final int credits = getCreditsToAdd();

            if (link == null) {

                logger.atVerbose()
                    .addKeyValue(ENTITY_PATH_KEY, entityPath)
                    .addKeyValue(CREDITS_KEY, credits)
                    .log("There is no link to add credits to.");

                return;
            }

            final String linkName = link.getLinkName();

            if (credits < 1) {
                logger.atVerbose()
                    .addKeyValue(LINK_NAME_KEY, linkName)
                    .addKeyValue(ENTITY_PATH_KEY, entityPath)
                    .addKeyValue(CREDITS_KEY, credits)
                    .log("There are no additional credits to add.");

                return;
            }

            int currentLinkCredits = link.getCredits();
            // Add more credits if the link has fewer credits than prefetch value. This allows users to control how
            // many events to buffer on the client and also control the throughput. If users need higher throughput,
            // they can set a higher prefetch number and allocate larger heap size accordingly.
            if (currentLinkCredits < prefetch) {
                logger.atInfo()
                    .addKeyValue(LINK_NAME_KEY, linkName)
                    .addKeyValue(ENTITY_PATH_KEY, entityPath)
                    .addKeyValue(CREDITS_KEY, credits)
                    .addKeyValue("message", message)
                    .log("Link running low on credits. Adding more.");
                link.addCredits(credits).subscribe(noop -> {
                }, error -> {
                    logger.atInfo()
                        .addKeyValue(LINK_NAME_KEY, linkName)
                        .addKeyValue(ENTITY_PATH_KEY, entityPath)
                        .log("Link was already closed. Could not add credits.");

                    linkHasNoCredits.compareAndSet(false, true);
                });
            }
        }
    }

    /**
     * Gets the number of credits to add based on {@link #requested} and how many messages are still in queue.
     * If {@link #requested} is {@link Long#MAX_VALUE}, which indicates no-backpressure,
     * then we use the {@link #prefetch} value as credit.
     *
     * @return The number of credits to add.
     */
    private int getCreditsToAdd() {
        final CoreSubscriber<? super Message> subscriber = downstream.get();
        final long request = REQUESTED.get(this);

        final int credits;
        if (subscriber == null || request == 0) {
            credits = 0;
        } else if (request == Long.MAX_VALUE) {
            credits = prefetch;
        } else {
            final int remaining = Long.valueOf(request).intValue() - messageQueue.size();
            credits = Math.max(remaining, 0);
        }

        return credits;
    }

    private void disposeReceiver(AmqpReceiveLink link) {
        if (link == null) {
            return;
        }

        try {
            ((AsyncCloseable) link).closeAsync().subscribe();
        } catch (Exception error) {
            logger.atWarning()
                .addKeyValue(LINK_NAME_KEY, link.getLinkName())
                .addKeyValue(ENTITY_PATH_KEY, entityPath)
                .log("Unable to dispose of link.", error);
        }
    }
}