PartitionProcessorFactoryImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.PartitionProcessor;
import com.azure.cosmos.implementation.changefeed.PartitionProcessorFactory;
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
/**
* Implementation for {@link PartitionProcessorFactory}.
*/
class PartitionProcessorFactoryImpl implements PartitionProcessorFactory {
private final ChangeFeedContextClient documentClient;
private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
private final LeaseCheckpointer leaseCheckpointer;
private final CosmosAsyncContainer collectionSelfLink;
public PartitionProcessorFactoryImpl(
ChangeFeedContextClient documentClient,
ChangeFeedProcessorOptions changeFeedProcessorOptions,
LeaseCheckpointer leaseCheckpointer,
CosmosAsyncContainer collectionSelfLink) {
if (documentClient == null) {
throw new IllegalArgumentException("documentClient");
}
if (changeFeedProcessorOptions == null) {
throw new IllegalArgumentException("changeFeedProcessorOptions");
}
if (leaseCheckpointer == null) {
throw new IllegalArgumentException("leaseCheckpointer");
}
if (collectionSelfLink == null) {
throw new IllegalArgumentException("collectionSelfLink");
}
this.documentClient = documentClient;
this.changeFeedProcessorOptions = changeFeedProcessorOptions;
this.leaseCheckpointer = leaseCheckpointer;
this.collectionSelfLink = collectionSelfLink;
}
@Override
public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) {
if (observer == null) {
throw new IllegalArgumentException("observer");
}
if (lease == null) {
throw new IllegalArgumentException("lease");
}
String startContinuation = lease.getContinuationToken();
if (startContinuation == null || startContinuation.isEmpty()) {
startContinuation = this.changeFeedProcessorOptions.getStartContinuation();
}
ProcessorSettings settings = new ProcessorSettings()
.withCollectionLink(this.collectionSelfLink)
.withStartContinuation(startContinuation)
.withPartitionKeyRangeId(lease.getLeaseToken())
.withFeedPollDelay(this.changeFeedProcessorOptions.getFeedPollDelay())
.withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount())
.withStartFromBeginning(this.changeFeedProcessorOptions.isStartFromBeginning())
.withStartTime(this.changeFeedProcessorOptions.getStartTime()); // .getSessionToken(this.changeFeedProcessorOptions.getSessionToken());
PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease);
return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer);
}
}