// CheckpointerObserverFactory.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;

/**
 * A {@link ChangeFeedObserverFactory} that decorates the observers produced by another factory.
 * <p>
 * Every observer obtained from the wrapped factory is first wrapped in an
 * {@link ObserverExceptionWrappingChangeFeedObserverDecorator}. Unless the configured
 * {@link CheckpointFrequency} reports explicit checkpointing, the result is additionally wrapped
 * in an {@link AutoCheckpointer}.
 */
class CheckpointerObserverFactory implements ChangeFeedObserverFactory {
    private final ChangeFeedObserverFactory observerFactory;
    private final CheckpointFrequency checkpointFrequency;

    /**
     * Creates a factory that decorates the observers produced by {@code observerFactory}.
     *
     * @param observerFactory the factory supplying the observers to decorate.
     * @param checkpointFrequency the checkpoint settings consulted when creating observers.
     * @throws IllegalArgumentException if either argument is {@code null}; the message is the
     *     name of the offending parameter.
     */
    public CheckpointerObserverFactory(ChangeFeedObserverFactory observerFactory, CheckpointFrequency checkpointFrequency) {
        // Arguments are validated in declaration order, so a null observerFactory is reported first.
        this.observerFactory = requireArgument(observerFactory, "observerFactory");
        this.checkpointFrequency = requireArgument(checkpointFrequency, "checkpointFrequency");
    }

    /**
     * Creates a decorated observer from the wrapped factory.
     *
     * @return the exception-wrapping decorator itself when
     *     {@link CheckpointFrequency#isExplicitCheckpoint()} returns {@code true}; otherwise an
     *     {@link AutoCheckpointer} constructed around that decorator.
     */
    @Override
    public ChangeFeedObserver createObserver() {
        final ChangeFeedObserver wrapped =
            new ObserverExceptionWrappingChangeFeedObserverDecorator(this.observerFactory.createObserver());

        return this.checkpointFrequency.isExplicitCheckpoint()
            ? wrapped
            : new AutoCheckpointer(this.checkpointFrequency, wrapped);
    }

    /**
     * Returns {@code value} unchanged, or fails when it is {@code null}.
     *
     * @param value the argument to validate.
     * @param name the parameter name, used verbatim as the exception message.
     * @param <T> the argument type.
     * @return {@code value}, guaranteed non-null.
     * @throws IllegalArgumentException if {@code value} is {@code null}.
     */
    private static <T> T requireArgument(T value, String name) {
        if (value == null) {
            throw new IllegalArgumentException(name);
        }
        return value;
    }
}