CosmosAsyncDatabase.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos;
import com.azure.core.util.Context;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.models.CosmosClientEncryptionKeyProperties;
import com.azure.cosmos.models.CosmosClientEncryptionKeyResponse;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.CosmosUserProperties;
import com.azure.cosmos.models.CosmosUserResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.util.Beta;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;
/**
* Perform read and delete databases, update database throughput, and perform operations on child resources
*/
public class CosmosAsyncDatabase {
private final CosmosAsyncClient client;
private final String id;
private final String link;
CosmosAsyncDatabase(String id, CosmosAsyncClient client) {
this.id = id;
this.client = client;
this.link = getParentLink() + "/" + getURIPathSegment() + "/" + getId();
}
/**
* Get the id of the CosmosAsyncDatabase.
*
* @return the id of the CosmosAsyncDatabase.
*/
public String getId() {
return id;
}
/**
* Reads a database.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a single cosmos database respone with the
* read database. In case of failure the {@link Mono} will error.
*
* @return an {@link Mono} containing the single cosmos database respone with
* the read database or an error.
*/
public Mono<CosmosDatabaseResponse> read() {
return read(new CosmosDatabaseRequestOptions());
}
/**
* Reads a database.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos cosmos database respone with the
* read database. In case of failure the {@link Mono} will error.
*
* @param options the request options.
* @return an {@link Mono} containing the single cosmos database response with
* the read database or an error.
*/
public Mono<CosmosDatabaseResponse> read(CosmosDatabaseRequestOptions options) {
final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options;
return withContext(context -> readInternal(requestOptions, context));
}
/**
* Deletes a database.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos database response with the
* deleted database. In case of failure the {@link Mono} will error.
*
* @return an {@link Mono} containing the single cosmos database response.
*/
public Mono<CosmosDatabaseResponse> delete() {
return delete(new CosmosDatabaseRequestOptions());
}
/**
* Deletes a database.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos database response with the
* deleted database. In case of failure the {@link Mono} will error.
*
* @param options the request options.
* @return an {@link Mono} containing the single cosmos database response.
*/
public Mono<CosmosDatabaseResponse> delete(CosmosDatabaseRequestOptions options) {
final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options;
return withContext(context -> deleteInternal(requestOptions, context));
}
/* CosmosAsyncContainer operations */
/**
* Creates a Cosmos container.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param containerProperties the container properties.
* @return a {@link Mono} containing the single cosmos container response with
* the created container or an error.
* @throws IllegalArgumentException containerProperties cannot be null.
*/
public Mono<CosmosContainerResponse> createContainer(CosmosContainerProperties containerProperties) {
return createContainer(containerProperties, new CosmosContainerRequestOptions());
}
/**
* Creates a Cosmos container with custom throughput properties.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param containerProperties the container properties.
* @param throughputProperties the throughput properties for the container.
* @return a {@link Mono} containing the single cosmos container response with
* the created container or an error.
* @throws IllegalArgumentException thown if containerProerties are null.
*/
public Mono<CosmosContainerResponse> createContainer(
CosmosContainerProperties containerProperties,
ThroughputProperties throughputProperties) {
if (containerProperties == null) {
throw new IllegalArgumentException("containerProperties");
}
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createContainer(containerProperties, options);
}
/**
* Creates a container.
*
* @param containerProperties the container properties.
* @param throughputProperties the throughput properties.
* @param options the request options.
* @return the mono.
*/
public Mono<CosmosContainerResponse> createContainer(
CosmosContainerProperties containerProperties,
ThroughputProperties throughputProperties,
CosmosContainerRequestOptions options){
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createContainer(containerProperties, options);
}
/**
* Creates a Cosmos container.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param containerProperties the containerProperties.
* @param options the cosmos container request options.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
* @throws IllegalArgumentException containerProperties can not be null.
*/
public Mono<CosmosContainerResponse> createContainer(
CosmosContainerProperties containerProperties,
CosmosContainerRequestOptions options) {
if (containerProperties == null) {
throw new IllegalArgumentException("containerProperties");
}
final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : options;
return withContext(context -> createContainerInternal(containerProperties, requestOptions, context));
}
/**
* Creates a Cosmos container.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param containerProperties the containerProperties.
* @param throughput the throughput for the container.
* @param options the cosmos container request options.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
* @throws IllegalArgumentException containerProperties cannot be null.
*/
Mono<CosmosContainerResponse> createContainer(
CosmosContainerProperties containerProperties,
int throughput,
CosmosContainerRequestOptions options) {
if (options == null) {
options = new CosmosContainerRequestOptions();
}
ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput));
return createContainer(containerProperties, options);
}
/**
* Creates a Cosmos container.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param id the cosmos container id.
* @param partitionKeyPath the partition key path.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
*/
public Mono<CosmosContainerResponse> createContainer(String id, String partitionKeyPath) {
return createContainer(new CosmosContainerProperties(id, partitionKeyPath));
}
/**
* Creates a Cosmos container.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param id the cosmos container id.
* @param partitionKeyPath the partition key path.
* @param throughputProperties the throughput properties for the container.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
*/
public Mono<CosmosContainerResponse> createContainer(String id, String partitionKeyPath, ThroughputProperties throughputProperties) {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createContainer(new CosmosContainerProperties(id, partitionKeyPath), options);
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created or existing container. In case of failure the {@link Mono} will
* error.
*
* @param containerProperties the container properties
* @return a {@link Mono} containing the cosmos container response with the
* created or existing container or an error.
*/
public Mono<CosmosContainerResponse> createContainerIfNotExists(
CosmosContainerProperties containerProperties) {
CosmosAsyncContainer container = getContainer(containerProperties.getId());
return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, null,
context));
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* The throughput setting will only be used if the specified container
* does not exist and therefore a new container will be created.
*
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created or existing container. In case of failure the {@link Mono} will
* error.
*
* @param containerProperties the container properties.
* @param throughput the throughput for the container.
* @return a {@link Mono} containing the cosmos container response with the
* created or existing container or an error.
*/
Mono<CosmosContainerResponse> createContainerIfNotExists(
CosmosContainerProperties containerProperties,
int throughput) {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput));
CosmosAsyncContainer container = getContainer(containerProperties.getId());
return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options,
context));
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* The throughput properties will only be used if the specified container
* does not exist and therefor a new container will be created.
*
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created or existing container. In case of failure the {@link Mono} will
* error.
*
* @param containerProperties the container properties.
* @param throughputProperties the throughput properties for the container.
* @return a {@link Mono} containing the cosmos container response with the
* created or existing container or an error.
*/
public Mono<CosmosContainerResponse> createContainerIfNotExists(
CosmosContainerProperties containerProperties,
ThroughputProperties throughputProperties) {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
CosmosAsyncContainer container = getContainer(containerProperties.getId());
return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options,
context));
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param id the cosmos container id.
* @param partitionKeyPath the partition key path.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
*/
public Mono<CosmosContainerResponse> createContainerIfNotExists(String id, String partitionKeyPath) {
CosmosAsyncContainer container = getContainer(id);
return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id,
partitionKeyPath), container, null,
context));
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* The throughput properties will only be used if the specified container
* does not exist and therefor a new container will be created.
*
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param id the cosmos container id.
* @param partitionKeyPath the partition key path.
* @param throughputProperties the throughput properties for the container.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
*/
public Mono<CosmosContainerResponse> createContainerIfNotExists(
String id, String partitionKeyPath,
ThroughputProperties throughputProperties) {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
CosmosAsyncContainer container = getContainer(id);
return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id,
partitionKeyPath), container, options, context));
}
/**
* Creates a Cosmos container if it does not exist on the service.
* <p>
* The throughput setting will only be used if the specified container
* does not exist and a new container will be created.
*
* After subscription the operation will be performed. The {@link Mono} upon
* successful completion will contain a cosmos container response with the
* created container. In case of failure the {@link Mono} will error.
*
* @param id the cosmos container id.
* @param partitionKeyPath the partition key path.
* @param throughput the throughput for the container.
* @return a {@link Mono} containing the cosmos container response with the
* created container or an error.
*/
Mono<CosmosContainerResponse> createContainerIfNotExists(
String id, String partitionKeyPath,
int throughput) {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput));
CosmosAsyncContainer container = getContainer(id);
return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id,
partitionKeyPath), container, options, context));
}
/**
* Reads all cosmos containers.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param options {@link CosmosQueryRequestOptions}
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read
* containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> readAllContainers(CosmosQueryRequestOptions options) {
CosmosQueryRequestOptions requestOptions = options == null ? new CosmosQueryRequestOptions() : options;
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "readAllContainers." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, requestOptions);
return getDocClientWrapper().readCollections(getLink(), requestOptions)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}
/**
* Reads all cosmos containers.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read
* containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> readAllContainers() {
return readAllContainers(new CosmosQueryRequestOptions());
}
/**
* Query for cosmos containers in a cosmos database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query the query.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> queryContainers(String query) {
return queryContainersInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions());
}
/**
* Query for cosmos containers in a cosmos database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query the query.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> queryContainers(String query, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryContainersInternal(new SqlQuerySpec(query), options);
}
/**
* Query for cosmos containers in a cosmos database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> queryContainers(SqlQuerySpec querySpec) {
return queryContainersInternal(querySpec, new CosmosQueryRequestOptions());
}
/**
* Query for cosmos containers in a cosmos database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained containers. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained containers or an error.
*/
public CosmosPagedFlux<CosmosContainerProperties> queryContainers(SqlQuerySpec querySpec
, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryContainersInternal(querySpec, options);
}
/**
* Gets a CosmosAsyncContainer object without making a service call
*
* @param id id of the container
* @return Cosmos Container
*/
public CosmosAsyncContainer getContainer(String id) {
return new CosmosAsyncContainer(id, this);
}
/**
* Creates a user After subscription the operation will be performed. The
* {@link Mono} upon successful completion will contain a single resource
* response with the created user. In case of failure the {@link Mono} will
* error.
*
* @param userProperties the cosmos user properties
* @return an {@link Mono} containing the single resource response with the
* created cosmos user or an error.
*/
public Mono<CosmosUserResponse> createUser(CosmosUserProperties userProperties) {
return withContext(context -> createUserInternal(userProperties, context));
}
/**
* Creates a client encryption key after subscription the operation will be performed. The
* {@link Mono} upon successful completion will contain a single resource
* response with the created client encryption key. In case of failure the {@link Mono} will
* error.
*
* @param keyProperties the cosmos client encryption key properties
* @return an {@link Mono} containing the single resource response with the
* created cosmos client encryption key or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Mono<CosmosClientEncryptionKeyResponse> createClientEncryptionKey(CosmosClientEncryptionKeyProperties keyProperties) {
return withContext(context -> createClientEncryptionKeyInternal(keyProperties, context));
}
/**
* Upsert a user. Upsert will create a new user if it doesn't exist, or replace
* the existing one if it does. After subscription the operation will be
* performed. The {@link Mono} upon successful completion will contain a single
* resource response with the created user. In case of failure the {@link Mono}
* will error.
*
* @param userProperties the cosmos user properties
* @return an {@link Mono} containing the single resource response with the
* upserted user or an error.
*/
public Mono<CosmosUserResponse> upsertUser(CosmosUserProperties userProperties) {
return withContext(context -> upsertUserInternal(userProperties, context));
}
/**
* Reads all cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read cosmos users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* read cosmos users or an error.
*/
public CosmosPagedFlux<CosmosUserProperties> readAllUsers() {
return readAllUsers(new CosmosQueryRequestOptions());
}
/**
* Reads all cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read cosmos users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* read cosmos users or an error.
*/
CosmosPagedFlux<CosmosUserProperties> readAllUsers(CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "readAllUsers." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().readUsers(getLink(), options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response
.getResponseHeaders()));
});
}
/**
* Gets a CosmosAsyncClientEncryptionKey object without making a service call
*
* @param id id of the clientEncryptionKey
* @return Cosmos ClientEncryptionKey
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosAsyncClientEncryptionKey getClientEncryptionKey(String id) {
return new CosmosAsyncClientEncryptionKey(id, this);
}
/**
* Reads all cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read cosmos client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* read cosmos client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> readAllClientEncryptionKeys() {
return readAllClientEncryptionKeys(new CosmosQueryRequestOptions());
}
/**
* Reads all cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the read cosmos client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* read cosmos client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> readAllClientEncryptionKeys(CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "readAllClientEncryptionKeys." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().readClientEncryptionKeys(getLink(), options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getClientEncryptionKeyPropertiesList(response.getResults()), response
.getResponseHeaders()));
});
}
/**
* Query for cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query query as string.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> queryClientEncryptionKeys(String query) {
return queryClientEncryptionKeys(query, new CosmosQueryRequestOptions());
}
/**
* Query for cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query query as string.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> queryClientEncryptionKeys(String query, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryClientEncryptionKeysInternal(new SqlQuerySpec(query), options);
}
/**
* Query for cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> queryClientEncryptionKeys(SqlQuerySpec querySpec) {
return queryClientEncryptionKeysInternal(querySpec, new CosmosQueryRequestOptions());
}
/**
* Query for cosmos client encryption keys in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained client encryption keys. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained client encryption keys or an error.
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<CosmosClientEncryptionKeyProperties> queryClientEncryptionKeys(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryClientEncryptionKeysInternal(querySpec, options);
}
private CosmosPagedFlux<CosmosClientEncryptionKeyProperties> queryClientEncryptionKeysInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "queryClientEncryptionKeys." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryClientEncryptionKeys(getLink(), querySpec, options)
.map(response -> BridgeInternal.createFeedResponseWithQueryMetrics(
ModelBridgeInternal.getClientEncryptionKeyPropertiesList(response.getResults()),
response.getResponseHeaders(),
ModelBridgeInternal.queryMetrics(response),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(response),
false,
false,
response.getCosmosDiagnostics()));
});
}
/**
* Query for cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query query as string.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained users or an error.
*/
public CosmosPagedFlux<CosmosUserProperties> queryUsers(String query) {
return queryUsers(query, new CosmosQueryRequestOptions());
}
/**
* Query for cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param query query as string.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained users or an error.
*/
public CosmosPagedFlux<CosmosUserProperties> queryUsers(String query, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryUsersInternal(new SqlQuerySpec(query), options);
}
/**
* Query for cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained users or an error.
*/
public CosmosPagedFlux<CosmosUserProperties> queryUsers(SqlQuerySpec querySpec) {
return queryUsersInternal(querySpec, new CosmosQueryRequestOptions());
}
/**
* Query for cosmos users in a database.
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed response of the obtained users. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param querySpec the SQL query specification.
* @param options the query request options.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the
* obtained users or an error.
*/
public CosmosPagedFlux<CosmosUserProperties> queryUsers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
if (options == null) {
options = new CosmosQueryRequestOptions();
}
return queryUsersInternal(querySpec, options);
}
/**
* Gets user.
*
* @param id the id
* @return the user
*/
public CosmosAsyncUser getUser(String id) {
return new CosmosAsyncUser(id, this);
}
/**
* Sets throughput provisioned for a container in measurement of
* Requests-per-Unit in the Azure Cosmos service.
*
* @param throughputProperties the throughput properties.
* @return the mono.
*/
public Mono<ThroughputResponse> replaceThroughput(ThroughputProperties throughputProperties) {
return withContext(context -> replaceThroughputInternal(throughputProperties, context));
}
/**
* Gets the throughput of the database.
*
* @return the mono containing throughput response.
*/
public Mono<ThroughputResponse> readThroughput() {
return withContext(context -> readThroughputInternal(context));
}
SqlQuerySpec getOfferQuerySpecFromResourceId(String resourceId) {
String queryText = "select * from c where c.offerResourceId = @resourceId";
SqlQuerySpec querySpec = new SqlQuerySpec(queryText);
List<SqlParameter> parameters = Collections
.singletonList(new SqlParameter("@resourceId", resourceId));
querySpec.setParameters(parameters);
return querySpec;
}
CosmosAsyncClient getClient() {
return client;
}
AsyncDocumentClient getDocClientWrapper() {
return client.getDocClientWrapper();
}
String getURIPathSegment() {
return Paths.DATABASES_PATH_SEGMENT;
}
String getParentLink() {
return StringUtils.EMPTY;
}
String getLink() {
return this.link;
}
private CosmosPagedFlux<CosmosContainerProperties> queryContainersInternal(SqlQuerySpec querySpec
, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "queryContainers." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryCollections(getLink(), querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}
private CosmosPagedFlux<CosmosUserProperties> queryUsersInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "queryUsers." + this.getId();
pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName,
this.getClient().getServiceEndpoint(), getId());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryUsers(getLink(), querySpec, options)
.map(response -> BridgeInternal.createFeedResponseWithQueryMetrics(
ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()),
response.getResponseHeaders(),
ModelBridgeInternal.queryMetrics(response),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(response),
false,
false,
response.getCosmosDiagnostics()));
});
}
private Mono<CosmosContainerResponse> createContainerIfNotExistsInternal(
CosmosContainerProperties containerProperties,
CosmosAsyncContainer container,
CosmosContainerRequestOptions options,
Context context) {
String spanName = "createContainerIfNotExists." + containerProperties.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() :
options;
Mono<CosmosContainerResponse> responseMono =
container.read(requestOptions, nestedContext).onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createContainerInternal(containerProperties, requestOptions, nestedContext);
}
}
return Mono.error(unwrappedException);
});
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<CosmosContainerResponse> createContainerInternal(
CosmosContainerProperties containerProperties,
CosmosContainerRequestOptions options,
Context context) {
String spanName = "createContainer." + containerProperties.getId();
Mono<CosmosContainerResponse> responseMono = getDocClientWrapper()
.createCollection(this.getLink(), ModelBridgeInternal.getV2Collection(containerProperties),
ModelBridgeInternal.toRequestOptions(options))
.map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
Mono<CosmosDatabaseResponse> readInternal(CosmosDatabaseRequestOptions options, Context context) {
String spanName = "readDatabase." + this.getId();
Mono<CosmosDatabaseResponse> responseMono = getDocClientWrapper().readDatabase(getLink(),
ModelBridgeInternal.toRequestOptions(options))
.map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<CosmosDatabaseResponse> deleteInternal(CosmosDatabaseRequestOptions options, Context context) {
String spanName = "deleteDatabase." + this.getId();
Mono<CosmosDatabaseResponse> responseMono = getDocClientWrapper().deleteDatabase(getLink(),
ModelBridgeInternal.toRequestOptions(options))
.map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<CosmosUserResponse> createUserInternal(CosmosUserProperties userProperties, Context context) {
String spanName = "createUser." + this.getId();
Mono<CosmosUserResponse> responseMono = getDocClientWrapper().createUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null)
.map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<CosmosUserResponse> upsertUserInternal(CosmosUserProperties userProperties, Context context) {
String spanName = "upsertUser." + this.getId();
Mono<CosmosUserResponse> responseMono = getDocClientWrapper().upsertUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null)
.map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName, getId(), getClient().getServiceEndpoint());
}
private Mono<CosmosClientEncryptionKeyResponse> createClientEncryptionKeyInternal(CosmosClientEncryptionKeyProperties keyProperties, Context context) {
String spanName = "createClientEncryptionKey." + this.getId();
Mono<CosmosClientEncryptionKeyResponse> responseMono =
getDocClientWrapper().createClientEncryptionKey(this.getLink(),
ModelBridgeInternal.getClientEncryptionKey(keyProperties), null)
.map(response -> ModelBridgeInternal.createCosmosClientEncryptionKeyResponse(response)).single();
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<ThroughputResponse> replaceThroughputInternal(ThroughputProperties throughputProperties, Context context){
String spanName = "replaceThroughput." + this.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
Mono<ThroughputResponse> responseMono = replaceThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext), throughputProperties);
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<ThroughputResponse> replaceThroughputInternal(Mono<CosmosDatabaseResponse> responseMono, ThroughputProperties throughputProperties) {
return responseMono
.flatMap(response -> this.getDocClientWrapper()
.queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()),
new CosmosQueryRequestOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the " +
"resource " + this.getId()));
}
Offer existingOffer = offerFeedResponse.getResults().get(0);
Offer updatedOffer =
ModelBridgeInternal.updateOfferFromProperties(existingOffer,
throughputProperties);
return this.getDocClientWrapper()
.replaceOffer(updatedOffer)
.single();
})
.map(ModelBridgeInternal::createThroughputRespose));
}
private Mono<ThroughputResponse> readThroughputInternal(Context context){
String spanName = "readThroughput." + this.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
Mono<ThroughputResponse> responseMono = readThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext));
return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
getId(),
getClient().getServiceEndpoint());
}
private Mono<ThroughputResponse> readThroughputInternal(Mono<CosmosDatabaseResponse> responseMono) {
return responseMono
.flatMap(response -> getDocClientWrapper()
.queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()),
new CosmosQueryRequestOptions())
.single()
.flatMap(offerFeedResponse -> {
if (offerFeedResponse.getResults().isEmpty()) {
return Mono.error(BridgeInternal
.createCosmosException(
HttpConstants.StatusCodes.BADREQUEST,
"No offers found for the " +
"resource " + this.getId()));
}
return getDocClientWrapper()
.readOffer(offerFeedResponse.getResults()
.get(0)
.getSelfLink())
.single();
})
.map(ModelBridgeInternal::createThroughputRespose));
}
///////////////////////////////////////////////////////////////////////////////////////////
// the following helper/accessor only helps to access this class outside of this package.//
///////////////////////////////////////////////////////////////////////////////////////////
static {
ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.setCosmosAsyncDatabaseAccessor(
new ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.CosmosAsyncDatabaseAccessor() {
@Override
public CosmosAsyncClient getCosmosAsyncClient(CosmosAsyncDatabase cosmosAsyncDatabase) {
return cosmosAsyncDatabase.getClient();
}
});
}
}