EventPosition.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.models;

import com.azure.core.annotation.Immutable;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;

import java.time.Instant;
import java.util.Locale;
import java.util.Objects;

/**
 * Defines a position of an {@link EventData} in the Event Hub partition. The position can be an offset, sequence
 * number, or enqueued time.
 */
@Immutable
public final class EventPosition {
    /**
     * This is a constant defined to represent the start of a partition stream in EventHub.
     */
    private static final Long START_OF_STREAM = -1L;

    /**
     * This is a constant defined to represent the current end of a partition stream in EventHub. This can be used as an
     * offset argument in receiver creation to start receiving from the latest event, instead of a specific offset or
     * point in time.
     */
    private static final String END_OF_STREAM = "@latest";

    private static final EventPosition EARLIEST = fromOffset(START_OF_STREAM, false);
    private static final EventPosition LATEST =  new EventPosition(false, END_OF_STREAM, null, null);

    private final boolean isInclusive;
    private final String offset;
    private final Long sequenceNumber;
    private final Instant enqueuedDateTime;

    private EventPosition(final boolean isInclusive, final Long offset, final Long sequenceNumber,
                          final Instant enqueuedDateTime) {
        this(isInclusive, String.valueOf(offset), sequenceNumber, enqueuedDateTime);
    }

    private EventPosition(final boolean isInclusive, final String offset, final Long sequenceNumber,
                          final Instant enqueuedDateTime) {
        this.offset = offset;
        this.sequenceNumber = sequenceNumber;
        this.enqueuedDateTime = enqueuedDateTime;
        this.isInclusive = isInclusive;
    }

    /**
     * Corresponds to the location of the first event present in the partition. Use this position to begin receiving
     * from the first event that was enqueued in the partition which has not expired due to the retention policy.
     *
     * @return An {@link EventPosition} set to the start of an Event Hub stream.
     */
    public static EventPosition earliest() {
        return EARLIEST;
    }

    /**
     * Corresponds to the end of the partition, where no more events are currently enqueued. Use this position to begin
     * receiving from the next event to be enqueued in the partition when
     * {@link EventHubConsumerAsyncClient#receiveFromPartition(String, EventPosition) receiveFromPartition()} invoked.
     *
     * @return An {@link EventPosition} set to the end of an Event Hubs stream and listens for new events.
     */
    public static EventPosition latest() {
        return LATEST;
    }

    /**
     * Creates a position at the given {@link Instant}. Corresponds to a specific instance within a partition to begin
     * looking for an event. The event enqueued after the requested {@code enqueuedDateTime} becomes the current
     * position.
     *
     * @param enqueuedDateTime The instant, in UTC, from which the next available event should be chosen.
     * @return An {@link EventPosition} object.
     */
    public static EventPosition fromEnqueuedTime(Instant enqueuedDateTime) {
        return new EventPosition(false, (String) null, null, enqueuedDateTime);
    }

    /**
     * Creates a position to an event in the partition at the provided offset. The event at that offset will not be
     * included. Instead, the next event is returned.
     *
     * <p>
     * The offset is the relative position for event in the context of the stream. The offset should not be considered a
     * stable value, as the same offset may refer to a different event as events reach the age limit for retention and
     * are no longer visible within the stream.
     * </p>
     *
     * @param offset The offset of the event within that partition.
     * @return An {@link EventPosition} object.
     */
    public static EventPosition fromOffset(long offset) {
        return fromOffset(offset, false);
    }

    /**
     * Creates a position to an event in the partition at the provided offset. If {@code isInclusive} is true, the event
     * with the same offset is returned. Otherwise, the next event is received.
     *
     * @param offset The offset of an event with respect to its relative position in the
     * @param isInclusive If true, the event with the {@code offset} is included; otherwise, the next event will be
     *     received.
     * @return An {@link EventPosition} object.
     */
    private static EventPosition fromOffset(long offset, boolean isInclusive) {
        return new EventPosition(isInclusive, offset, null, null);
    }

    /**
     * Creates a position to an event in the partition at the provided sequence number. The event with the sequence
     * number will not be included. Instead, the next event is returned.
     *
     * @param sequenceNumber is the sequence number of the event.
     * @return An {@link EventPosition} object.
     */
    public static EventPosition fromSequenceNumber(long sequenceNumber) {
        return fromSequenceNumber(sequenceNumber, false);
    }

    /**
     * Creates a position at the given sequence number. If {@code isInclusive} is true, the event with the same sequence
     * number is returned. Otherwise, the next event in the sequence is received.
     *
     * @param sequenceNumber is the sequence number of the event.
     * @param isInclusive If true, the event with the {@code sequenceNumber} is included; otherwise, the next event
     *     will be received.
     * @return An {@link EventPosition} object.
     */
    public static EventPosition fromSequenceNumber(long sequenceNumber, boolean isInclusive) {
        return new EventPosition(isInclusive, (String) null, sequenceNumber, null);
    }

    /**
     * Gets the boolean value of if the event is included. If true, the event with the {@code sequenceNumber} is
     * included; otherwise, the next event will be received.
     *
     * @return The boolean if the event will be received.
     */
    public boolean isInclusive() {
        return isInclusive;
    }

    /**
     * Gets the relative position for event in the context of the stream. The offset should not be considered a stable
     * value, as the same offset may refer to a different event as events reach the age limit for retention and are no
     * longer visible within the stream.
     *
     * @return The offset of the event within that partition.
     */
    public String getOffset() {
        return offset;
    }

    /**
     * Gets the sequence number of the event.
     *
     * @return The sequence number of the event.
     */
    public Long getSequenceNumber() {
        return sequenceNumber;
    }

    /**
     * Gets the instant, in UTC, from which the next available event should be chosen.
     *
     * @return The instant, in UTC, from which the next available event should be chosen.
     */
    public Instant getEnqueuedDateTime() {
        return this.enqueuedDateTime;
    }

    @Override
    public String toString() {
        return String.format(Locale.US, "offset[%s], sequenceNumber[%s], enqueuedTime[%s], isInclusive[%s]",
            offset, sequenceNumber,
            enqueuedDateTime != null ? enqueuedDateTime.toEpochMilli() : "null",
            isInclusive);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof EventPosition)) {
            return false;
        }

        final EventPosition other = (EventPosition) obj;

        return Objects.equals(isInclusive, other.isInclusive)
            && Objects.equals(offset, other.offset)
            && Objects.equals(sequenceNumber, other.sequenceNumber)
            && Objects.equals(enqueuedDateTime, other.enqueuedDateTime);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isInclusive, offset, sequenceNumber, enqueuedDateTime);
    }
}