ReactorExecutor.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.scheduler.Scheduler;

import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
    private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

    private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
    private final AtomicBoolean hasStarted = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Semaphore disposeSemaphore = new Semaphore(1);

    private final Object lock = new Object();
    private final Reactor reactor;
    private final Scheduler scheduler;
    private final String connectionId;
    private final Duration timeout;
    private final AmqpExceptionHandler exceptionHandler;
    private final String hostname;

    ReactorExecutor(Reactor reactor, Scheduler scheduler, String connectionId, AmqpExceptionHandler exceptionHandler,
        Duration timeout, String hostname) {
        this.reactor = Objects.requireNonNull(reactor, "'reactor' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
        this.timeout = Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "'exceptionHandler' cannot be null.");
        this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null.");
    }

    /**
     * Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
     * #close()} is called.
     */
    void start() {
        if (hasStarted.getAndSet(true)) {
            logger.warning("ReactorExecutor has already started.");
            return;
        }

        logger.info(LOG_MESSAGE, connectionId, "Starting reactor.");
        reactor.start();
        scheduler.schedule(this::run);
    }

    /**
     * Worker loop that tries to process events in the reactor. If there are pending items to process, will reschedule
     * the run() method again.
     */
    private void run() {
        // If this hasn't been disposed of, and we're trying to run work items on it, log a warning and return.
        if (!isDisposed.get() && !hasStarted.get()) {
            logger.warning("Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
            return;
        }

        boolean rescheduledReactor = false;

        try {
            final boolean shouldReschedule;
            synchronized (lock) {
                shouldReschedule = hasStarted.get() && !Thread.interrupted() && reactor.process();
            }

            if (shouldReschedule) {
                try {
                    scheduler.schedule(this::run);
                    rescheduledReactor = true;
                } catch (RejectedExecutionException exception) {
                    logger.warning(LOG_MESSAGE, connectionId,
                        "Scheduling reactor failed because the scheduler has been shut down.", exception);

                    this.reactor.attachments()
                        .set(RejectedExecutionException.class, RejectedExecutionException.class, exception);
                }
            }
        } catch (HandlerException handlerException) {
            Throwable cause = handlerException.getCause() == null
                ? handlerException
                : handlerException.getCause();

            logger.warning(LOG_MESSAGE, connectionId,
                "Unhandled exception while processing events in reactor, report this error.", handlerException);

            final String message = !CoreUtils.isNullOrEmpty(cause.getMessage())
                ? cause.getMessage()
                : !CoreUtils.isNullOrEmpty(handlerException.getMessage())
                    ? handlerException.getMessage()
                    : "Reactor encountered unrecoverable error";

            final AmqpException exception;
            final AmqpErrorContext errorContext = new AmqpErrorContext(hostname);

            if (cause instanceof UnresolvedAddressException) {
                exception = new AmqpException(true,
                    String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network "
                            + "configuration. Check correctness of namespace information. %s",
                        message, StringUtil.getTrackingIdAndTimeToLog()),
                    cause, errorContext);
            } else {
                exception = new AmqpException(true,
                    String.format(Locale.US, "%s, %s", message, StringUtil.getTrackingIdAndTimeToLog()),
                    cause, errorContext);
            }

            this.exceptionHandler.onConnectionError(exception);
        } finally {
            if (!rescheduledReactor) {
                if (hasStarted.getAndSet(false)) {
                    scheduleCompletePendingTasks();
                } else {
                    final String reason =
                        "Stopping the reactor because thread was interrupted or the reactor has no more events to "
                            + "process.";

                    logger.info(LOG_MESSAGE, connectionId, reason);
                    close(false, reason);
                }
            }
        }
    }

    private void scheduleCompletePendingTasks() {
        try {
            if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.info("Unable to acquire dispose reactor semaphore within timeout to schedule pending tasks.");
            }
        } catch (InterruptedException e) {
            logger.warning("Could not acquire dispose semaphore to schedule pending tasks", e);
        }

        this.scheduler.schedule(() -> {
            logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
            try {
                reactor.stop();
                reactor.process();
            } catch (HandlerException e) {
                logger.warning(LOG_MESSAGE, connectionId,
                    StringUtil.toStackTraceString(e, "scheduleCompletePendingTasks - exception occurred while "
                        + "processing events."));
            } finally {
                try {
                    reactor.free();
                } catch (IllegalStateException ignored) {
                    // Since reactor is not thread safe, it is possible that another thread has already disposed of the
                    // session before we were able to schedule this work.
                }

                disposeSemaphore.release();
            }
        });
    }

    @Override
    public void close() {
        if (!isDisposed.getAndSet(true)) {
            close(true, "ReactorExecutor.close() was called.");
        }
    }

    private void close(boolean isUserInitialized, String reason) {
        if (!hasStarted.getAndSet(false)) {
            return;
        }

        if (isUserInitialized) {
            scheduleCompletePendingTasks();
            // wait for the scheduled pending tasks to complete
            try {
                if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    logger.info("Unable to acquire dispose reactor semaphore within timeout.");
                }
            } catch (InterruptedException e) {
                logger.warning("Could not acquire semaphore to finish close operation.", e);
            }
        }
        exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, isUserInitialized, reason));
        scheduler.dispose();
    }
}