ThroughputControlContainerManager.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.throughputControl.controller.group.global;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.retry.RetrySpec;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * In Throughput global control mode, in order to coordinate with other clients and share the throughput defined in the group,
 * there is a need to create/read/replace related items in the control container. This class contains all those related operations.
 */
public class ThroughputControlContainerManager {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputControlContainerManager.class);

    private static final String CLIENT_ITEM_PARTITION_KEY_VALUE_SUFFIX = ".client";
    private static final String CONFIG_ITEM_ID_SUFFIX = ".info";
    private static final String CONFIG_ITEM_PARTITION_KEY_VALUE_SUFFIX = ".config";
    private static final String PARTITION_KEY_PATH = "/groupId";

    private final String clientItemId;
    private final String clientItemPartitionKeyValue;
    private final String configItemId;
    private final String configItemPartitionKeyValue;
    private final CosmosAsyncContainer globalControlContainer;
    private final GlobalThroughputControlGroup group;

    private GlobalThroughputControlConfigItem configItem;
    private GlobalThroughputControlClientItem clientItem;

    public ThroughputControlContainerManager(GlobalThroughputControlGroup group) {
        checkNotNull(group, "Global control group config can not be null");

        this.globalControlContainer = group.getGlobalControlContainer();
        this.group = group;

        String encodedGroupId = Utils.encodeUrlBase64String(this.group.getId().getBytes(StandardCharsets.UTF_8));

        this.clientItemId = encodedGroupId + UUID.randomUUID();
        this.clientItemPartitionKeyValue = this.group.getId() + CLIENT_ITEM_PARTITION_KEY_VALUE_SUFFIX;
        this.configItemId = encodedGroupId + CONFIG_ITEM_ID_SUFFIX;
        this.configItemPartitionKeyValue = this.group.getId() + CONFIG_ITEM_PARTITION_KEY_VALUE_SUFFIX;
    }

    public Mono<GlobalThroughputControlClientItem> createGroupClientItem(double loadFactor, double allocatedThroughput) {
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setContentResponseOnWriteEnabled(true);

        return Mono.just(
                new GlobalThroughputControlClientItem(
                    this.clientItemId,
                    this.clientItemPartitionKeyValue,
                    loadFactor,
                    allocatedThroughput,
                    this.group.getControlItemExpireInterval()))
            .flatMap(groupClientItem -> this.globalControlContainer.createItem(groupClientItem, requestOptions))
            .flatMap(itemResponse -> {
                this.clientItem = itemResponse.getItem();
                return Mono.just(this.clientItem);
            });
    }

    /**
     * Get or create the throughput global control config item.
     * This is to make sure all the clients are using the same configuration for the group.
     *
     * The config item in the control container will be used as the source of truth.
     * If the client has a different config, it will be overwritten by the one in the control container.
     *
     * @return A {@link GlobalThroughputControlClientItem}.
     */
    public Mono<GlobalThroughputControlConfigItem> getOrCreateConfigItem() {
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setContentResponseOnWriteEnabled(true);

        GlobalThroughputControlConfigItem expectedConfigItem =
            new GlobalThroughputControlConfigItem(
                this.configItemId,
                this.configItemPartitionKeyValue,
                this.group.getTargetThroughput(),
                this.group.getTargetThroughputThreshold(),
                this.group.isDefault());

        return this.globalControlContainer.readItem(
                    this.configItemId,
                    new PartitionKey(this.configItemPartitionKeyValue),
                    GlobalThroughputControlConfigItem.class)
            .onErrorResume(throwable -> {
                CosmosException cosmosException = Utils.as(Exceptions.unwrap(throwable), CosmosException.class);
                if (cosmosException != null && cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
                    // hooray, you are the first one, needs to create the config file now
                    return this.globalControlContainer.createItem(expectedConfigItem, requestOptions);
                }

                return Mono.error(throwable);
            })
            .retryWhen(RetrySpec.max(10).filter(throwable -> {
                CosmosException cosmosException = Utils.as(Exceptions.unwrap(throwable), CosmosException.class);
                return cosmosException != null && cosmosException.getStatusCode() == HttpConstants.StatusCodes.CONFLICT;
            }))
            .flatMap(itemResponse -> {
                this.configItem = itemResponse.getItem();

                if (!expectedConfigItem.equals(configItem)) {
                    logger.warn(
                        "Group config using by this client is different than the one in control container, will be ignored. Using following config: {}",
                        this.configItem.toString());
                }

                return Mono.just(this.configItem);
            });
    }

    /**
     * Query the load factor of all other clients except itself for the group, and then add the client load factor to the final sum.

     * @param clientLoadFactor The load factor for the current client.
     * @return The sum of load factor from all clients.
     */
    public Mono<Double> queryLoadFactorsOfAllClients(double clientLoadFactor) {
        // The current design is using ttl to expire client items, so there is no need to check whether the client item is expired.

        String sqlQueryTest = "SELECT VALUE SUM(c.loadFactor) FROM c WHERE c.groupId = @GROUPID AND c.id != @CLIENTITEMID";
        List<SqlParameter> parameters = new ArrayList<>();
        parameters.add(new SqlParameter("@GROUPID", this.clientItemPartitionKeyValue));
        parameters.add(new SqlParameter("@CLIENTITEMID", this.clientItemId));

        SqlQuerySpec querySpec = new SqlQuerySpec(sqlQueryTest, parameters);
        return this.globalControlContainer.queryItems(querySpec, Double.class)
            .single()
            .map(result -> result + clientLoadFactor);
    }

    /**
     * Update the existing group client item.
     *
     * If resource not found, then create a new client item.
     * The client item may get deleted based on the ttl if the client can not keep updating the item due to unexpected failure (for example, network failure).
     *
     * @param loadFactor The new load factor of the client.
     * @param clientAllocatedThroughput The new allocated throughput for the client.
     * @return A {@link GlobalThroughputControlClientItem};
     */
    public Mono<GlobalThroughputControlClientItem> replaceOrCreateGroupClientItem(double loadFactor, double clientAllocatedThroughput) {
        CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
        itemRequestOptions.setContentResponseOnWriteEnabled(true);

        return Mono.just(this.clientItem)
            .flatMap(groupClientItem -> {
                groupClientItem.setLoadFactor(loadFactor);
                groupClientItem.setAllocatedThroughput(clientAllocatedThroughput);
                return this.globalControlContainer.replaceItem(
                    groupClientItem, groupClientItem.getId(), new PartitionKey(groupClientItem.getGroupId()), itemRequestOptions);
            })
            .onErrorResume(throwable -> {
                CosmosException cosmosException = Utils.as(Exceptions.unwrap(throwable), CosmosException.class);
                if (cosmosException != null && cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
                    logger.warn("Can not find the expected client item {}, will recreate a new one", this.clientItem.getId());
                    return this.globalControlContainer.createItem(this.clientItem, itemRequestOptions)
                        .retryWhen(RetrySpec.max(5));
                }

                return Mono.error(throwable);
            })
            .flatMap(itemResponse -> {
                this.clientItem = itemResponse.getItem();
                return Mono.just(this.clientItem);
            });
    }

    /**
     * Make sure the control container provided is partitioned as expected.
     *
     * @return A {@link ThroughputControlContainerManager}.
     */
    public Mono<ThroughputControlContainerManager> validateControlContainer() {
        return this.globalControlContainer.read()
            .map(containerResponse -> containerResponse.getProperties())
            .flatMap(containerProperties -> {
                boolean isPartitioned =
                    containerProperties.getPartitionKeyDefinition() != null &&
                        containerProperties.getPartitionKeyDefinition().getPaths() != null &&
                        containerProperties.getPartitionKeyDefinition().getPaths().size() > 0;
                if (!isPartitioned
                        || (containerProperties.getPartitionKeyDefinition().getPaths().size() != 1
                        || !containerProperties.getPartitionKeyDefinition().getPaths().get(0).equals(PARTITION_KEY_PATH))) {
                    return Mono.error(new IllegalArgumentException("The control container must have partition key equal to " + PARTITION_KEY_PATH));
                }

                return Mono.empty();
            })
            .thenReturn(this);
    }
}