// CosmosAsyncScripts.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.StoredProcedure;
import com.azure.cosmos.implementation.Trigger;
import com.azure.cosmos.implementation.UserDefinedFunction;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureResponse;
import com.azure.cosmos.models.CosmosTriggerProperties;
import com.azure.cosmos.models.CosmosTriggerResponse;
import com.azure.cosmos.models.CosmosUserDefinedFunctionProperties;
import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;

/**
 * Provides asynchronous methods for operating on Cosmos scripts: user defined functions (UDFs), stored
 * procedures and triggers.
 */
public class CosmosAsyncScripts {
    private final CosmosAsyncContainer container;
    private final CosmosAsyncDatabase database;

    /**
     * Creates the scripts accessor for the given container.
     *
     * @param container the container whose scripts are operated on; the database returned by
     * {@code container.getDatabase()} is captured alongside it.
     */
    CosmosAsyncScripts(CosmosAsyncContainer container) {
        this.database = container.getDatabase();
        this.container = container;
    }
    /* CosmosAsyncStoredProcedure operations */

    /**
     * Creates a cosmos stored procedure using a freshly created, default set of request options.
     * <p>
     * The operation is performed after subscription.
     * On successful completion the {@link Mono} contains a single cosmos stored procedure response with the
     * created cosmos stored procedure.
     * On failure the {@link Mono} errors.
     *
     * @param properties the cosmos stored procedure properties.
     * @return a {@link Mono} containing the single cosmos stored procedure resource response or an error.
     */
    public Mono<CosmosStoredProcedureResponse> createStoredProcedure(CosmosStoredProcedureProperties properties) {
        final CosmosStoredProcedureRequestOptions defaultOptions = new CosmosStoredProcedureRequestOptions();
        return createStoredProcedure(properties, defaultOptions);
    }

    /**
     * Creates a cosmos stored procedure with the supplied request options.
     * <p>
     * The operation is performed after subscription.
     * On successful completion the {@link Mono} contains a single cosmos stored procedure response with the
     * created cosmos stored procedure.
     * On failure the {@link Mono} errors.
     *
     * @param properties the cosmos stored procedure properties; its id and body are copied onto the internal
     * stored procedure resource.
     * @param options the stored procedure request options; when {@code null}, default options are used.
     * @return a {@link Mono} containing the single cosmos stored procedure resource response or an error.
     */
    public Mono<CosmosStoredProcedureResponse> createStoredProcedure(
        CosmosStoredProcedureProperties properties,
        CosmosStoredProcedureRequestOptions options) {
        // Resolve the default up front: the lambda below needs an effectively final reference.
        final CosmosStoredProcedureRequestOptions effectiveOptions =
            options != null ? options : new CosmosStoredProcedureRequestOptions();

        // Only the id and the body are carried over to the internal resource.
        final StoredProcedure storedProcedure = new StoredProcedure();
        storedProcedure.setId(properties.getId());
        storedProcedure.setBody(properties.getBody());

        return withContext(ctx -> createStoredProcedureInternal(storedProcedure, effectiveOptions, ctx));
    }

    /**
     * Reads all cosmos stored procedures in a container using default query request options.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read cosmos stored
     * procedure properties.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos stored
     * procedure properties or an error.
     */
    public CosmosPagedFlux<CosmosStoredProcedureProperties> readAllStoredProcedures() {
        final CosmosQueryRequestOptions defaultOptions = new CosmosQueryRequestOptions();
        return this.readAllStoredProcedures(defaultOptions);
    }

    /**
     * Reads all cosmos stored procedures in a container.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read cosmos stored
     * procedure properties.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param options the query request options.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos stored
     * procedure properties or an error.
     */
    CosmosPagedFlux<CosmosStoredProcedureProperties> readAllStoredProcedures(CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // Span name is the operation name suffixed with the container id.
            final String tracingSpan = "readAllStoredProcedures." + this.container.getId();
            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public stored procedure properties.
            return this.database.getDocClientWrapper()
                .readStoredProcedures(this.container.getLink(), options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Queries for stored procedures in a container using a query string.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained stored procedures.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param query the query text; it is wrapped in a {@link SqlQuerySpec}.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained stored
     * procedures or an error.
     */
    public CosmosPagedFlux<CosmosStoredProcedureProperties> queryStoredProcedures(
        String query,
        CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        return queryStoredProceduresInternal(new SqlQuerySpec(query), effectiveOptions);
    }

    /**
     * Queries for stored procedures in a container using a SQL query specification.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained stored procedures.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param querySpec the SQL query specification.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained stored
     * procedures or an error.
     */
    public CosmosPagedFlux<CosmosStoredProcedureProperties> queryStoredProcedures(
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        return queryStoredProceduresInternal(querySpec, effectiveOptions);
    }

    /**
     * Gets a {@link CosmosAsyncStoredProcedure} object for the given id without making a service call.
     *
     * @param id id of the stored procedure
     * @return a cosmos stored procedure bound to this container
     */
    public CosmosAsyncStoredProcedure getStoredProcedure(String id) {
        final CosmosAsyncStoredProcedure storedProcedure = new CosmosAsyncStoredProcedure(id, this.container);
        return storedProcedure;
    }

    /* UDF Operations */

    /**
     * Creates a cosmos user defined function.
     * <p>
     * The operation is performed after subscription.
     * On successful completion the {@link Mono} contains a single cosmos user defined function response.
     * On failure the {@link Mono} errors.
     *
     * @param properties the cosmos user defined function properties; its id and body are copied onto the
     * internal user defined function resource.
     * @return a {@link Mono} containing the single resource response with the created user defined function or an
     * error.
     */
    public Mono<CosmosUserDefinedFunctionResponse> createUserDefinedFunction(
        CosmosUserDefinedFunctionProperties properties) {
        // Only the id and the body are carried over to the internal resource.
        final UserDefinedFunction userDefinedFunction = new UserDefinedFunction();
        userDefinedFunction.setId(properties.getId());
        userDefinedFunction.setBody(properties.getBody());

        return withContext(ctx -> createUserDefinedFunctionInternal(userDefinedFunction, ctx));
    }

    /**
     * Reads all cosmos user defined functions in the container using default query request options.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read user defined functions.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read user defined
     * functions or an error.
     */
    public CosmosPagedFlux<CosmosUserDefinedFunctionProperties> readAllUserDefinedFunctions() {
        final CosmosQueryRequestOptions defaultOptions = new CosmosQueryRequestOptions();
        return this.readAllUserDefinedFunctions(defaultOptions);
    }

    /**
     * Reads all cosmos user defined functions in the container.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read user defined functions.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param options the query request options.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read user defined
     * functions or an error.
     */
    CosmosPagedFlux<CosmosUserDefinedFunctionProperties> readAllUserDefinedFunctions(
        CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // Span name is the operation name suffixed with the container id.
            final String tracingSpan = "readAllUserDefinedFunctions." + this.container.getId();
            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public user defined function properties.
            return this.database.getDocClientWrapper()
                .readUserDefinedFunctions(this.container.getLink(), options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Queries for user defined functions in the container using a query string.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained user defined
     * functions.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param query the query text; it is wrapped in a {@link SqlQuerySpec} and handed to the
     * {@link SqlQuerySpec}-based overload.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained user defined
     * functions or an error.
     */
    public CosmosPagedFlux<CosmosUserDefinedFunctionProperties> queryUserDefinedFunctions(
        String query,
        CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        return queryUserDefinedFunctions(new SqlQuerySpec(query), effectiveOptions);
    }

    /**
     * Queries for user defined functions in the container using a SQL query specification.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained user defined
     * functions.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param querySpec the SQL query specification.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained user defined
     * functions or an error.
     */
    public CosmosPagedFlux<CosmosUserDefinedFunctionProperties> queryUserDefinedFunctions(
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        return queryUserDefinedFunctionsInternal(querySpec, effectiveOptions);
    }

    /**
     * Gets a {@link CosmosAsyncUserDefinedFunction} object for the given id without making a service call.
     *
     * @param id id of the user defined function
     * @return a cosmos user defined function bound to this container
     */
    public CosmosAsyncUserDefinedFunction getUserDefinedFunction(String id) {
        final CosmosAsyncUserDefinedFunction userDefinedFunction =
            new CosmosAsyncUserDefinedFunction(id, this.container);
        return userDefinedFunction;
    }

    /* Trigger Operations */

    /**
     * Creates a Cosmos trigger.
     * <p>
     * The operation is performed after subscription.
     * On successful completion the {@link Mono} contains a cosmos trigger response.
     * On failure the {@link Mono} errors.
     *
     * @param properties the cosmos trigger properties
     * @return a {@link Mono} containing the single resource response with the created trigger or an error.
     */
    public Mono<CosmosTriggerResponse> createTrigger(CosmosTriggerProperties properties) {
        return withContext(ctx -> this.createTriggerInternal(properties, ctx));
    }

    /**
     * Reads all triggers in a container using default query request options.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read cosmos trigger
     * properties.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos trigger
     * properties or an error.
     */
    public CosmosPagedFlux<CosmosTriggerProperties> readAllTriggers() {
        final CosmosQueryRequestOptions defaultOptions = new CosmosQueryRequestOptions();
        return this.readAllTriggers(defaultOptions);
    }

    /**
     * Reads all triggers in a container.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the read cosmos trigger
     * properties.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param options the query request options.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the read cosmos trigger
     * properties or an error.
     */
    CosmosPagedFlux<CosmosTriggerProperties> readAllTriggers(CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // Span name is the operation name suffixed with the container id.
            final String tracingSpan = "readAllTriggers." + this.container.getId();
            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public trigger properties.
            return this.database.getDocClientWrapper()
                .readTriggers(this.container.getLink(), options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Queries for triggers in the container using a query string.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained triggers.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param query the query text; it is wrapped in a {@link SqlQuerySpec}.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained triggers or an
     * error.
     */
    public CosmosPagedFlux<CosmosTriggerProperties> queryTriggers(String query, CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        // 'false': the span name for this overload does not include the query text.
        return queryTriggersInternal(false, new SqlQuerySpec(query), effectiveOptions);
    }

    /**
     * Queries for triggers in the container using a SQL query specification.
     * <p>
     * The operation is performed after subscription.
     * The {@link CosmosPagedFlux} contains one or several feed response pages of the obtained triggers.
     * On failure the {@link CosmosPagedFlux} errors.
     *
     * @param querySpec the SQL query specification.
     * @param options the query request options; when {@code null}, default options are used.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained triggers or an
     * error.
     */
    public CosmosPagedFlux<CosmosTriggerProperties> queryTriggers(
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        final CosmosQueryRequestOptions effectiveOptions =
            options == null ? new CosmosQueryRequestOptions() : options;
        // 'true': the span name for this overload additionally carries the query text.
        return queryTriggersInternal(true, querySpec, effectiveOptions);
    }

    /**
     * Gets a {@link CosmosAsyncTrigger} object for the given id without making a service call.
     *
     * @param id id of the cosmos trigger
     * @return a cosmos trigger bound to this container
     */
    public CosmosAsyncTrigger getTrigger(String id) {
        final CosmosAsyncTrigger trigger = new CosmosAsyncTrigger(id, this.container);
        return trigger;
    }

    /**
     * Builds the paged flux that runs a stored procedure query against this container.
     *
     * @param querySpec the SQL query specification to run.
     * @param options the query request options, passed on unchanged.
     * @return a {@link CosmosPagedFlux} of stored procedure properties pages.
     */
    private CosmosPagedFlux<CosmosStoredProcedureProperties> queryStoredProceduresInternal(
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // Span name is the operation name suffixed with the container id.
            final String tracingSpan = "queryStoredProcedures." + this.container.getId();
            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public stored procedure properties.
            return this.database.getDocClientWrapper()
                .queryStoredProcedures(this.container.getLink(), querySpec, options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Builds the paged flux that runs a user defined function query against this container.
     *
     * @param querySpec the SQL query specification to run.
     * @param options the query request options, passed on unchanged.
     * @return a {@link CosmosPagedFlux} of user defined function properties pages.
     */
    private CosmosPagedFlux<CosmosUserDefinedFunctionProperties> queryUserDefinedFunctionsInternal(
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // Span name is the operation name suffixed with the container id.
            final String tracingSpan = "queryUserDefinedFunctions." + this.container.getId();
            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public user defined function properties.
            return this.database.getDocClientWrapper()
                .queryUserDefinedFunctions(this.container.getLink(), querySpec, options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Builds the paged flux that runs a trigger query against this container.
     *
     * @param isParameterised when {@code true}, the query text of {@code querySpec} is appended to the span
     * name; when {@code false}, the span name consists of the operation name and container id only.
     * @param querySpec the SQL query specification to run.
     * @param options the query request options, passed on unchanged.
     * @return a {@link CosmosPagedFlux} of trigger properties pages.
     */
    private CosmosPagedFlux<CosmosTriggerProperties> queryTriggersInternal(
        boolean isParameterised,
        SqlQuerySpec querySpec,
        CosmosQueryRequestOptions options) {
        return UtilBridgeInternal.createCosmosPagedFlux(fluxOptions -> {
            // NOTE(review): of the three query helpers in this class, only this one can embed the query text
            // in the span name.
            final String tracingSpan = isParameterised
                ? "queryTriggers." + this.container.getId() + "." + querySpec.getQueryText()
                : "queryTriggers." + this.container.getId();

            fluxOptions.setTracerInformation(
                this.container.getDatabase().getClient().getTracerProvider(),
                tracingSpan,
                this.container.getDatabase().getClient().getServiceEndpoint(),
                this.container.getDatabase().getId());
            setContinuationTokenAndMaxItemCount(fluxOptions, options);

            // Convert each internal page into a feed response of public trigger properties.
            return this.database.getDocClientWrapper()
                .queryTriggers(this.container.getLink(), querySpec, options)
                .map(page -> BridgeInternal.createFeedResponse(
                    ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(page.getResults()),
                    page.getResponseHeaders()));
        });
    }

    /**
     * Builds the stored procedure creation publisher and hands it to the client's tracer provider.
     *
     * @param sProc the internal stored procedure resource to create.
     * @param options the stored procedure request options.
     * @param context the context passed on to the tracer provider.
     * @return the publisher returned by the tracer provider's {@code traceEnabledCosmosResponsePublisher}.
     */
    private Mono<CosmosStoredProcedureResponse> createStoredProcedureInternal(
        StoredProcedure sProc,
        CosmosStoredProcedureRequestOptions options,
        Context context) {
        final String tracingSpan = "createStoredProcedure." + this.container.getId();
        final Mono<CosmosStoredProcedureResponse> creation = createStoredProcedureInternal(sProc, options);

        return this.container.getDatabase()
            .getClient()
            .getTracerProvider()
            .traceEnabledCosmosResponsePublisher(
                creation,
                context,
                tracingSpan,
                this.database.getId(),
                this.database.getClient().getServiceEndpoint());
    }

    /**
     * Issues the stored procedure creation through the document client wrapper, without tracing.
     *
     * @param sProc the internal stored procedure resource to create.
     * @param options the stored procedure request options, converted with
     * {@code ModelBridgeInternal.toRequestOptions}.
     * @return a {@link Mono} with exactly one cosmos stored procedure response (enforced by {@code single()}).
     */
    private Mono<CosmosStoredProcedureResponse> createStoredProcedureInternal(
        StoredProcedure sProc,
        CosmosStoredProcedureRequestOptions options) {
        return this.database.getDocClientWrapper()
            .createStoredProcedure(
                this.container.getLink(),
                sProc,
                ModelBridgeInternal.toRequestOptions(options))
            .map(created -> ModelBridgeInternal.createCosmosStoredProcedureResponse(created))
            .single();
    }

    /**
     * Builds the user defined function creation publisher and hands it to the client's tracer provider.
     *
     * @param udf the internal user defined function resource to create.
     * @param context the context passed on to the tracer provider.
     * @return the publisher returned by the tracer provider's {@code traceEnabledCosmosResponsePublisher}.
     */
    private Mono<CosmosUserDefinedFunctionResponse> createUserDefinedFunctionInternal(
        UserDefinedFunction udf,
        Context context) {
        final String tracingSpan = "createUserDefinedFunction." + this.container.getId();
        final Mono<CosmosUserDefinedFunctionResponse> creation = createUserDefinedFunctionInternal(udf);

        return this.container.getDatabase()
            .getClient()
            .getTracerProvider()
            .traceEnabledCosmosResponsePublisher(
                creation,
                context,
                tracingSpan,
                this.database.getId(),
                this.database.getClient().getServiceEndpoint());
    }

    /**
     * Issues the user defined function creation through the document client wrapper, without tracing.
     * No request options are supplied ({@code null} is passed).
     *
     * @param udf the internal user defined function resource to create.
     * @return a {@link Mono} with exactly one cosmos user defined function response (enforced by
     * {@code single()}).
     */
    private Mono<CosmosUserDefinedFunctionResponse> createUserDefinedFunctionInternal(
        UserDefinedFunction udf) {
        return this.database.getDocClientWrapper()
            .createUserDefinedFunction(this.container.getLink(), udf, null)
            .map(created -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(created))
            .single();
    }

    /**
     * Builds the trigger creation publisher and hands it to the client's tracer provider.
     *
     * @param properties the cosmos trigger properties to create the trigger from.
     * @param context the context passed on to the tracer provider.
     * @return the publisher returned by the tracer provider's {@code traceEnabledCosmosResponsePublisher}.
     */
    private Mono<CosmosTriggerResponse> createTriggerInternal(CosmosTriggerProperties properties, Context context) {
        final String tracingSpan = "createTrigger." + this.container.getId();
        final Mono<CosmosTriggerResponse> creation = createTriggerInternal(properties);

        return this.container.getDatabase()
            .getClient()
            .getTracerProvider()
            .traceEnabledCosmosResponsePublisher(
                creation,
                context,
                tracingSpan,
                this.database.getId(),
                this.database.getClient().getServiceEndpoint());
    }

    /**
     * Issues the trigger creation through the document client wrapper, without tracing.
     * No request options are supplied ({@code null} is passed).
     *
     * @param properties the cosmos trigger properties; the internal {@link Trigger} is built from the JSON of
     * the resource backing these properties.
     * @return a {@link Mono} with exactly one cosmos trigger response (enforced by {@code single()}).
     */
    private Mono<CosmosTriggerResponse> createTriggerInternal(CosmosTriggerProperties properties) {
        final Trigger internalTrigger = new Trigger(
            ModelBridgeInternal.toJsonFromJsonSerializable(
                ModelBridgeInternal.getResource(properties)));

        return this.database.getDocClientWrapper()
            .createTrigger(this.container.getLink(), internalTrigger, null)
            .map(created -> ModelBridgeInternal.createCosmosTriggerResponse(created))
            .single();
    }

}