WorkerTask.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Worker task that executes in a separate thread.
 */
class WorkerTask extends Thread {
    private final Logger logger = LoggerFactory.getLogger(WorkerTask.class);
    private AtomicBoolean done;
    private Mono<Void> job;
    private Lease lease;
    private PartitionSupervisor partitionSupervisor;

    WorkerTask(Lease lease, PartitionSupervisor partitionSupervisor, Mono<Void> job) {
        this.lease = lease;
        this.job = job;
        this.partitionSupervisor = partitionSupervisor;
        done = new AtomicBoolean(false);
    }

    @Override
    public void run() {
        job
            .doOnSuccess(avoid -> logger.info("Partition controller worker task {} has finished running.", lease.getLeaseToken()))
            .doOnTerminate(() -> {
                logger.info("Partition controller worker task {} has exited.", lease.getLeaseToken());
                job = null;
                this.done.set(true);
            })
            .subscribe();
    }

    public void cancelJob() {
        this.partitionSupervisor.shutdown();
        this.interrupt();
    }

    public Lease lease() {
        return this.lease;
    }

    public boolean isRunning() {
        return !this.done.get();
    }
}