GlobalThroughputControlGroupController.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.throughputControl.controller.group.global;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.collect.EvictingQueue;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
public class GlobalThroughputControlGroupController extends ThroughputGroupControllerBase {
private static final Logger logger = LoggerFactory.getLogger(GlobalThroughputControlGroupController.class);
private static final double INITIAL_CLIENT_THROUGHPUT_RU_SHARE = 1.0;
private static final double INITIAL_THROUGHPUT_USAGE = 1.0;
private static final int DEFAULT_THROUGHPUT_USAGE_QUEUE_SIZE = 300; // 5 mins windows since we refresh ru usage every 1s
private static final double MIN_LOAD_FACTOR = 0.1;
private final Duration controlItemRenewInterval;
private final ThroughputControlContainerManager containerManager;
private final EvictingQueue<ThroughputUsageSnapshot> throughputUsageSnapshotQueue;
private final Object throughputUsageSnapshotQueueLock;
private AtomicReference<Double> clientThroughputShare;
public GlobalThroughputControlGroupController(
ConnectionMode connectionMode,
GlobalThroughputControlGroup group,
Integer maxContainerThroughput,
RxPartitionKeyRangeCache partitionKeyRangeCache,
String targetContainerRid,
LinkedCancellationToken parentToken) {
super(connectionMode, group, maxContainerThroughput, partitionKeyRangeCache, targetContainerRid, parentToken);
this.controlItemRenewInterval = group.getControlItemRenewInterval();
this.containerManager = new ThroughputControlContainerManager(group);
this.throughputUsageSnapshotQueue = EvictingQueue.create(DEFAULT_THROUGHPUT_USAGE_QUEUE_SIZE);
this.throughputUsageSnapshotQueue.add(new ThroughputUsageSnapshot(INITIAL_THROUGHPUT_USAGE));
this.throughputUsageSnapshotQueueLock = new Object();
this.clientThroughputShare = new AtomicReference<>(INITIAL_CLIENT_THROUGHPUT_RU_SHARE);
}
@Override
@SuppressWarnings("unchecked")
public <T> Mono<T> init() {
return this.containerManager.validateControlContainer()
.flatMap(dummy -> this.containerManager.getOrCreateConfigItem())
.flatMap(dummy -> {
double loadFactor = this.calculateLoadFactor();
return this.calculateClientThroughputShare(loadFactor)
.flatMap(controller -> this.containerManager.createGroupClientItem(loadFactor, this.getClientAllocatedThroughput()));
})
.flatMap(dummy -> this.resolveRequestController())
.doOnSuccess(dummy -> {
this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken())
.publishOn(CosmosSchedulers.COSMOS_PARALLEL)
.subscribe();
this.calculateClientThroughputShareTask(this.cancellationTokenSource.getToken())
.publishOn(CosmosSchedulers.COSMOS_PARALLEL)
.subscribe();
})
.thenReturn((T)this);
}
@Override
public double getClientAllocatedThroughput() {
return this.groupThroughput.get() * this.clientThroughputShare.get();
}
@Override
public void recordThroughputUsage(double throughputUsage) {
synchronized (this.throughputUsageSnapshotQueueLock) {
this.throughputUsageSnapshotQueue.add(new ThroughputUsageSnapshot(throughputUsage));
}
}
private Mono<GlobalThroughputControlGroupController> calculateClientThroughputShare(double loadFactor) {
return this.containerManager.queryLoadFactorsOfAllClients(loadFactor)
.doOnSuccess(totalLoads -> this.clientThroughputShare.set(loadFactor / totalLoads))
.thenReturn(this);
}
private double calculateLoadFactor() {
synchronized (this.throughputUsageSnapshotQueueLock) {
Instant startTime = this.throughputUsageSnapshotQueue.peek().getTime();
double totalWeight = 0.0;
for (ThroughputUsageSnapshot throughputUsageSnapshot : this.throughputUsageSnapshotQueue) {
totalWeight += throughputUsageSnapshot.calculateWeight(startTime);
}
double loadFactor = 0.0;
for (ThroughputUsageSnapshot throughputUsageSnapshot : this.throughputUsageSnapshotQueue) {
loadFactor += (throughputUsageSnapshot.getWeight() / totalWeight) * throughputUsageSnapshot.getThroughputUsage();
}
return Math.max(MIN_LOAD_FACTOR, loadFactor);
}
}
private Flux<Void> calculateClientThroughputShareTask(LinkedCancellationToken cancellationToken) {
return Mono.delay(controlItemRenewInterval, CosmosSchedulers.COSMOS_PARALLEL)
.flatMap(t -> {
if (cancellationToken.isCancellationRequested()) {
return Mono.empty();
} else {
double loadFactor = this.calculateLoadFactor();
return this.calculateClientThroughputShare(loadFactor)
.flatMap(dummy -> this.containerManager.replaceOrCreateGroupClientItem(loadFactor, this.getClientAllocatedThroughput()));
}
})
.onErrorResume(throwable -> {
logger.warn("Calculate throughput task failed ", throwable);
return Mono.empty();
})
.then()
.repeat(() -> !cancellationToken.isCancellationRequested());
}
}