SendLinkHandler.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

public class SendLinkHandler extends LinkHandler {
    private final String linkName;
    private final String entityPath;
    private final AtomicBoolean isFirstFlow = new AtomicBoolean(true);
    private final UnicastProcessor<Integer> creditProcessor = UnicastProcessor.create();
    private final DirectProcessor<Delivery> deliveryProcessor = DirectProcessor.create();
    private final FluxSink<Integer> creditSink = creditProcessor.sink();
    private final FluxSink<Delivery> deliverySink = deliveryProcessor.sink();

    public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
        super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class));
        this.linkName = linkName;
        this.entityPath = entityPath;
    }

    public String getLinkName() {
        return linkName;
    }

    public Flux<Integer> getLinkCredits() {
        return creditProcessor;
    }

    public Flux<Delivery> getDeliveredMessages() {
        return deliveryProcessor;
    }

    @Override
    public void close() {
        creditSink.complete();
        deliverySink.complete();
        super.close();
    }

    @Override
    public void onLinkLocalOpen(Event event) {
        final Link link = event.getLink();
        if (link instanceof Sender) {
            logger.verbose("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localTarget[{}]",
                getConnectionId(), entityPath, link.getName(), link.getTarget());
        }
    }

    @Override
    public void onLinkRemoteOpen(Event event) {
        final Link link = event.getLink();
        if (!(link instanceof Sender)) {
            return;
        }

        if (link.getRemoteTarget() != null) {
            logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[{}]",
                getConnectionId(), entityPath, link.getName(), link.getRemoteTarget());

            if (isFirstFlow.getAndSet(false)) {
                onNext(EndpointState.ACTIVE);
            }
        } else {
            logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[null],"
                    + " remoteSource[null], action[waitingForError]",
                getConnectionId(), entityPath, link.getName());
        }
    }

    @Override
    public void onLinkFlow(Event event) {
        if (isFirstFlow.getAndSet(false)) {
            onNext(EndpointState.ACTIVE);
        }

        final Sender sender = event.getSender();
        creditSink.next(sender.getRemoteCredit());

        logger.verbose("onLinkFlow connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]",
            getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getCredit());
    }

    @Override
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();

        while (delivery != null) {
            Sender sender = (Sender) delivery.getLink();

            logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}],"
                    + " deliveryState[{}], delivery.isBuffered[{}], delivery.id[{}]",
                getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(),
                delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(),
                    StandardCharsets.UTF_8));

            deliverySink.next(delivery);
            delivery.settle();
            delivery = sender.current();
        }
    }
}