RemainingWorkEstimatorImpl.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.RemainingPartitionWork;
import com.azure.cosmos.implementation.changefeed.RemainingWorkEstimator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Implementation for {@link RemainingWorkEstimator}.
 */
class RemainingWorkEstimatorImpl implements RemainingWorkEstimator {
    private final static char PKRangeIdSeparator = ':';
    private final static char SegmentSeparator = '#';
    private final static String LSNPropertyName = "_lsn";
    private final ChangeFeedContextClient feedDocumentClient;
    private final LeaseContainer leaseContainer;
    private final String collectionSelfLink;
    private final int degreeOfParallelism;

    public RemainingWorkEstimatorImpl(
        LeaseContainer leaseContainer,
        ChangeFeedContextClient feedDocumentClient,
        String collectionSelfLink,
        int degreeOfParallelism) {

        if (leaseContainer == null) {
            throw new IllegalArgumentException("leaseContainer");
        }

        if (collectionSelfLink == null || collectionSelfLink.isEmpty()) {
            throw new IllegalArgumentException("collectionSelfLink");
        }

        if (feedDocumentClient == null) {
            throw new IllegalArgumentException("feedDocumentClient");
        }

        if (degreeOfParallelism < 1) {
            throw new IllegalArgumentException("degreeOfParallelism - Degree of parallelism is out of range");
        }

        this.leaseContainer = leaseContainer;
        this.collectionSelfLink = collectionSelfLink;
        this.feedDocumentClient = feedDocumentClient;
        this.degreeOfParallelism = degreeOfParallelism;
    }

    @Override
    public Mono<Long> estimatedRemainingWork() {
        return this.estimatedRemainingWorkPerPartition()
            .map(RemainingPartitionWork::getRemainingWork)
            .collectList()
            .map(list -> {
                long sum;
                if (list.size() == 0) {
                    sum = 1;
                } else {
                    sum = 0;
                    for (long value : list) {
                        sum += value;
                    }
                }

                return sum;
            });
    }

    @Override
    public Flux<RemainingPartitionWork> estimatedRemainingWorkPerPartition() {
        return null;
    }
}