ReactorReceiver.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.amqp.implementation;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
*/
public class ReactorReceiver implements AmqpReceiveLink {
// Initial value is true because we could not have created this receiver without authorising against the CBS node
// first.
private final AtomicBoolean hasAuthorized = new AtomicBoolean(true);
private final String entityPath;
private final Receiver receiver;
private final ReceiveLinkHandler handler;
private final TokenManager tokenManager;
private final ReactorDispatcher dispatcher;
private final Disposable subscriptions;
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final EmitterProcessor<Message> messagesProcessor;
private final ClientLogger logger = new ClientLogger(ReactorReceiver.class);
private final ReplayProcessor<AmqpEndpointState> endpointStates;
private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();
protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandler handler,
TokenManager tokenManager, ReactorDispatcher dispatcher) {
this.entityPath = entityPath;
this.receiver = receiver;
this.handler = handler;
this.tokenManager = tokenManager;
this.dispatcher = dispatcher;
this.messagesProcessor = this.handler.getDeliveredMessages()
.map(this::decodeDelivery)
.doOnNext(next -> {
if (receiver.getRemoteCredit() == 0 && !isDisposed.get()) {
final Supplier<Integer> supplier = creditSupplier.get();
if (supplier == null) {
return;
}
final Integer credits = supplier.get();
if (credits != null && credits > 0) {
addCredits(credits);
}
}
})
.subscribeWith(EmitterProcessor.create());
this.endpointStates = this.handler.getEndpointStates()
.map(state -> {
logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(),
entityPath, getLinkName(), state);
return AmqpEndpointStateUtil.getConnectionState(state);
})
.subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe(
response -> {
logger.verbose("Token refreshed: {}", response);
hasAuthorized.set(true);
}, error -> {
logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]",
handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage());
hasAuthorized.set(false);
}, () -> hasAuthorized.set(false));
}
@Override
public Flux<AmqpEndpointState> getEndpointStates() {
return endpointStates.distinct();
}
@Override
public Flux<Message> receive() {
return messagesProcessor;
}
@Override
public void addCredits(int credits) {
if (!isDisposed.get()) {
try {
dispatcher.invoke(() -> receiver.flow(credits));
} catch (IOException e) {
logger.warning("Unable to schedule work to add more credits.", e);
}
}
}
@Override
public void addCreditsInstantly(int credits) {
receiver.flow(credits);
}
@Override
public int getCredits() {
return receiver.getRemoteCredit();
}
@Override
public void setEmptyCreditListener(Supplier<Integer> creditSupplier) {
Objects.requireNonNull(creditSupplier, "'creditSupplier' cannot be null.");
this.creditSupplier.set(creditSupplier);
}
@Override
public String getLinkName() {
return receiver.getName();
}
@Override
public String getEntityPath() {
return entityPath;
}
@Override
public String getHostname() {
return handler.getHostname();
}
@Override
public boolean isDisposed() {
return isDisposed.get();
}
@Override
public void dispose() {
if (isDisposed.getAndSet(true)) {
return;
}
subscriptions.dispose();
messagesProcessor.onComplete();
tokenManager.close();
receiver.close();
try {
dispatcher.invoke(() -> {
receiver.free();
handler.close();
});
} catch (IOException e) {
logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
handler.close();
}
}
/**
* Disposes of the sender when an exception is encountered.
*
* @param condition Error condition associated with close operation.
*/
void dispose(ErrorCondition condition) {
if (isDisposed.getAndSet(true)) {
return;
}
logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}",
handler.getConnectionId(), entityPath, getLinkName(), condition);
if (receiver.getLocalState() != EndpointState.CLOSED) {
receiver.close();
if (receiver.getCondition() == null) {
receiver.setCondition(condition);
}
}
try {
dispatcher.invoke(() -> {
receiver.free();
handler.close();
});
} catch (IOException e) {
logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
handler.close();
}
messagesProcessor.onComplete();
tokenManager.close();
}
protected Message decodeDelivery(Delivery delivery) {
final int messageSize = delivery.pending();
final byte[] buffer = new byte[messageSize];
final int read = receiver.recv(buffer, 0, messageSize);
receiver.advance();
final Message message = Proton.message();
message.decode(buffer, 0, read);
delivery.settle();
return message;
}
@Override
public String toString() {
return String.format("link name: [%s], entity path: [%s]", receiver.getName(), entityPath);
}
}