ReactorDispatcher.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.implementation.handler.DispatchHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;

/**
 * {@link Reactor} is not thread-safe - all calls to {@link Proton} APIs should be on the Reactor Thread.
 * {@link Reactor} works out-of-box for all event driven API - ex: onReceive - which could raise upon onSocketRead.
 * {@link Reactor} doesn't support APIs like send() out-of-box - which could potentially run on different thread to that
 * of the Reactor thread.
 *
 * <p>
 * The following utility class is used to generate an Event to hook into {@link Reactor}'s event delegation pattern.
 * It uses a {@link Pipe} as the IO on which Reactor listens to.
 * </p>
 *
 * <p>
 * Cardinality: Multiple {@link ReactorDispatcher}'s could be attached to 1 {@link Reactor}.
 * Each {@link ReactorDispatcher} should be initialized synchronously - as it calls API in {@link Reactor} which is not
 * thread-safe.
 * </p>
 */
public final class ReactorDispatcher {
    private final ClientLogger logger = new ClientLogger(ReactorDispatcher.class);
    private final CloseHandler onClose;
    private final Reactor reactor;
    private final Pipe ioSignal;
    private final ConcurrentLinkedQueue<Work> workQueue;
    private final WorkScheduler workScheduler;

    public ReactorDispatcher(final Reactor reactor) throws IOException {
        this.reactor = reactor;
        this.ioSignal = Pipe.open();
        this.workQueue = new ConcurrentLinkedQueue<>();
        this.onClose = new CloseHandler();
        this.workScheduler = new WorkScheduler();

        initializeSelectable();
    }

    private void initializeSelectable() {
        Selectable schedulerSelectable = this.reactor.selectable();

        schedulerSelectable.setChannel(this.ioSignal.source());
        schedulerSelectable.onReadable(this.workScheduler);
        schedulerSelectable.onFree(this.onClose);

        schedulerSelectable.setReading(true);
        this.reactor.update(schedulerSelectable);
    }

    public void invoke(final Runnable work) throws IOException {
        this.throwIfSchedulerError();

        this.workQueue.offer(new Work(work));
        this.signalWorkQueue();
    }

    public void invoke(final Runnable work, final Duration delay) throws IOException {
        this.throwIfSchedulerError();

        this.workQueue.offer(new Work(work, delay));
        this.signalWorkQueue();
    }

    private void throwIfSchedulerError() {
        // throw when the scheduler on which Reactor is running is already closed
        final RejectedExecutionException rejectedException = this.reactor.attachments()
            .get(RejectedExecutionException.class, RejectedExecutionException.class);
        if (rejectedException != null) {
            throw logger.logExceptionAsError(new RejectedExecutionException(rejectedException.getMessage(),
                rejectedException));
        }

        // throw when the pipe is in closed state - in which case,
        // signalling the new event-dispatch will fail
        if (!this.ioSignal.sink().isOpen()) {
            throw logger.logExceptionAsError(new RejectedExecutionException("ReactorDispatcher instance is closed."));
        }
    }

    private void signalWorkQueue() throws IOException {
        try {
            ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
            while (this.ioSignal.sink().write(oneByteBuffer) == 0) {
                oneByteBuffer = ByteBuffer.allocate(1);
            }
        } catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
            logger.info("signalWorkQueue failed with an error: {}", ignorePipeClosedDuringReactorShutdown);
        }
    }

    // Schedules work to be executed in reactor.
    private final class WorkScheduler implements Callback {
        @Override
        public void run(Selectable selectable) {
            try {
                ByteBuffer oneKbByteBuffer = ByteBuffer.allocate(1024);
                while (ioSignal.source().read(oneKbByteBuffer) > 0) {
                    // read until the end of the stream
                    oneKbByteBuffer = ByteBuffer.allocate(1024);
                }
            } catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
                logger.info("WorkScheduler.run() failed with an error: %s", ignorePipeClosedDuringReactorShutdown);
            } catch (IOException ioException) {
                logger.error("WorkScheduler.run() failed with an error: %s", ioException);
                throw logger.logExceptionAsError(new RuntimeException(ioException));
            }

            Work topWork;
            while ((topWork = workQueue.poll()) != null) {
                if (topWork.delay != null) {
                    reactor.schedule((int) topWork.delay.toMillis(), topWork.dispatchHandler);
                } else {
                    topWork.dispatchHandler.onTimerTask(null);
                }
            }
        }
    }

    // Disposes of the IO pipe when the reactor closes.
    private final class CloseHandler implements Callback {

        @Override
        public void run(Selectable selectable) {
            try {
                if (ioSignal.sink().isOpen()) {
                    ioSignal.sink().close();
                }
            } catch (IOException ioException) {
                logger.error("CloseHandler.run() sink().close() failed with an error. %s", ioException);
            }

            workScheduler.run(null);

            try {
                if (ioSignal.source().isOpen()) {
                    ioSignal.source().close();
                }
            } catch (IOException ioException) {
                logger.error("CloseHandler.run() source().close() failed with an error %s", ioException);
            }
        }
    }

    // Work items that are dispatched to reactor.
    private static final class Work {
        // The work item that is dispatched to Reactor.
        private final DispatchHandler dispatchHandler;
        private final Duration delay;

        Work(Runnable work) {
            this(work, null);
        }

        Work(Runnable work, Duration delay) {
            this.dispatchHandler = new DispatchHandler(work);
            this.delay = delay;
        }
    }
}