SynchronousMessageSubscriber.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Subscriber that listens to events and publishes them downstream and publishes events to them in the order received.
 */
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessage> {
    private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
    private final Queue<ServiceBusReceivedMessage> bufferMessages = new ConcurrentLinkedQueue<>();
    private final AtomicLong remaining = new AtomicLong();

    private final long requested;
    private final Object currentWorkLock = new Object();

    private Disposable currentTimeoutOperation;
    private SynchronousReceiveWork currentWork;
    private boolean subscriberInitialized;

    private volatile Subscription subscription;

    private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM =
        AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class,
            "subscription");


    SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initialWork) {
        this.workQueue.add(initialWork);
        requested = initialWork.getNumberOfEvents() > prefetch ? initialWork.getNumberOfEvents() : prefetch;
    }

    /**
     * On an initial subscription, will take the first work item, and request that amount of work for it.
     * @param subscription Subscription for upstream.
     */
    @Override
    protected void hookOnSubscribe(Subscription subscription) {

        if (Operators.setOnce(UPSTREAM, this, subscription)) {
            this.subscription = subscription;
            subscriberInitialized = true;
            drain();
        } else {
            logger.error("Already subscribed once.");
        }
    }

    /**
     * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of
     * the subscriber.
     * @param message Event to publish.
     */
    @Override
    protected void hookOnNext(ServiceBusReceivedMessage message) {
        bufferMessages.add(message);
        drain();
    }

    /**
     * Queue the work to be picked up by drain loop.
     * @param work to be queued.
     */
    void queueWork(SynchronousReceiveWork work) {

        logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(),
            work.getTimeout());
        workQueue.add(work);

        // Do not drain if another thread want to queue the work before we have subscriber
        if (subscriberInitialized) {
            drain();
        }
    }

    /**
     * Drain the work, only one thread can be in this loop at a time.
     */
    private void drain() {
        // If someone is already in this loop, then we are already clearing the queue.
        if (!wip.compareAndSet(0, 1)) {
            return;
        }

        try {
            drainQueue();
        } finally {
            final int decremented = wip.decrementAndGet();
            if (decremented != 0) {
                logger.warning("There should be 0, but was: {}", decremented);
            }
        }
    }

    /***
     * Drain the queue using a lock on current work in progress.
     */
    private void drainQueue() {
        if (isTerminated()) {
            return;
        }

        // Acquiring the lock
        synchronized (currentWorkLock) {

            // Making sure current work not become terminal since last drain queue cycle
            if (currentWork != null && currentWork.isTerminal()) {
                workQueue.remove(currentWork);
                if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
                    currentTimeoutOperation.dispose();
                }
                currentTimeoutOperation = null;
            }

            // We should process a work when
            // 1. it is first time getting picked up
            // 2. or more messages have arrived while we were in drain loop.
            // We might not have all the message in bufferMessages needed for workQueue, Thus we will only remove work
            // from queue when we have delivered all the messages to currentWork.

            while ((currentWork = workQueue.peek()) != null
                && (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {
                // Additional check for safety, but normally this work should never be terminal
                if (currentWork.isTerminal()) {
                    // This work already finished by either timeout or no more messages to send, process next work.
                    workQueue.remove(currentWork);
                    if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
                        currentTimeoutOperation.dispose();
                    }
                    continue;
                }

                if (!currentWork.isProcessingStarted()) {
                    // timer to complete the currentWork in case of timeout trigger
                    currentTimeoutOperation = getTimeoutOperation(currentWork);
                    currentWork.startedProcessing();
                    final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get();
                    remaining.addAndGet(calculatedRequest);
                    subscription.request(calculatedRequest);
                }

                // Send messages to currentWork from buffer
                while (bufferMessages.size() > 0 && !currentWork.isTerminal()) {
                    currentWork.next(bufferMessages.poll());
                    remaining.decrementAndGet();
                }

                // if  we have delivered all the messages to currentWork, we will complete it.
                if (currentWork.isTerminal()) {
                    if (currentWork.getError() == null) {
                        currentWork.complete();
                    }
                    // Now remove from queue since it is complete
                    workQueue.remove(currentWork);
                    if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
                        currentTimeoutOperation.dispose();
                    }
                    logger.verbose("The work [{}] is complete.", currentWork.getId());
                }
            }
        }
    }

    /**
     * @param work on which timeout thread need to start.
     *
     * @return {@link Disposable} for the timeout operation.
     */
    private Disposable getTimeoutOperation(SynchronousReceiveWork work) {
        Duration timeout = work.getTimeout();
        return Mono.delay(timeout).thenReturn(work)
            .subscribe(l -> {
                synchronized (currentWorkLock) {
                    if (currentWork == work) {
                        work.timeout();
                    }
                }
            });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void hookOnError(Throwable throwable) {
        logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable);
        synchronized (currentWorkLock) {
            currentWork.error(throwable);
        }
        dispose();
    }

    @Override
    protected void hookOnCancel() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        synchronized (currentWorkLock) {
            if (currentWork != null) {
                currentWork.complete();
            }
            if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
                currentTimeoutOperation.dispose();
            }
            currentTimeoutOperation = null;
        }

        subscription.cancel();
    }

    private boolean isTerminated() {
        return isDisposed.get();
    }

    int getWorkQueueSize() {
        return this.workQueue.size();
    }

    long getRequested() {
        return this.requested;
    }

    boolean isSubscriberInitialized() {
        return this.subscriberInitialized;
    }
}