ChangeFeedStateV1.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;

import java.util.Objects;

import static com.azure.cosmos.BridgeInternal.setProperty;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class ChangeFeedStateV1 extends ChangeFeedState {
    private final String containerRid;
    private final FeedRangeInternal feedRange;
    private final ChangeFeedMode mode;
    private final ChangeFeedStartFromInternal startFromSettings;
    private FeedRangeContinuation continuation;

    public ChangeFeedStateV1(
        String containerRid,
        FeedRangeInternal feedRange,
        ChangeFeedMode mode,
        ChangeFeedStartFromInternal startFromSettings,
        FeedRangeContinuation continuation) {

        super();

        this.containerRid = containerRid;
        this.feedRange = feedRange;
        this.startFromSettings = startFromSettings;
        this.continuation = continuation;
        this.mode = mode;
    }

    @Override
    public FeedRangeContinuation getContinuation() {
        return this.continuation;
    }

    @Override
    public ChangeFeedState setContinuation(FeedRangeContinuation continuation) {
        checkNotNull(continuation, "Argument 'continuation' must not be null.");
        continuation.validateContainer(this.containerRid);
        this.continuation = continuation;

        return this;
    }

    @Override
    public FeedRangeInternal getFeedRange() {
        return this.feedRange;
    }

    @Override
    public ChangeFeedMode getMode() {
        return this.mode;
    }

    @Override
    public ChangeFeedStartFromInternal getStartFromSettings() {
        return this.startFromSettings;
    }

    @Override
    public String applyServerResponseContinuation(
        String serverContinuationToken,
        RxDocumentServiceRequest request) {

        checkNotNull(
            serverContinuationToken,
            "Argument 'serverContinuationToken' must not be null");
        checkNotNull(request, "Argument 'request' must not be null");

        if (this.continuation == null) {
            this.continuation = FeedRangeContinuation.create(
                this.containerRid,
                request.getFeedRange() != null ?
                    request.getFeedRange()
                    : new FeedRangeEpkImpl(request.getEffectiveRange()),
                request.getEffectiveRange());
        }

        this.continuation.replaceContinuation(serverContinuationToken);
        return this.toString();
    }

    @Override
    public String getContainerRid() {
        return this.containerRid;
    }

    private void populateEffectiveRangeAndStartFromSettingsToRequest(RxDocumentServiceRequest request) {
        final ChangeFeedStartFromInternal effectiveStartFrom;
        final CompositeContinuationToken continuationToken;
        if (this.continuation != null) {
            continuationToken = this.continuation.getCurrentContinuationToken();
            request.applyFeedRangeFilter(this.continuation.getFeedRange());
        } else {
            continuationToken = null;
            request.applyFeedRangeFilter(this.feedRange);
        }

        if (continuationToken == null || continuationToken.getToken() == null) {
            effectiveStartFrom = this.startFromSettings;
        } else {
            effectiveStartFrom = new ChangeFeedStartFromETagAndFeedRangeImpl(
                continuationToken.getToken(),
                new FeedRangeEpkImpl(continuationToken.getRange()));
        }

        effectiveStartFrom.populateRequest(request);
    }

    @Override
    public void populatePropertyBag() {
        super.populatePropertyBag();

        setProperty(
            this,
            Constants.Properties.CHANGE_FEED_STATE_VERSION,
            ChangeFeedStateVersions.V1);

        setProperty(
            this,
            Constants.Properties.CHANGE_FEED_STATE_RESOURCE_ID,
            this.containerRid);

        setProperty(
            this,
            Constants.Properties.CHANGE_FEED_STATE_MODE,
            this.mode);

        setProperty(
            this,
            Constants.Properties.CHANGE_FEED_STATE_START_FROM,
            this.startFromSettings);

        if (this.continuation != null) {
            this.continuation.populatePropertyBag();
            setProperty(
                this,
                Constants.Properties.CHANGE_FEED_STATE_CONTINUATION,
                this.continuation);
            this.feedRange.removeProperties(this);
        } else {
            this.feedRange.setProperties(this, true);
        }
    }

    @Override
    public void populateRequest(RxDocumentServiceRequest request, int maxItemCount) {
        // Page size
        request.getHeaders().put(
            HttpConstants.HttpHeaders.PAGE_SIZE,
            String.valueOf(maxItemCount));
        switch (this.mode) {
            case INCREMENTAL:
                request.getHeaders().put(
                    HttpConstants.HttpHeaders.A_IM,
                    HttpConstants.A_IMHeaderValues.INCREMENTAL_FEED);
                break;
            case FULL_FIDELITY:
                request.getHeaders().put(
                    HttpConstants.HttpHeaders.A_IM,
                    HttpConstants.A_IMHeaderValues.FullFidelityFeed);
                break;
            default:
                throw new IllegalStateException("Unsupported change feed mode");
        }

        this.populateEffectiveRangeAndStartFromSettingsToRequest(request);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ChangeFeedStateV1)) {
            return false;
        }

        ChangeFeedStateV1 other = (ChangeFeedStateV1)o;
        return Objects.equals(this.feedRange, other.feedRange) &&
            Objects.equals(this.containerRid, other.containerRid) &&
            Objects.equals(this.startFromSettings, other.startFromSettings) &&
            Objects.equals(this.mode, other.mode) &&
            Objects.equals(this.continuation, other.continuation);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
            this.feedRange,
            this.containerRid,
            this.startFromSettings,
            this.mode,
            this.continuation);
    }
}