CosmosAsyncClient.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.core.annotation.ServiceClient;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.implementation.ApiType;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Permission;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.models.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosPermissionProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.Beta;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * Provides a client-side logical representation of the Azure Cosmos DB service.
 * This asynchronous client is used to configure and execute requests against the service.
 */
@ServiceClient(
    builder = CosmosClientBuilder.class,
    isAsync = true)
public final class CosmosAsyncClient implements Closeable {

    // Async Cosmos client wrapper
    private final Configs configs;
    private final AsyncDocumentClient asyncDocumentClient;
    private final String serviceEndpoint;
    private final String keyOrResourceToken;
    private final ConnectionPolicy connectionPolicy;
    private final ConsistencyLevel desiredConsistencyLevel;
    private final List<CosmosPermissionProperties> permissions;
    private final CosmosAuthorizationTokenResolver cosmosAuthorizationTokenResolver;
    private final AzureKeyCredential credential;
    private final TokenCredential tokenCredential;
    private final boolean sessionCapturingOverride;
    private final boolean enableTransportClientSharing;
    private final boolean clientTelemetryEnabled;
    private final TracerProvider tracerProvider;
    private final boolean contentResponseOnWriteEnabled;
    private static final Tracer TRACER;
    private final ApiType apiType;

    static {
        ServiceLoader<Tracer> serviceLoader = ServiceLoader.load(Tracer.class);
        Iterator<?> iterator = serviceLoader.iterator();
        if (iterator.hasNext()) {
            TRACER = serviceLoader.iterator().next();
        } else {
            TRACER = null;
        }
    }

    CosmosAsyncClient(CosmosClientBuilder builder) {
        this.configs = builder.configs();
        this.serviceEndpoint = builder.getEndpoint();
        this.keyOrResourceToken = builder.getKey();
        this.connectionPolicy = builder.getConnectionPolicy();
        this.desiredConsistencyLevel = builder.getConsistencyLevel();
        this.permissions = builder.getPermissions();
        this.cosmosAuthorizationTokenResolver = builder.getAuthorizationTokenResolver();
        this.credential = builder.getCredential();
        this.tokenCredential = builder.getTokenCredential();
        this.sessionCapturingOverride = builder.isSessionCapturingOverrideEnabled();
        this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled();
        this.clientTelemetryEnabled = builder.isClientTelemetryEnabled();
        this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled();
        this.tracerProvider = new TracerProvider(TRACER);
        this.apiType = builder.apiType();

        List<Permission> permissionList = new ArrayList<>();
        if (this.permissions != null) {
            permissionList =
                this.permissions
                    .stream()
                    .map(permissionProperties -> ModelBridgeInternal.getPermission(permissionProperties))
                    .filter(permission -> permission != null)
                    .collect(Collectors.toList());
        }

        this.asyncDocumentClient = new AsyncDocumentClient.Builder()
                                       .withServiceEndpoint(this.serviceEndpoint)
                                       .withMasterKeyOrResourceToken(this.keyOrResourceToken)
                                       .withConnectionPolicy(this.connectionPolicy)
                                       .withConsistencyLevel(this.desiredConsistencyLevel)
                                       .withSessionCapturingOverride(this.sessionCapturingOverride)
                                       .withConfigs(this.configs)
                                       .withTokenResolver(this.cosmosAuthorizationTokenResolver)
                                       .withCredential(this.credential)
                                       .withTransportClientSharing(this.enableTransportClientSharing)
                                       .withContentResponseOnWriteEnabled(this.contentResponseOnWriteEnabled)
                                       .withTokenCredential(this.tokenCredential)
                                       .withState(builder.metadataCaches())
                                       .withPermissionFeed(permissionList)
                                       .withApiType(apiType)
                                       .build();
    }

    AsyncDocumentClient getContextClient() {
        return this.asyncDocumentClient;
    }

    /**
     * Monitor Cosmos client performance and resource utilization using the specified meter registry.
     *
     * @param registry meter registry to use for performance monitoring.
     */
    static void setMonitorTelemetry(MeterRegistry registry) {
        RntbdMetrics.add(registry);
    }

    /**
     * Get the service endpoint.
     *
     * @return the service endpoint.
     */
    String getServiceEndpoint() {
        return serviceEndpoint;
    }

    /**
     * Gets the key or resource token.
     *
     * @return get the key or resource token.
     */
    String getKeyOrResourceToken() {
        return keyOrResourceToken;
    }

    /**
     * Get the connection policy.
     *
     * @return {@link ConnectionPolicy}.
     */
    ConnectionPolicy getConnectionPolicy() {
        return connectionPolicy;
    }

    /**
     * Gets the consistency level.
     *
     * @return the {@link ConsistencyLevel}.
     */
    ConsistencyLevel getDesiredConsistencyLevel() {
        return desiredConsistencyLevel;
    }

    /**
     * Gets the permission list.
     *
     * @return the permission list.
     */
    List<CosmosPermissionProperties> getPermissions() {
        return permissions;
    }

    AsyncDocumentClient getDocClientWrapper() {
        return asyncDocumentClient;
    }

    /**
     * Gets the configs.
     *
     * @return the configs.
     */
    Configs getConfigs() {
        return configs;
    }

    /**
     * Gets the token resolver.
     *
     * @return the token resolver.
     */
    CosmosAuthorizationTokenResolver getCosmosAuthorizationTokenResolver() {
        return cosmosAuthorizationTokenResolver;
    }

    /**
     * Gets the azure key credential.
     *
     * @return azure key credential.
     */
    AzureKeyCredential credential() {
        return credential;
    }

    /**
     * Gets the boolean which indicates whether to only return the headers and status code in Cosmos DB response
     * in case of Create, Update and Delete operations on CosmosItem.
     *
     * If set to false (which is by default), this removes the resource from response. It reduces networking
     * and CPU load by not sending the resource back over the network and serializing it
     * on the client.
     *
     * By-default, this is false.
     *
     * @return a boolean indicating whether resource will be included in the response or not.
     */
    boolean isContentResponseOnWriteEnabled() {
        return contentResponseOnWriteEnabled;
    }

    boolean isClientTelemetryEnabled() {
        return clientTelemetryEnabled;
    }

    /**
     * CREATE a Database if it does not already exist on the service.
     * <p>
     * The {@link Mono} upon successful completion will contain a single cosmos database response with the
     * created or existing database.
     *
     * @param databaseProperties CosmosDatabaseProperties.
     * @return a {@link Mono} containing the cosmos database response with the created or existing database or
     * an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
        return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()),
            null, context));
    }

    /**
     * Create a Database if it does not already exist on the service.
     * <p>
     * The {@link Mono} upon successful completion will contain a single cosmos database response with the
     * created or existing database.
     *
     * @param id the id of the database.
     * @return a {@link Mono} containing the cosmos database response with the created or existing database or
     * an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
        return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context));
    }

    /**
     * Create a Database if it does not already exist on the service.
     * <p>
     * The throughputProperties will only be used if the specified database
     * does not exist and therefor a new database will be created with throughputProperties.
     * <p>
     * The {@link Mono} upon successful completion will contain a single cosmos database response with the
     * created or existing database.
     *
     * @param id the id.
     * @param throughputProperties the throughputProperties.
     * @return the mono.
     */
    public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) {
        return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id),
            throughputProperties, context));
    }

    /**
     * Creates a database.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single resource response with the
     * created database.
     * In case of failure the {@link Mono} will error.
     *
     * @param databaseProperties {@link CosmosDatabaseProperties}.
     * @param options {@link CosmosDatabaseRequestOptions}.
     * @return an {@link Mono} containing the single cosmos database response with the created database or an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties,
                                                       CosmosDatabaseRequestOptions options) {
        final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options;
        Database wrappedDatabase = new Database();
        wrappedDatabase.setId(databaseProperties.getId());
        return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
    }

    /**
     * Creates a database.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single resource response with the
     * created database.
     * In case of failure the {@link Mono} will error.
     *
     * @param databaseProperties {@link CosmosDatabaseProperties}.
     * @return an {@link Mono} containing the single cosmos database response with the created database or an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties) {
        return createDatabase(databaseProperties, new CosmosDatabaseRequestOptions());
    }

    /**
     * Creates a database.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single resource response with the
     * created database.
     * In case of failure the {@link Mono} will error.
     *
     * @param id id of the database.
     * @return a {@link Mono} containing the single cosmos database response with the created database or an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(String id) {
        return createDatabase(new CosmosDatabaseProperties(id), new CosmosDatabaseRequestOptions());
    }

    /**
     * Creates a database.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single resource response with the
     * created database.
     * In case of failure the {@link Mono} will error.
     *
     * @param databaseProperties {@link CosmosDatabaseProperties}.
     * @param throughputProperties the throughput properties for the database.
     * @param options {@link CosmosDatabaseRequestOptions}.
     * @return an {@link Mono} containing the single cosmos database response with the created database or an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties,
                                                       ThroughputProperties throughputProperties,
                                                       CosmosDatabaseRequestOptions options) {
        if (options == null) {
            options = new CosmosDatabaseRequestOptions();
        }

        ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
        Database wrappedDatabase = new Database();
        wrappedDatabase.setId(databaseProperties.getId());
        final CosmosDatabaseRequestOptions requestOptions = options;
        return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
    }

    /**
     * Creates a database.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single resource response with the
     * created database.
     * In case of failure the {@link Mono} will error.
     *
     * @param databaseProperties {@link CosmosDatabaseProperties}.
     * @param throughputProperties the throughput properties for the database.
     * @return an {@link Mono} containing the single cosmos database response with the created database or an error.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties, ThroughputProperties throughputProperties) {
        CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
        ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
        return createDatabase(databaseProperties, options);
    }

    /**
     * Creates a database.
     *
     * @param id the id.
     * @param throughputProperties the throughputProperties.
     * @return the mono.
     */
    public Mono<CosmosDatabaseResponse> createDatabase(String id, ThroughputProperties throughputProperties) {
        CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
        ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
        return createDatabase(new CosmosDatabaseProperties(id), options);
    }

    /**
     * Reads all databases.
     * <p>
     * After subscription the operation will be performed.
     * The {@link CosmosPagedFlux} will contain one or several feed response of the read databases.
     * In case of failure the {@link CosmosPagedFlux} will error.
     *
     * @param options {@link CosmosQueryRequestOptions}
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
     */
    CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases(CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
            pagedFluxOptions.setTracerInformation(this.tracerProvider, "readAllDatabases", this.serviceEndpoint, null);
            setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
            return getDocClientWrapper().readDatabases(options)
                .map(response ->
                    BridgeInternal.createFeedResponse(
                        ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
                        response.getResponseHeaders()));
        });
    }

    /**
     * Reads all databases.
     * <p>
     * After subscription the operation will be performed.
     * The {@link CosmosPagedFlux} will contain one or several feed response of the read databases.
     * In case of failure the {@link CosmosPagedFlux} will error.
     *
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
     */
    public CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases() {
        return readAllDatabases(new CosmosQueryRequestOptions());
    }


    /**
     * Query for databases.
     * <p>
     * After subscription the operation will be performed.
     * The {@link CosmosPagedFlux} will contain one or several feed response of the read databases.
     * In case of failure the {@link CosmosPagedFlux} will error.
     *
     * @param query the query.
     * @param options the feed options.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
     */
    public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, CosmosQueryRequestOptions options) {
        if (options == null) {
            options = new CosmosQueryRequestOptions();
        }

        return queryDatabasesInternal(new SqlQuerySpec(query), options);
    }

    /**
     * Query for databases.
     * <p>
     * After subscription the operation will be performed.
     * The {@link CosmosPagedFlux} will contain one or several feed response of the read databases.
     * In case of failure the {@link CosmosPagedFlux} will error.
     *
     * @param querySpec the SQL query specification.
     * @param options the feed options.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
     */
    public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
        if (options == null) {
            options = new CosmosQueryRequestOptions();
        }

        return queryDatabasesInternal(querySpec, options);
    }

    /**
     * Gets a database object without making a service call.
     *
     * @param id name of the database.
     * @return {@link CosmosAsyncDatabase}.
     */
    public CosmosAsyncDatabase getDatabase(String id) {
        return new CosmosAsyncDatabase(id, this);
    }

    /**
     * Close this {@link CosmosAsyncClient} instance and cleans up the resources.
     */
    @Override
    public void close() {
        asyncDocumentClient.close();
    }

    TracerProvider getTracerProvider(){
        return this.tracerProvider;
    }

    /**
     * Enable throughput control group.
     *
     * @param group Throughput control group going to be enabled.
     */
    void enableThroughputControlGroup(ThroughputControlGroupInternal group) {
        checkNotNull(group, "Throughput control group cannot be null");
        this.asyncDocumentClient.enableThroughputControlGroup(group);
    }

    /**
     * Create global throughput control config builder which will be used to build {@link GlobalThroughputControlConfig}.
     *
     * @param databaseId The database id of the control container.
     * @param containerId The container id of the control container.
     * @return A {@link GlobalThroughputControlConfigBuilder}.
     */
    @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public GlobalThroughputControlConfigBuilder createGlobalThroughputControlConfigBuilder(String databaseId, String containerId) {
        return new GlobalThroughputControlConfigBuilder(this, databaseId, containerId);
    }

    private CosmosPagedFlux<CosmosDatabaseProperties> queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){
        return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
            pagedFluxOptions.setTracerInformation(this.tracerProvider, "queryDatabases", this.serviceEndpoint, null);
            setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
            return getDocClientWrapper().queryDatabases(querySpec, options)
                .map(response -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
                    response.getResponseHeaders()));
        });
    }


    private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database,
                                                                           ThroughputProperties throughputProperties, Context context) {
        String spanName = "createDatabaseIfNotExists." + database.getId();
        Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
        Mono<CosmosDatabaseResponse> responseMono = database.readInternal(new CosmosDatabaseRequestOptions(),
            nestedContext).onErrorResume(exception -> {
            final Throwable unwrappedException = Exceptions.unwrap(exception);
            if (unwrappedException instanceof CosmosException) {
                final CosmosException cosmosException = (CosmosException) unwrappedException;
                if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
                    CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions();
                    if (throughputProperties != null) {
                        ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties);
                    }

                    Database wrappedDatabase = new Database();
                    wrappedDatabase.setId(database.getId());
                    return createDatabaseInternal(wrappedDatabase,
                        requestOptions, nestedContext);
                }
            }
            return Mono.error(unwrappedException);
        });
        return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
            context,
            spanName,
            database.getId(),
            this.serviceEndpoint);
    }

    private Mono<CosmosDatabaseResponse> createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options,
                                                             Context context) {
        String spanName = "createDatabase." + database.getId();
        Mono<CosmosDatabaseResponse> responseMono = asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options))
            .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
            .single();
        return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
            context,
            spanName,
            database.getId(),
            this.serviceEndpoint);
    }
}