SynchronousReceiveWork.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Synchronous work for receiving messages.
 *
 * <p>Each message handed to {@link #next(ServiceBusReceivedMessage)} is forwarded to the downstream emitter and
 * decrements the count of remaining messages. Once a message has been received, a short timer is (re)started; if it
 * elapses before another message arrives, the downstream emitter is completed.</p>
 */
class SynchronousReceiveWork implements AutoCloseable {

    /* Once at least one message has been received, the work is completed if the next message does not arrive
    within this time. */
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000);

    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);
    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Duration timeout;
    private final FluxSink<ServiceBusReceivedMessage> emitter;
    private final FluxSink<ServiceBusReceivedMessage> messageReceivedSink;
    private final DirectProcessor<ServiceBusReceivedMessage> emitterProcessor;
    // Subscription that implements the short timeout between consecutive messages.
    private final Disposable nextMessageSubscriber;

    // Indicates that the overall timeout has occurred for this work. Volatile, consistent with `error`, so that a
    // write made on one thread is visible to isTerminal() when it is evaluated on another.
    private volatile boolean workTimedOut = false;

    // Indicates whether processing has started. Volatile so that isProcessingStarted() observes a write made by
    // startedProcessing() on a different thread.
    private volatile boolean processingStarted;

    private volatile Throwable error = null;

    /**
     * Creates a new synchronous receive work.
     *
     * @param id Identifier for the work.
     * @param numberToReceive Maximum number of events to receive.
     * @param timeout Maximum duration to wait for {@code numberToReceive} events.
     * @param emitter Sink to publish received messages to.
     */
    SynchronousReceiveWork(long id, int numberToReceive, Duration timeout,
        FluxSink<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.emitter = emitter;

        emitterProcessor = DirectProcessor.create();
        messageReceivedSink = emitterProcessor.sink();

        // Every message pushed into messageReceivedSink starts a new timer, and switchOnNext cancels the timer
        // that belonged to the previous message. take(1) makes each timer one-shot: without it the interval keeps
        // ticking every TIMEOUT_BETWEEN_MESSAGES after the first timeout, repeating the log line and the
        // emitter.complete() call until close() is invoked.
        nextMessageSubscriber = Flux.switchOnNext(emitterProcessor.map(messageContext ->
            Flux.interval(TIMEOUT_BETWEEN_MESSAGES).take(1)))
            .handle((delay, sink) -> {
                logger.info("[{}]: Timeout between the messages occurred. Completing the work.", id);
                sink.next(delay);
                emitter.complete();
            })
            .subscribe();
    }

    /**
     * Gets the unique identifier for this work.
     *
     * @return The unique identifier for this work.
     */
    long getId() {
        return id;
    }

    /**
     * Gets the maximum duration to wait for the work to complete.
     *
     * @return The duration to wait for the work to complete.
     */
    Duration getTimeout() {
        return timeout;
    }

    /**
     * Gets the number of events to receive.
     *
     * @return The number of events to receive.
     */
    int getNumberOfEvents() {
        return numberToReceive;
    }

    /**
     * Gets the number of events that are still to be received.
     *
     * @return The remaining number of events to receive.
     */
    int getRemaining() {
        return remaining.get();
    }

    /**
     * Gets whether or not the work item has reached a terminal state.
     *
     * @return {@code true} if the emitter has been cancelled, all the events have been fetched, an error occurred,
     *     or the work timed out. {@code false} otherwise.
     */
    boolean isTerminal() {
        return emitter.isCancelled() || remaining.get() == 0 || error != null || workTimedOut;
    }

    /**
     * Publishes the next message to a downstream subscriber, restarts the between-message timer and decrements the
     * remaining count. If publishing throws, the exception is logged and propagated through
     * {@link #error(Throwable)}.
     *
     * @param message Event to publish downstream.
     */
    void next(ServiceBusReceivedMessage message) {
        try {
            emitter.next(message);
            // Signals the between-message timer that a message arrived.
            messageReceivedSink.next(message);
            remaining.decrementAndGet();
        } catch (Exception e) {
            logger.warning("Exception occurred while publishing downstream.", e);
            error(e);
        }
    }

    /**
     * Completes the downstream emitter and disposes of the between-message timer.
     */
    void complete() {
        logger.info("[{}]: Completing task.", id);
        emitter.complete();
        close();
    }

    /**
     * Completes the downstream emitter, marks this work as timed out and disposes of the between-message timer.
     */
    void timeout() {
        logger.info("[{}]: Work timeout occurred. Completing the work.", id);
        emitter.complete();
        workTimedOut = true;
        close();
    }

    /**
     * Records the error, publishes it downstream and disposes of the between-message timer. This is a terminal
     * step.
     *
     * @param error Error to publish downstream.
     */
    void error(Throwable error) {
        this.error = error;
        emitter.error(error);
        close();
    }

    /**
     * Gets the error recorded for this work.
     *
     * @return The error, or {@code null} if {@link #error(Throwable)} has not been called.
     */
    Throwable getError() {
        return this.error;
    }

    /**
     * Indicates that processing has started for this work.
     */
    void startedProcessing() {
        this.processingStarted = true;
    }

    /**
     * Gets whether processing has started for this work.
     *
     * @return {@code true} if {@link #startedProcessing()} has been called; {@code false} otherwise.
     */
    boolean isProcessingStarted() {
        return this.processingStarted;
    }

    /**
     * Disposes of the between-message timer subscription if it has not been disposed of already.
     */
    @Override
    public void close() {
        if (!nextMessageSubscriber.isDisposed()) {
            nextMessageSubscriber.dispose();
        }
    }
}