SendLinkHandler.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.amqp.implementation.handler;
import com.azure.core.util.logging.LoggingEventBuilder;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
import static com.azure.core.amqp.implementation.ClientConstants.EMIT_RESULT_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
/**
* Handler that receives events from its corresponding {@link Sender}. Handlers must be associated to a {@link Sender}
* to receive its events.
*
* @see BaseHandler#setHandler(Extendable, Handler)
* @see Sender
*/
public class SendLinkHandler extends LinkHandler {
private final String linkName;
private final String entityPath;
/**
* Indicates whether or not the link has ever been remotely active (ie. the service has acknowledged that we have
* opened a send link to the given entityPath.)
*/
private final AtomicBoolean isRemoteActive = new AtomicBoolean();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final Sinks.Many<Integer> creditProcessor = Sinks.many().unicast().onBackpressureBuffer();
private final Sinks.Many<Delivery> deliveryProcessor = Sinks.many().multicast().onBackpressureBuffer();
public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
super(connectionId, hostname, entityPath);
this.linkName = Objects.requireNonNull(linkName, "'linkName' cannot be null.");
this.entityPath = entityPath;
}
public String getLinkName() {
return linkName;
}
public Flux<Integer> getLinkCredits() {
return creditProcessor.asFlux();
}
public Flux<Delivery> getDeliveredMessages() {
return deliveryProcessor.asFlux();
}
/**
* Closes the handler by completing the completing the delivery and link credit fluxes and publishes {@link
* EndpointState#CLOSED}. {@link #getEndpointStates()} is completely closed when {@link #onLinkRemoteClose(Event)},
* {@link #onLinkRemoteDetach(Event)}, or {@link #onLinkFinal(Event)} is called.
*/
@Override
public void close() {
if (isTerminated.getAndSet(true)) {
return;
}
creditProcessor.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
deliveryProcessor.emitComplete((signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.addKeyValue(LINK_NAME_KEY, linkName)
.log("Unable to emit complete on deliverySink.");
return false;
});
onNext(EndpointState.CLOSED);
}
@Override
public void onLinkLocalOpen(Event event) {
final Link link = event.getLink();
if (link instanceof Sender) {
logger.atVerbose()
.addKeyValue(LINK_NAME_KEY, link.getName())
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue("localTarget", link.getTarget())
.log("onLinkLocalOpen");
}
}
@Override
public void onLinkRemoteOpen(Event event) {
final Link link = event.getLink();
if (!(link instanceof Sender)) {
return;
}
LoggingEventBuilder logBuilder = logger.atInfo()
.addKeyValue(LINK_NAME_KEY, link.getName())
.addKeyValue(ENTITY_PATH_KEY, entityPath);
if (link.getRemoteTarget() != null) {
logBuilder.addKeyValue("remoteTarget", link.getRemoteTarget());
if (!isRemoteActive.getAndSet(true)) {
onNext(EndpointState.ACTIVE);
}
} else {
logBuilder.addKeyValue("remoteTarget", NOT_APPLICABLE)
.addKeyValue("action", "waitingForError");
}
logBuilder.log("onLinkRemoteOpen");
}
@Override
public void onLinkFlow(Event event) {
if (!isRemoteActive.getAndSet(true)) {
onNext(EndpointState.ACTIVE);
}
final Sender sender = event.getSender();
final int credits = sender.getRemoteCredit();
creditProcessor.emitNext(credits, (signalType, emitResult) -> {
logger.atVerbose()
.addKeyValue(LINK_NAME_KEY, linkName)
.addKeyValue(EMIT_RESULT_KEY, emitResult)
.addKeyValue("credits", credits)
.log("Unable to emit credits.");
return false;
});
logger.atVerbose()
.addKeyValue(LINK_NAME_KEY, linkName)
.addKeyValue("unsettled", sender.getUnsettled())
.addKeyValue("credits", credits)
.log("onLinkFlow.");
}
@Override
public void onLinkLocalClose(Event event) {
super.onLinkLocalClose(event);
// Someone called sender.close() to set the local link state to close. Since the link was never remotely
// active, we complete getEndpointStates() ourselves.
if (!isRemoteActive.get()) {
logger.atInfo()
.addKeyValue(LINK_NAME_KEY, getLinkName())
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.log("Sender link was never active. Closing endpoint states.");
super.close();
}
}
@Override
public void onDelivery(Event event) {
Delivery delivery = event.getDelivery();
while (delivery != null) {
final Sender sender = (Sender) delivery.getLink();
final String deliveryTag = new String(delivery.getTag(), StandardCharsets.UTF_8);
logger.atVerbose()
.addKeyValue(LINK_NAME_KEY, getLinkName())
.addKeyValue("unsettled", sender.getUnsettled())
.addKeyValue("credit", sender.getRemoteCredit())
.addKeyValue("deliveryState", delivery.getRemoteState())
.addKeyValue("delivery.isBuffered", delivery.isBuffered())
.addKeyValue("delivery.id", deliveryTag)
.log("onDelivery");
deliveryProcessor.emitNext(delivery, (signalType, emitResult) -> {
logger.atWarning()
.addKeyValue(LINK_NAME_KEY, getLinkName())
.addKeyValue(EMIT_RESULT_KEY, emitResult)
.addKeyValue("delivery.id", deliveryTag)
.log("Unable to emit delivery.");
return emitResult == Sinks.EmitResult.FAIL_OVERFLOW;
});
delivery.settle();
delivery = sender.current();
}
}
}