AmqpChannelProcessor.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<AmqpChannelProcessor, Subscription> UPSTREAM =
        AtomicReferenceFieldUpdater.newUpdater(AmqpChannelProcessor.class, Subscription.class,
            "upstream");

    private final ClientLogger logger;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean isRequested = new AtomicBoolean();
    private final AtomicBoolean isRetryPending = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();

    private final Object lock = new Object();
    private final AmqpRetryPolicy retryPolicy;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final Function<T, Flux<AmqpEndpointState>> endpointStatesFunction;

    private volatile Subscription upstream;
    private volatile ConcurrentLinkedDeque<ChannelSubscriber<T>> subscribers = new ConcurrentLinkedDeque<>();
    private volatile Throwable lastError;
    private volatile T currentChannel;
    private volatile Disposable connectionSubscription;
    private volatile Disposable retrySubscription;

    public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
        Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) {
        this.fullyQualifiedNamespace = Objects
            .requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
            "'endpointStates' cannot be null.");
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(UPSTREAM, this, subscription)) {
            // Request the first connection on a subscription.
            isRequested.set(true);
            subscription.request(1);
        } else {
            logger.warning("Processors can only be subscribed to once.");
        }
    }

    @Override
    public void onNext(T amqpChannel) {
        logger.info("namespace[{}] entityPath[{}]: Setting next AMQP channel.", fullyQualifiedNamespace, entityPath);

        Objects.requireNonNull(amqpChannel, "'amqpChannel' cannot be null.");

        final T oldChannel;
        final Disposable oldSubscription;
        synchronized (lock) {
            oldChannel = currentChannel;
            oldSubscription = connectionSubscription;

            currentChannel = amqpChannel;

            final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
            logger.info("namespace[{}] entityPath[{}]: Next AMQP channel received, updating {} current "
                + "subscribers", fullyQualifiedNamespace, entityPath, subscribers.size());

            currentSubscribers.forEach(subscription -> subscription.onNext(amqpChannel));

            connectionSubscription = endpointStatesFunction.apply(amqpChannel).subscribe(
                state -> {
                    // Connection was successfully opened, we can reset the retry interval.
                    if (state == AmqpEndpointState.ACTIVE) {
                        retryAttempts.set(0);
                        logger.info("namespace[{}] entityPath[{}]: Channel is now active.",
                            fullyQualifiedNamespace, entityPath);
                    }
                },
                error -> {
                    setAndClearChannel();
                    onError(error);
                },
                () -> {
                    if (isDisposed()) {
                        logger.info("namespace[{}] entityPath[{}]: Channel is disposed.",
                            fullyQualifiedNamespace, entityPath);
                    } else {
                        logger.info("namespace[{}] entityPath[{}]: Channel is closed.",
                            fullyQualifiedNamespace, entityPath);
                        setAndClearChannel();
                    }
                });
        }

        close(oldChannel);

        if (oldSubscription != null) {
            oldSubscription.dispose();
        }

        isRequested.set(false);
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "'throwable' is required.");

        if (isRetryPending.get() && retryPolicy.calculateRetryDelay(throwable, retryAttempts.get()) != null) {
            logger.warning("Retry is already pending. Ignoring transient error.", throwable);
            return;
        }

        int attemptsMade = retryAttempts.incrementAndGet();

        if (throwable instanceof AmqpException) {
            AmqpException amqpException = (AmqpException) throwable;
            // Connection processor should never be disposed if the underlying error is transient.
            // So, we never exhaust the retry attempts for transient errors. This will ensure a new connection
            // will be created whenever the underlying transient error is resolved. For e.g. when a network
            // connection is lost for an extended period of time and when the network is restored later, we should be
            // able to recreate a new AMQP connection.
            if (amqpException.isTransient()) {
                logger.verbose("Attempted {} times to get a new AMQP connection", attemptsMade);
                // for the purpose of computing delay, we'll use the min of retry attempts or max retries set in
                // the retry policy to get the max delay duration.
                attemptsMade = Math.min(attemptsMade, retryPolicy.getMaxRetries());
            }
        }
        final int attempt = attemptsMade;
        final Duration retryInterval = retryPolicy.calculateRetryDelay(throwable, attempt);

        if (retryInterval != null) {
            // There was already a retry in progress, so we decrement the value because we don't want to make two retry
            // attempts concurrently.
            if (isRetryPending.getAndSet(true)) {
                retryAttempts.decrementAndGet();
                return;
            }

            logger.warning("Retry #{}. Transient error occurred. Retrying after {} ms.",
                attempt, retryInterval.toMillis(), throwable);

            retrySubscription = Mono.delay(retryInterval).subscribe(i -> {
                if (isDisposed()) {
                    logger.info("Retry #{}. Not requesting from upstream. Processor is disposed.", attempt);
                } else {
                    logger.info("Retry #{}. Requesting from upstream.", attempt);

                    requestUpstream();
                    isRetryPending.set(false);
                }
            });

            return;
        }

        logger.warning("Non-retryable error occurred in connection.", throwable);
        lastError = throwable;
        isDisposed.set(true);
        dispose();

        synchronized (lock) {
            final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
            subscribers = new ConcurrentLinkedDeque<>();
            logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} "
                    + "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());

            currentSubscribers.forEach(subscriber -> subscriber.onError(throwable));
        }
    }

    @Override
    public void onComplete() {
        logger.info("Upstream connection publisher was completed. Terminating processor.");

        isDisposed.set(true);
        synchronized (lock) {
            final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
            subscribers = new ConcurrentLinkedDeque<>();
            logger.info("namespace[{}] entityPath[{}]: AMQP channel processor completed. Notifying {} "
                + "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size());
            currentSubscribers.forEach(subscriber -> subscriber.onComplete());
        }
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        if (isDisposed()) {
            if (lastError != null) {
                actual.onSubscribe(Operators.emptySubscription());
                actual.onError(lastError);
            } else {
                Operators.error(actual, logger.logExceptionAsError(new IllegalStateException(
                    String.format("namespace[%s] entityPath[%s]: Cannot subscribe. Processor is already terminated.",
                        fullyQualifiedNamespace, entityPath))));
            }

            return;
        }

        final ChannelSubscriber<T> subscriber = new ChannelSubscriber<T>(actual, this);
        actual.onSubscribe(subscriber);

        synchronized (lock) {
            if (currentChannel != null) {
                subscriber.complete(currentChannel);
                return;
            }
        }

        subscribers.add(subscriber);
        logger.verbose("Added a subscriber {} to AMQP channel processor. Total "
                + "subscribers = {}", subscriber, subscribers.size());

        if (!isRetryPending.get()) {
            requestUpstream();
        }
    }

    @Override
    public void dispose() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        if (retrySubscription != null && !retrySubscription.isDisposed()) {
            retrySubscription.dispose();
        }

        onComplete();

        synchronized (lock) {
            setAndClearChannel();
        }
    }

    @Override
    public boolean isDisposed() {
        return isDisposed.get();
    }

    private void requestUpstream() {
        if (currentChannel != null) {
            logger.verbose("namespace[{}] entityPath[{}]: Connection exists, not requesting another.",
                fullyQualifiedNamespace, entityPath);
            return;
        } else if (isDisposed()) {
            logger.verbose("namespace[{}] entityPath[{}]: Is already disposed.", fullyQualifiedNamespace, entityPath);
            return;
        }

        final Subscription subscription = UPSTREAM.get(this);
        if (subscription == null) {
            logger.warning("namespace[{}] entityPath[{}]: There is no upstream subscription.",
                fullyQualifiedNamespace, entityPath);
            return;
        }

        // subscribe(CoreSubscriber) may have requested a subscriber already.
        if (!isRequested.getAndSet(true)) {
            logger.info("namespace[{}] entityPath[{}]: Connection not requested, yet. Requesting one.",
                fullyQualifiedNamespace, entityPath);
            subscription.request(1);
        }
    }

    private void setAndClearChannel() {
        T oldChannel;
        synchronized (lock) {
            oldChannel = currentChannel;
            currentChannel = null;
        }

        close(oldChannel);
    }

    private void close(T channel) {
        if (channel instanceof AutoCloseable) {
            try {
                ((AutoCloseable) channel).close();
            } catch (Exception error) {
                logger.warning("Error occurred closing item.", channel);
            }
        } else if (channel instanceof Disposable) {
            ((Disposable) channel).dispose();
        }
    }

    /**
     * Checks the current state of the channel for this channel and returns true if the channel is null or if this
     * processor is disposed.
     *
     * @return true if the current channel in the processor is null or if the processor is disposed
     */
    public boolean isChannelClosed() {
        synchronized (lock) {
            return currentChannel == null || isDisposed();
        }
    }

    /**
     * Represents a subscriber, waiting for an AMQP connection.
     */
    private static final class ChannelSubscriber<T> extends Operators.MonoSubscriber<T, T> {
        private final AmqpChannelProcessor<T> processor;

        private ChannelSubscriber(CoreSubscriber<? super T> actual, AmqpChannelProcessor<T> processor) {
            super(actual);
            this.processor = processor;
        }

        @Override
        public void cancel() {
            super.cancel();
            processor.subscribers.remove(this);
        }

        @Override
        public void onComplete() {
            if (!isCancelled()) {
                actual.onComplete();
            }
        }

        @Override
        public void onNext(T channel) {
            if (!isCancelled()) {
                super.complete(channel);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!isCancelled()) {
                actual.onError(throwable);
            } else {
                Operators.onOperatorError(throwable, currentContext());
            }
        }
    }
}