// SynchronousReceiveWork.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A single unit of synchronous receive work: it publishes up to a fixed number of messages to a downstream sink and
 * terminates that sink when the requested number has been emitted, when a timeout elapses, or when an error occurs.
 *
 * <p>All methods that emit to, or terminate, the downstream sink are {@code synchronized} on this instance so that
 * signals from the caller thread and from the timeout callbacks are never emitted concurrently.</p>
 */
class SynchronousReceiveWork {

    /* Once at least one message has been received, the work completes if the next message does not arrive within
    this duration. */
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000);

    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);

    // Set once the timeout timers have been started; guards against starting them twice.
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final Duration timeout;

    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;

    // Emits the messages downstream.
    private final Sinks.Many<ServiceBusReceivedMessage> downstreamEmitter;

    // Composite subscriptions for both the overall timeout and timeout between messages.
    private final Disposable.Composite timeoutSubscriptions;

    // Set once the work has been completed, for any reason (all messages emitted, a timeout, or an error).
    private final AtomicBoolean isTerminal = new AtomicBoolean();

    /**
     * Creates a new synchronous receive work.
     *
     * @param id Identifier for the work.
     * @param numberToReceive Maximum number of events to receive.
     * @param timeout Maximum duration to wait for {@code numberToReceive} events.
     * @param emitter Sink to publish received messages to.
     */
    SynchronousReceiveWork(long id, int numberToReceive, Duration timeout,
        Sinks.Many<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.downstreamEmitter = emitter;
        this.timeoutSubscriptions = Disposables.composite();
    }

    /**
     * Gets the unique identifier for this work.
     *
     * @return The unique identifier for this work.
     */
    long getId() {
        return id;
    }

    /**
     * Gets the maximum duration to wait for the work to complete.
     *
     * @return The duration to wait for the work to complete.
     */
    Duration getTimeout() {
        return timeout;
    }

    /**
     * Gets the number of events this work was created to receive.
     *
     * @return The number of events to receive.
     */
    int getNumberOfEvents() {
        return numberToReceive;
    }

    /**
     * Gets the number of events left to receive.
     *
     * <p>The counter is decremented on every {@link #emitNext(ServiceBusReceivedMessage)} attempt that gets past the
     * terminal check, including attempts that are subsequently rejected, so the value can drop below zero.</p>
     *
     * @return The number of events still left to receive.
     */
    int getRemainingEvents() {
        return remaining.get();
    }

    /**
     * Starts the timers for the work: one for the overall timeout and one that fires when no further message is
     * emitted within {@link #TIMEOUT_BETWEEN_MESSAGES} of the previous one. Calling this more than once has no
     * additional effect.
     */
    synchronized void start() {
        if (isStarted.getAndSet(true)) {
            return;
        }

        // Overall timeout for the whole work item.
        this.timeoutSubscriptions.add(
            Mono.delay(timeout).subscribe(
                index -> complete("Timeout elapsed for work."),
                error -> complete("Error occurred while waiting for timeout.", error)));

        // Every emitted message maps to a new delay; switchOnNext keeps only the most recent one subscribed.
        this.timeoutSubscriptions.add(
            Flux.switchOnNext(downstreamEmitter.asFlux().map(messageContext -> Mono.delay(TIMEOUT_BETWEEN_MESSAGES)))
                .subscribe(delayElapsed -> {
                    complete("Timeout between the messages occurred. Completing the work.");
                }, error -> {
                    complete("Error occurred while waiting for timeout between messages.", error);
                }));
    }

    /**
     * Gets whether or not the work item has reached a terminal state.
     *
     * @return {@code true} if the work has been completed (all the events have been emitted, a timeout elapsed, or
     *     an error occurred). {@code false} otherwise.
     */
    synchronized boolean isTerminal() {
        return isTerminal.get();
    }

    /**
     * Publishes the next message to a downstream subscriber. Starts the timeout timers if they have not been started
     * yet, and completes the work once the last requested message has been emitted.
     *
     * @param message Message to publish downstream.
     *
     * @return {@code true} if the message was emitted downstream. {@code false} if the work is already terminal, the
     *     requested number of messages has been exceeded, or the sink rejected the message.
     */
    synchronized boolean emitNext(ServiceBusReceivedMessage message) {
        if (isTerminal.get()) {
            return false;
        }

        if (!isStarted.get()) {
            start();
        }

        final int numberLeft = remaining.decrementAndGet();

        if (numberLeft < 0) {
            logger.info("workId[{}] Number left {} < 0. Not emitting downstream.", id, numberLeft);
            return false;
        }

        final Sinks.EmitResult result = downstreamEmitter.tryEmitNext(message);
        if (result != Sinks.EmitResult.OK) {
            logger.info("workId[{}] Could not emit downstream. EmitResult: {}", id, result);
            return false;
        }

        // All events are emitted, so complete the synchronous work item. Next loop, it'll return false.
        if (numberLeft == 0) {
            complete(null);
        }

        return true;
    }

    /**
     * Completes the publisher without an error.
     *
     * @param message Message to log. Null if there is no message to log.
     */
    void complete(String message) {
        complete(message, null);
    }

    /**
     * Completes the publisher, disposing of the timeout timers. If the work has already reached a terminal state,
     * this does nothing.
     *
     * <p>This method is synchronized because it is invoked from the timeout callbacks as well as from
     * {@link #emitNext(ServiceBusReceivedMessage)}. Without the lock, the terminal signal could be attempted while
     * another thread is in the middle of emitting a message; such a non-serialized emission is rejected by the sink,
     * and because the terminal flag is already set no later call would retry, leaving the sink unterminated.</p>
     *
     * @param message Message to log. Null if there is no message to log.
     * @param error Error if one occurred. Null otherwise.
     */
    synchronized void complete(String message, Throwable error) {
        if (isTerminal.getAndSet(true)) {
            return;
        }

        if (message != null) {
            if (error == null) {
                logger.verbose("workId[{}] {}", id, message);
            } else {
                logger.warning("workId[{}] {}", id, message, error);
            }
        }

        try {
            timeoutSubscriptions.dispose();
        } finally {
            // Always terminate the sink, even if disposing of the timers throws.
            if (error == null) {
                downstreamEmitter.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            } else {
                downstreamEmitter.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST);
            }
        }
    }
}