SynchronousReceiveWork.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.WORK_ID_KEY;

/**
 * Represents a synchronous receive request.
 *
 * @see SynchronousEventSubscriber
 */
public class SynchronousReceiveWork {
    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);
    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Duration timeout;
    private final FluxSink<PartitionEvent> emitter;

    private volatile boolean isTerminal = false;

    /**
     * Creates a new synchronous receive work.
     *
     * @param id Identifier for the work.
     * @param numberToReceive Maximum number of events to receive.
     * @param timeout Maximum duration to wait for {@code numberOfReceive} events.
     * @param emitter Sink to publish received events to.
     */
    public SynchronousReceiveWork(long id, int numberToReceive, Duration timeout, FluxSink<PartitionEvent> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.emitter = emitter;
    }

    /**
     * Gets the unique identifier for this work.
     *
     * @return The unique identifier for this work.
     */
    public long getId() {
        return id;
    }

    /**
     * Gets the maximum duration to wait for the work to complete.
     *
     * @return The duration to wait for the work to complete.
     */
    public Duration getTimeout() {
        return timeout;
    }

    /**
     * Gets the number of events to receive.
     *
     * @return The number of events to receive.
     */
    int getNumberOfEvents() {
        return numberToReceive;
    }

    /**
     * Gets whether or not the work item has reached a terminal state.
     *
     * @return {@code true} if all the events have been fetched, it has been cancelled, or an error occurred. {@code
     *     false} otherwise.
     */
    boolean isTerminal() {
        return emitter.isCancelled() || remaining.get() == 0 || isTerminal;
    }

    /**
     * Publishes the next event data to a downstream subscriber.
     *
     * @param event Event to publish downstream.
     */
    public void next(PartitionEvent event) {
        try {
            emitter.next(event);
            remaining.decrementAndGet();
        } catch (Exception e) {
            logger.warning(Messages.EXCEPTION_OCCURRED_WHILE_EMITTING, e);
            isTerminal = true;
            emitter.error(e);
        }
    }

    /**
     * Completes the publisher. If the publisher has encountered an error, or an error has occurred, it does nothing.
     */
    public synchronized void complete() {
        if (!isTerminal || emitter.isCancelled()) {
            logger.atInfo()
                .addKeyValue(WORK_ID_KEY, id)
                .log("Completing task.");
            isTerminal = true;
            emitter.complete();
        }
    }

    /**
     * Publishes an error downstream. This is a terminal step.
     *
     * @param error Error to publish downstream.
     */
    public void error(Throwable error) {
        isTerminal = true;
        emitter.error(error);
    }
}