EventHubPartitionAsyncConsumer.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;

/**
 * A package-private consumer responsible for reading {@link EventData} from a specific Event Hub partition in the
 * context of a specific consumer group.
 */
class EventHubPartitionAsyncConsumer implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(EventHubPartitionAsyncConsumer.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<LastEnqueuedEventProperties> lastEnqueuedEventProperties = new AtomicReference<>();
    private final AmqpReceiveLinkProcessor amqpReceiveLinkProcessor;
    private final MessageSerializer messageSerializer;
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final String partitionId;
    private final boolean trackLastEnqueuedEventProperties;
    private final Flux<PartitionEvent> emitterProcessor;
    private final EventPosition initialPosition;

    private volatile Long currentOffset;

    EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor,
        MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName, String consumerGroup,
        String partitionId, AtomicReference<Supplier<EventPosition>> currentEventPosition,
        boolean trackLastEnqueuedEventProperties) {
        this.initialPosition = Objects.requireNonNull(currentEventPosition.get().get(),
            "'currentEventPosition.get().get()' cannot be null.");
        this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor;
        this.messageSerializer = messageSerializer;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroup = consumerGroup;
        this.partitionId = partitionId;
        this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;

        if (trackLastEnqueuedEventProperties) {
            lastEnqueuedEventProperties.set(new LastEnqueuedEventProperties(null, null, null, null));
        }

        currentEventPosition.set(() -> {
            final Long offset = currentOffset;
            return offset == null
                ? initialPosition
                : EventPosition.fromOffset(offset);
        });

        this.emitterProcessor = amqpReceiveLinkProcessor
            .map(this::onMessageReceived)
            .doOnNext(event -> {
                // Keep track of the last position so if the link goes down, we don't start from the original location.
                final Long offset = event.getData().getOffset();
                if (offset != null) {
                    currentOffset = offset;
                } else {
                    logger.atWarning()
                        .addKeyValue(PARTITION_ID_KEY,  event.getPartitionContext().getPartitionId())
                        .addKeyValue(CONSUMER_GROUP_KEY, event.getPartitionContext().getConsumerGroup())
                        .addKeyValue("data", () -> event.getData().getBodyAsString())
                        .log("Offset for received event should not be null.");
                }
            });
    }

    /**
     * Disposes of the consumer by closing the underlying connection to the service.
     */
    @Override
    public void close() {
        if (!isDisposed.getAndSet(true)) {
            if (!amqpReceiveLinkProcessor.isTerminated()) {
                // cancel only if the processor is not already terminated.
                amqpReceiveLinkProcessor.cancel();
            }
            logger.atInfo()
                .addKeyValue(PARTITION_ID_KEY, this.partitionId)
                .log("Closed consumer.");
        }
    }

    /**
     * Begin consuming events until there are no longer any subscribers.
     *
     * @return A stream of events received from the partition.
     */
    Flux<PartitionEvent> receive() {
        return emitterProcessor;
    }

    /**
     * On each message received from the service, it will try to:
     * <ol>
     * <li>Deserialize the message into an {@link EventData}.</li>
     * <li>If {@link ReceiveOptions#getTrackLastEnqueuedEventProperties()} is true, then it will try to update
     * {@link LastEnqueuedEventProperties}.</li>
     * </ol>
     *
     * @param message AMQP message to deserialize.
     *
     * @return The deserialized {@link EventData} with partition information.
     */
    private PartitionEvent onMessageReceived(Message message) {
        final EventData event = messageSerializer.deserialize(message, EventData.class);

        if (trackLastEnqueuedEventProperties) {
            final LastEnqueuedEventProperties enqueuedEventProperties =
                messageSerializer.deserialize(message, LastEnqueuedEventProperties.class);

            if (enqueuedEventProperties != null) {
                final LastEnqueuedEventProperties updated = new LastEnqueuedEventProperties(
                    enqueuedEventProperties.getSequenceNumber(), enqueuedEventProperties.getOffset(),
                    enqueuedEventProperties.getEnqueuedTime(), enqueuedEventProperties.getRetrievalTime());
                lastEnqueuedEventProperties.set(updated);
            }
        }

        final PartitionContext partitionContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
            consumerGroup, partitionId);
        return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get());
    }
}