Handler.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base class for proton-j handlers that publishes the {@link EndpointState} of the endpoint it is attached to.
 *
 * <p>States are pushed through a {@link ReplayProcessor} created with
 * {@code cacheLastOrDefault(EndpointState.UNINITIALIZED)}, so a new subscriber first receives the most recent state,
 * or {@link EndpointState#UNINITIALIZED} when nothing has been emitted yet.</p>
 *
 * <p>The stream is terminated at most once, either by {@link #onError(Throwable)} or by {@link #close()}; whichever
 * is invoked first wins and later invocations of either are ignored.</p>
 */
public abstract class Handler extends BaseHandler implements Closeable {
    // Flipped to true by the first call to onError(Throwable) or close(); guards against terminating twice.
    private final AtomicBoolean isTerminal = new AtomicBoolean();
    private final ReplayProcessor<EndpointState> endpointStateProcessor =
        ReplayProcessor.cacheLastOrDefault(EndpointState.UNINITIALIZED);
    private final FluxSink<EndpointState> endpointSink = endpointStateProcessor.sink();
    private final String connectionId;
    private final String hostname;

    /**
     * Creates a handler associated with the given connection identifier and host name.
     *
     * @param connectionId Identifier of the connection; returned as-is by {@link #getConnectionId()}.
     * @param hostname Host name; returned as-is by {@link #getHostname()}.
     */
    Handler(final String connectionId, final String hostname) {
        this.connectionId = connectionId;
        this.hostname = hostname;
    }

    /**
     * Gets the connection identifier supplied at construction.
     *
     * @return The connection identifier.
     */
    public String getConnectionId() {
        return connectionId;
    }

    /**
     * Gets the host name supplied at construction.
     *
     * @return The host name.
     */
    public String getHostname() {
        return hostname;
    }

    /**
     * Gets the stream of endpoint states published by this handler.
     *
     * <p>Consecutive duplicate states are filtered out, so subscribers are only notified when the state actually
     * changes.</p>
     *
     * @return A {@link Flux} of endpoint states that completes when the handler is closed and errors when
     *     {@link #onError(Throwable)} is invoked.
     */
    public Flux<EndpointState> getEndpointStates() {
        // distinctUntilChanged() rather than distinct(): distinct() remembers every value seen during the
        // subscription and would permanently suppress a state the endpoint returns to later, hiding real
        // transitions from subscribers.
        return endpointStateProcessor.distinctUntilChanged();
    }

    /**
     * Publishes a new endpoint state to subscribers.
     *
     * <p>NOTE(review): this method does not consult the terminal flag; a state passed in after
     * {@link #onError(Throwable)} or {@link #close()} is handed to the already-terminated sink as-is.</p>
     *
     * @param state The state to publish.
     */
    void onNext(EndpointState state) {
        endpointSink.next(state);
    }

    /**
     * Publishes {@link EndpointState#CLOSED} and then terminates the state stream with the given error.
     *
     * <p>Does nothing if the handler has already been terminated by a previous call to this method or to
     * {@link #close()}.</p>
     *
     * @param error The error to signal to subscribers.
     */
    void onError(Throwable error) {
        if (isTerminal.getAndSet(true)) {
            return;
        }

        // Emit the CLOSED state first so subscribers observe it before the terminal error signal.
        endpointSink.next(EndpointState.CLOSED);
        endpointSink.error(error);
    }

    /**
     * Publishes {@link EndpointState#CLOSED} and then completes the state stream.
     *
     * <p>Does nothing if the handler has already been terminated by a previous call to this method or to
     * {@link #onError(Throwable)}.</p>
     */
    @Override
    public void close() {
        if (isTerminal.getAndSet(true)) {
            return;
        }

        // Emit the CLOSED state first so subscribers observe it before the completion signal.
        endpointSink.next(EndpointState.CLOSED);
        endpointSink.complete();
    }
}