EventHubReactorSession.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs.implementation;
import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
import static com.azure.core.amqp.implementation.AmqpConstants.VENDOR;
/**
* An AMQP session for Event Hubs.
*/
class EventHubReactorSession extends ReactorSession implements EventHubSession {
private static final Symbol EPOCH = Symbol.valueOf(VENDOR + ":epoch");
private static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME =
Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
private final ClientLogger logger = new ClientLogger(EventHubReactorSession.class);
/**
* Creates a new AMQP session using proton-j.
*
* @param session Proton-j session for this AMQP session.
* @param sessionHandler Handler for events that occur in the session.
* @param sessionName Name of the session.
* @param provider Provides reactor instances for messages to sent with.
* @param handlerProvider Providers reactor handlers for listening to proton-j reactor events.
* @param cbsNodeSupplier Mono that returns a reference to the {@link ClaimsBasedSecurityNode}.
* @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing
* operations on the message broker.
* @param retryOptions to be used for this session.
* @param messageSerializer to be used.
*/
EventHubReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler,
String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider,
Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider,
AmqpRetryOptions retryOptions, MessageSerializer messageSerializer) {
super(amqpConnection, session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier,
tokenManagerProvider, messageSerializer, retryOptions);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, EventPosition eventPosition, ReceiveOptions options) {
Objects.requireNonNull(linkName, "'linkName' cannot be null.");
Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
Objects.requireNonNull(timeout, "'timeout' cannot be null.");
Objects.requireNonNull(retry, "'retry' cannot be null.");
Objects.requireNonNull(eventPosition, "'eventPosition' cannot be null.");
Objects.requireNonNull(options, "'options' cannot be null.");
final String eventPositionExpression = getExpression(eventPosition);
final Map<Symbol, Object> filter = new HashMap<>();
filter.put(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER,
eventPositionExpression));
final Map<Symbol, Object> properties = new HashMap<>();
if (options.getOwnerLevel() != null) {
properties.put(EPOCH, options.getOwnerLevel());
}
final Symbol[] desiredCapabilities = options.getTrackLastEnqueuedEventProperties()
? new Symbol[]{ENABLE_RECEIVER_RUNTIME_METRIC_NAME}
: null;
// Use explicit settlement via dispositions (not pre-settled)
return createConsumer(linkName, entityPath, timeout, retry, filter, properties, desiredCapabilities,
SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND);
}
private String getExpression(EventPosition eventPosition) {
final String isInclusiveFlag = eventPosition.isInclusive() ? "=" : "";
// order of preference
if (eventPosition.getOffset() != null) {
return String.format(
AmqpConstants.AMQP_ANNOTATION_FORMAT, OFFSET_ANNOTATION_NAME.getValue(),
isInclusiveFlag,
eventPosition.getOffset());
}
if (eventPosition.getSequenceNumber() != null) {
return String.format(
AmqpConstants.AMQP_ANNOTATION_FORMAT,
SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(),
isInclusiveFlag,
eventPosition.getSequenceNumber());
}
if (eventPosition.getEnqueuedDateTime() != null) {
String ms;
try {
ms = Long.toString(eventPosition.getEnqueuedDateTime().toEpochMilli());
} catch (ArithmeticException ex) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(Locale.ROOT,
"Event position for enqueued DateTime could not be parsed. Value: '%s'",
eventPosition.getEnqueuedDateTime()), ex));
}
return String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT,
ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), isInclusiveFlag, ms);
}
throw logger.logExceptionAsError(new IllegalArgumentException("No starting position was set."));
}
}