ReactiveCosmosTemplate.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.data.cosmos.core;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.spring.data.cosmos.Constants;
import com.azure.spring.data.cosmos.CosmosFactory;
import com.azure.spring.data.cosmos.common.CosmosUtils;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.core.convert.MappingCosmosConverter;
import com.azure.spring.data.cosmos.core.generator.CountQueryGenerator;
import com.azure.spring.data.cosmos.core.generator.FindQuerySpecGenerator;
import com.azure.spring.data.cosmos.core.query.CosmosQuery;
import com.azure.spring.data.cosmos.core.query.Criteria;
import com.azure.spring.data.cosmos.core.query.CriteriaType;
import com.azure.spring.data.cosmos.exception.CosmosExceptionUtils;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.UUID;
/**
* Template class of reactive cosmos
*/
@SuppressWarnings("unchecked")
public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, ApplicationContextAware {
private final MappingCosmosConverter mappingCosmosConverter;
private final String databaseName;
private final ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
private final boolean queryMetricsEnabled;
private final CosmosAsyncClient cosmosAsyncClient;
private final IsNewAwareAuditingHandler cosmosAuditingHandler;
/**
* Initialization
*
* @param client must not be {@literal null}
* @param databaseName must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
* @param cosmosAuditingHandler can be {@literal null}
*/
public ReactiveCosmosTemplate(CosmosAsyncClient client, String databaseName,
CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter,
IsNewAwareAuditingHandler cosmosAuditingHandler) {
this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, cosmosAuditingHandler);
}
/**
* Initialization
*
* @param client must not be {@literal null}
* @param databaseName must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
*/
public ReactiveCosmosTemplate(CosmosAsyncClient client, String databaseName,
CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, null);
}
/**
* Constructor
*
* @param cosmosFactory the cosmos db factory
* @param cosmosConfig the cosmos config
* @param mappingCosmosConverter the mappingCosmosConverter
* @param cosmosAuditingHandler the auditing handler
*/
public ReactiveCosmosTemplate(CosmosFactory cosmosFactory,
CosmosConfig cosmosConfig,
MappingCosmosConverter mappingCosmosConverter,
IsNewAwareAuditingHandler cosmosAuditingHandler) {
Assert.notNull(cosmosFactory, "CosmosFactory must not be null!");
Assert.notNull(cosmosConfig, "CosmosConfig must not be null!");
Assert.notNull(mappingCosmosConverter, "MappingCosmosConverter must not be null!");
this.mappingCosmosConverter = mappingCosmosConverter;
this.cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
this.databaseName = cosmosFactory.getDatabaseName();
this.responseDiagnosticsProcessor = cosmosConfig.getResponseDiagnosticsProcessor();
this.queryMetricsEnabled = cosmosConfig.isQueryMetricsEnabled();
this.cosmosAuditingHandler = cosmosAuditingHandler;
}
/**
* Initialization
*
* @param cosmosFactory must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
*/
public ReactiveCosmosTemplate(CosmosFactory cosmosFactory,
CosmosConfig cosmosConfig,
MappingCosmosConverter mappingCosmosConverter) {
this(cosmosFactory, cosmosConfig, mappingCosmosConverter, null);
}
/**
* @param applicationContext the application context
* @throws BeansException the bean exception
*/
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
// NOTE: When application context instance variable gets introduced, assign it here.
}
/**
* Creates a container if it doesn't already exist
*
* @param information the CosmosEntityInformation
* @return Mono containing CosmosContainerResponse
*/
@Override
public Mono<CosmosContainerResponse> createContainerIfNotExists(CosmosEntityInformation<?, ?> information) {
return cosmosAsyncClient
.createDatabaseIfNotExists(this.databaseName)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to create database", throwable))
.flatMap(cosmosDatabaseResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosDatabaseResponse.getDiagnostics(), null);
final CosmosContainerProperties cosmosContainerProperties =
new CosmosContainerProperties(information.getContainerName(), information.getPartitionKeyPath());
cosmosContainerProperties.setDefaultTimeToLiveInSeconds(information.getTimeToLive());
cosmosContainerProperties.setIndexingPolicy(information.getIndexingPolicy());
CosmosAsyncDatabase database =
cosmosAsyncClient.getDatabase(cosmosDatabaseResponse.getProperties().getId());
Mono<CosmosContainerResponse> cosmosContainerResponseMono;
if (information.getRequestUnit() == null) {
cosmosContainerResponseMono =
database.createContainerIfNotExists(cosmosContainerProperties);
} else {
ThroughputProperties throughputProperties =
ThroughputProperties.createManualThroughput(information.getRequestUnit());
cosmosContainerResponseMono =
database.createContainerIfNotExists(cosmosContainerProperties,
throughputProperties);
}
return cosmosContainerResponseMono
.map(cosmosContainerResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosContainerResponse.getDiagnostics(), null);
return cosmosContainerResponse;
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to create container",
throwable));
});
}
/**
* Find all items in a given container
*
* @param containerName the containerName
* @param domainType the domainType
* @return Flux with all the found items or error
*/
@Override
public <T> Flux<T> findAll(String containerName, Class<T> domainType) {
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
return find(query, domainType, containerName);
}
/**
* Find all items in a given container
*
* @param domainType the domainType
* @return Flux with all the found items or error
*/
@Override
public <T> Flux<T> findAll(Class<T> domainType) {
return findAll(domainType.getSimpleName(), domainType);
}
@Override
public <T> Flux<T> findAll(PartitionKey partitionKey, Class<T> domainType) {
Assert.notNull(partitionKey, "partitionKey should not be null");
Assert.notNull(domainType, "domainType should not be null");
final String containerName = getContainerName(domainType);
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setPartitionKey(partitionKey);
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems("SELECT * FROM r", cosmosQueryRequestOptions, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.map(cosmosItemProperties -> toDomainObject(domainType, cosmosItemProperties))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable));
}
/**
* Find by id
*
* @param id the id
* @param domainType the domainType
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> findById(Object id, Class<T> domainType) {
Assert.notNull(domainType, "domainType should not be null");
return findById(getContainerName(domainType), id, domainType);
}
/**
* Find by id
*
* @param containerName the container name
* @param id the id
* @param domainType the entity class
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(domainType, "domainType should not be null");
final String query = String.format("select * from root where root.id = '%s'",
CosmosUtils.getStringIDValue(id));
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(query, options, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(),
cosmosItemFeedResponse);
return Mono.justOrEmpty(cosmosItemFeedResponse
.getResults()
.stream()
.map(cosmosItem -> toDomainObject(domainType, cosmosItem))
.findFirst());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable))
.next();
}
/**
* Find by id
*
* @param id the id
* @param domainType the entity class
* @param partitionKey partition Key
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> findById(Object id, Class<T> domainType, PartitionKey partitionKey) {
Assert.notNull(domainType, "domainType should not be null");
String idToFind = CosmosUtils.getStringIDValue(id);
final String containerName = getContainerName(domainType);
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.readItem(idToFind, partitionKey, JsonNode.class)
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return Mono.justOrEmpty(toDomainObject(domainType,
cosmosItemResponse.getItem()));
})
.onErrorResume(throwable ->
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable));
}
/**
* Insert
*
* @param <T> type of inserted objectToSave
* @param objectToSave the object to save
* @param partitionKey the partition key
* @return Mono with the item or error
*/
public <T> Mono<T> insert(T objectToSave, PartitionKey partitionKey) {
return insert(getContainerName(objectToSave.getClass()), objectToSave, partitionKey);
}
/**
* Insert
*
* @param objectToSave the object to save
* @param <T> type of inserted objectToSave
* @return Mono with the item or error
*/
public <T> Mono<T> insert(T objectToSave) {
return insert(getContainerName(objectToSave.getClass()), objectToSave, null);
}
/**
* Insert
*
* @param <T> type of inserted objectToSave
* @param containerName the container name
* @param objectToSave the object to save
* @param partitionKey the partition key
* @return Mono with the item or error
*/
public <T> Mono<T> insert(String containerName, Object objectToSave,
PartitionKey partitionKey) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(objectToSave, "objectToSave should not be null");
final Class<T> domainType = (Class<T>) objectToSave.getClass();
generateIdIfNullAndAutoGenerationEnabled(objectToSave, domainType);
final JsonNode originalItem = prepareToPersistAndConvertToItemProperties(objectToSave);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
// if the partition key is null, SDK will get the partitionKey from the object
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.createItem(originalItem, partitionKey, options)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to insert item", throwable))
.flatMap(cosmosItemResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return Mono.just(toDomainObject(domainType, cosmosItemResponse.getItem()));
});
}
/**
* Insert
*
* @param <T> type of inserted objectToSave
* @param containerName the container name
* @param objectToSave the object to save
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> insert(String containerName, T objectToSave) {
return insert(containerName, objectToSave, null);
}
@SuppressWarnings("unchecked")
private <T> void generateIdIfNullAndAutoGenerationEnabled(T originalItem, Class<?> type) {
CosmosEntityInformation<?, ?> entityInfo = CosmosEntityInformation.getInstance(type);
if (entityInfo.shouldGenerateId() && ReflectionUtils.getField(entityInfo.getIdField(), originalItem) == null) {
ReflectionUtils.setField(entityInfo.getIdField(), originalItem, UUID.randomUUID().toString());
}
}
/**
* Upsert
*
* @param object the object to upsert
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> upsert(T object) {
return upsert(getContainerName(object.getClass()), object);
}
/**
* Upsert
*
* @param containerName the container name
* @param object the object to save
* @return Mono with the item or error
*/
@Override
public <T> Mono<T> upsert(String containerName, T object) {
final Class<T> domainType = (Class<T>) object.getClass();
final JsonNode originalItem = prepareToPersistAndConvertToItemProperties(object);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(object.getClass(), originalItem, options);
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.upsertItem(originalItem, options)
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return Mono.just(toDomainObject(domainType,
cosmosItemResponse.getItem()));
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to upsert item", throwable));
}
/**
* Deletes the item with id and partition key.
*
* @param containerName Container name of database
* @param id item id
* @param partitionKey the partition key
*/
@Override
public Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey) {
return deleteById(containerName, id, partitionKey, new CosmosItemRequestOptions());
}
private Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey,
CosmosItemRequestOptions cosmosItemRequestOptions) {
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
String idToDelete = CosmosUtils.getStringIDValue(id);
if (partitionKey == null) {
partitionKey = PartitionKey.NONE;
}
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.deleteItem(idToDelete, partitionKey, cosmosItemRequestOptions)
.publishOn(Schedulers.parallel())
.doOnNext(cosmosItemResponse ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item", throwable))
.then();
}
/**
* Deletes the entity
*
* @param <T> type class of domain type
* @param containerName Container name of database
* @param entity the entity to delete
* @return void Mono
*/
public <T> Mono<Void> deleteEntity(String containerName, T entity) {
Assert.notNull(entity, "entity to be deleted should not be null");
@SuppressWarnings("unchecked")
final Class<T> domainType = (Class<T>) entity.getClass();
final JsonNode originalItem = mappingCosmosConverter.writeJsonNode(entity);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(entity.getClass(), originalItem, options);
return deleteItem(originalItem, containerName, domainType).then();
}
/**
* Delete all items in a container
*
* @param containerName the container name
* @param domainType the domainType
* @return void Mono
*/
@Override
public Mono<Void> deleteAll(@NonNull String containerName, @NonNull Class<?> domainType) {
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
return this.delete(query, domainType, containerName).then();
}
/**
* Delete items matching query
*
* @param query the document query
* @param domainType the entity class
* @param containerName the container name
* @return Mono
*/
@Override
public <T> Flux<T> delete(CosmosQuery query, Class<T> domainType, String containerName) {
Assert.notNull(query, "DocumentQuery should not be null.");
Assert.notNull(domainType, "domainType should not be null.");
Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
final Flux<JsonNode> results = findItems(query, containerName);
return results.flatMap(d -> deleteItem(d, containerName, domainType));
}
/**
* Find items
*
* @param query the document query
* @param domainType the entity class
* @param containerName the container name
* @return Flux with found items or error
*/
@Override
public <T> Flux<T> find(CosmosQuery query, Class<T> domainType, String containerName) {
return findItems(query, containerName)
.map(cosmosItemProperties -> toDomainObject(domainType, cosmosItemProperties));
}
/**
* Exists
*
* @param query the document query
* @param domainType the entity class
* @param containerName the container name
* @return Mono with a boolean or error
*/
@Override
public Mono<Boolean> exists(CosmosQuery query, Class<?> domainType, String containerName) {
return count(query, containerName).flatMap(count -> Mono.just(count > 0));
}
/**
* Exists
*
* @param id the id
* @param domainType the entity class
* @param containerName the container name
* @return Mono with a boolean or error
*/
public Mono<Boolean> existsById(Object id, Class<?> domainType, String containerName) {
return findById(containerName, id, domainType)
.flatMap(o -> Mono.just(o != null));
}
/**
* Count
*
* @param containerName the container name
* @return Mono with the count or error
*/
@Override
public Mono<Long> count(String containerName) {
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
return count(query, containerName);
}
/**
* Count
*
* @param query the document query
* @param containerName the container name
* @return Mono with count or error
*/
@Override
public Mono<Long> count(CosmosQuery query, String containerName) {
return getCountValue(query, containerName);
}
@Override
public MappingCosmosConverter getConverter() {
return mappingCosmosConverter;
}
@Override
public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Class<?> domainType, Class<T> returnType) {
String containerName = getContainerName(domainType);
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(querySpec, options, returnType)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils
.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(),
cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable));
}
private Mono<Long> getCountValue(CosmosQuery query, String containerName) {
final SqlQuerySpec querySpec = new CountQueryGenerator().generateCosmos(query);
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
return executeQuery(querySpec, containerName, options)
.doOnNext(feedResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
feedResponse.getCosmosDiagnostics(), feedResponse))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to get count value", throwable))
.next()
.map(r -> r.getResults().get(0).asLong());
}
private Flux<FeedResponse<JsonNode>> executeQuery(SqlQuerySpec sqlQuerySpec,
String containerName,
CosmosQueryRequestOptions options) {
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, options, JsonNode.class)
.byPage()
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to execute query", throwable));
}
/**
* Delete container with container name
*
* @param containerName the container name
*/
@Override
public void deleteContainer(@NonNull String containerName) {
Assert.hasText(containerName, "containerName should have text.");
cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.delete()
.doOnNext(cosmosContainerResponse ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosContainerResponse.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete container",
throwable))
.block();
}
/**
* @param domainType the domain class
* @return the container name
*/
public String getContainerName(Class<?> domainType) {
Assert.notNull(domainType, "domainType should not be null");
return CosmosEntityInformation.getInstance(domainType).getContainerName();
}
private JsonNode prepareToPersistAndConvertToItemProperties(Object object) {
if (cosmosAuditingHandler != null) {
cosmosAuditingHandler.markAudited(object);
}
return mappingCosmosConverter.writeJsonNode(object);
}
private Flux<JsonNode> findItems(@NonNull CosmosQuery query,
@NonNull String containerName) {
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to query items", throwable));
}
private <T> Mono<T> deleteItem(@NonNull JsonNode jsonNode,
String containerName,
@NonNull Class<T> domainType) {
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(domainType, jsonNode, options);
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.deleteItem(jsonNode, options)
.publishOn(Schedulers.parallel())
.map(cosmosItemResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return cosmosItemResponse;
})
.flatMap(objectCosmosItemResponse -> Mono.just(toDomainObject(domainType, jsonNode)))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item", throwable));
}
private <T> T toDomainObject(@NonNull Class<T> domainType, JsonNode jsonNode) {
return mappingCosmosConverter.read(domainType, jsonNode);
}
private void applyVersioning(Class<?> domainType,
JsonNode jsonNode,
CosmosItemRequestOptions options) {
CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
if (entityInformation.isVersioned()) {
options.setIfMatchETag(jsonNode.get(Constants.ETAG_PROPERTY_DEFAULT_NAME).asText());
}
}
}