CosmosTemplate.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.data.cosmos.core;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.spring.data.cosmos.Constants;
import com.azure.spring.data.cosmos.CosmosFactory;
import com.azure.spring.data.cosmos.common.CosmosUtils;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.core.convert.MappingCosmosConverter;
import com.azure.spring.data.cosmos.core.generator.CountQueryGenerator;
import com.azure.spring.data.cosmos.core.generator.FindQuerySpecGenerator;
import com.azure.spring.data.cosmos.core.query.CosmosPageImpl;
import com.azure.spring.data.cosmos.core.query.CosmosPageRequest;
import com.azure.spring.data.cosmos.core.query.CosmosQuery;
import com.azure.spring.data.cosmos.core.query.Criteria;
import com.azure.spring.data.cosmos.core.query.CriteriaType;
import com.azure.spring.data.cosmos.exception.CosmosExceptionUtils;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.query.parser.Part;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Template class for cosmos db
*/
public class CosmosTemplate implements CosmosOperations, ApplicationContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosTemplate.class);
private final MappingCosmosConverter mappingCosmosConverter;
private final IsNewAwareAuditingHandler cosmosAuditingHandler;
private final String databaseName;
private final ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
private final boolean queryMetricsEnabled;
private final CosmosAsyncClient cosmosAsyncClient;
/**
* Initialization
*
* @param client must not be {@literal null}
* @param databaseName must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
* @param cosmosAuditingHandler can be {@literal null}
*/
public CosmosTemplate(CosmosAsyncClient client, String databaseName,
CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter,
IsNewAwareAuditingHandler cosmosAuditingHandler) {
this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, cosmosAuditingHandler);
}
/**
* Initialization
*
* @param client must not be {@literal null}
* @param databaseName must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
*/
public CosmosTemplate(CosmosAsyncClient client, String databaseName,
CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, null);
}
/**
* Initialization
*
* @param cosmosFactory must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
* @param cosmosAuditingHandler can be {@literal null}
*/
public CosmosTemplate(CosmosFactory cosmosFactory,
CosmosConfig cosmosConfig,
MappingCosmosConverter mappingCosmosConverter,
IsNewAwareAuditingHandler cosmosAuditingHandler) {
Assert.notNull(cosmosFactory, "CosmosFactory must not be null!");
Assert.notNull(mappingCosmosConverter, "MappingCosmosConverter must not be null!");
this.mappingCosmosConverter = mappingCosmosConverter;
this.cosmosAuditingHandler = cosmosAuditingHandler;
this.cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
this.databaseName = cosmosFactory.getDatabaseName();
this.responseDiagnosticsProcessor = cosmosConfig.getResponseDiagnosticsProcessor();
this.queryMetricsEnabled = cosmosConfig.isQueryMetricsEnabled();
}
/**
* Initialization
*
* @param cosmosFactory must not be {@literal null}
* @param cosmosConfig must not be {@literal null}
* @param mappingCosmosConverter must not be {@literal null}
*/
public CosmosTemplate(CosmosFactory cosmosFactory,
CosmosConfig cosmosConfig,
MappingCosmosConverter mappingCosmosConverter) {
this(cosmosFactory, cosmosConfig, mappingCosmosConverter, null);
}
/**
* Sets the application context
*
* @param applicationContext must not be {@literal null}
* @throws BeansException the bean exception
*/
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
}
/**
* Inserts item
*
* @param objectToSave must not be {@literal null}
* @param partitionKey must not be {@literal null}
* @param <T> type class of domain type
* @return the inserted item
*/
public <T> T insert(T objectToSave, PartitionKey partitionKey) {
return insert(getContainerName(objectToSave.getClass()), objectToSave, partitionKey);
}
/**
* Inserts item into the given container
*
* @param containerName must not be {@literal null}
* @param objectToSave must not be {@literal null}
* @param <T> type class of domain type
* @return the inserted item
*/
@Override
public <T> T insert(String containerName, T objectToSave) {
return insert(containerName, objectToSave, null);
}
/**
* Inserts item into the given container
*
* @param containerName must not be {@literal null}
* @param objectToSave must not be {@literal null}
* @param partitionKey must not be {@literal null}
* @param <T> type class of domain type
* @return the inserted item
*/
public <T> T insert(String containerName, T objectToSave, PartitionKey partitionKey) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(objectToSave, "objectToSave should not be null");
@SuppressWarnings("unchecked") final Class<T> domainType = (Class<T>) objectToSave.getClass();
generateIdIfNullAndAutoGenerationEnabled(objectToSave, domainType);
final JsonNode originalItem = prepareToPersistAndConvertToItemProperties(objectToSave);
LOGGER.debug("execute createItem in database {} container {}", this.databaseName,
containerName);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
// if the partition key is null, SDK will get the partitionKey from the object
final CosmosItemResponse<JsonNode> response = cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.createItem(originalItem, partitionKey, options)
.publishOn(Schedulers.parallel())
.doOnNext(cosmosItemResponse ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to insert item", throwable))
.block();
assert response != null;
return toDomainObject(domainType, response.getItem());
}
@SuppressWarnings("unchecked")
private <T> void generateIdIfNullAndAutoGenerationEnabled(T originalItem, Class<?> type) {
CosmosEntityInformation<?, ?> entityInfo = CosmosEntityInformation.getInstance(type);
if (entityInfo.shouldGenerateId() && ReflectionUtils.getField(entityInfo.getIdField(), originalItem) == null) {
ReflectionUtils.setField(entityInfo.getIdField(), originalItem, UUID.randomUUID().toString());
}
}
/**
* Finds item by id
*
* @param id must not be {@literal null}
* @param domainType must not be {@literal null}
* @param <T> type class of domain type
* @return found item
*/
public <T> T findById(Object id, Class<T> domainType) {
Assert.notNull(domainType, "domainType should not be null");
return findById(getContainerName(domainType), id, domainType);
}
@Override
public <T> T findById(Object id, Class<T> domainType, PartitionKey partitionKey) {
Assert.notNull(domainType, "domainType should not be null");
Assert.notNull(partitionKey, "partitionKey should not be null");
String idToQuery = CosmosUtils.getStringIDValue(id);
final String containerName = getContainerName(domainType);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.readItem(idToQuery, partitionKey, JsonNode.class)
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return Mono.justOrEmpty(toDomainObject(domainType, cosmosItemResponse.getItem()));
})
.onErrorResume(throwable ->
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable))
.block();
}
/**
* Finds item by id
*
* @param containerName must not be {@literal null}
* @param id must not be {@literal null}
* @param domainType must not be {@literal null}
* @param <T> type class of domain type
* @return found item
*/
public <T> T findById(String containerName, Object id, Class<T> domainType) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(domainType, "domainType should not be null");
final String query = String.format("select * from root where root.id = '%s'",
CosmosUtils.getStringIDValue(id));
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(query, options, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
return Mono.justOrEmpty(cosmosItemFeedResponse
.getResults()
.stream()
.map(cosmosItem -> toDomainObject(domainType, cosmosItem))
.findFirst());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable))
.blockFirst();
}
/**
* Upserts an item with partition key
*
* @param object upsert object
* @param <T> type of upsert object
*/
public <T> void upsert(T object) {
Assert.notNull(object, "Upsert object should not be null");
upsert(getContainerName(object.getClass()), object);
}
/**
* Upserts an item into container with partition key
*
* @param containerName the container name
* @param object upsert object
* @param <T> type of upsert object
*/
public <T> void upsert(String containerName, T object) {
upsertAndReturnEntity(containerName, object);
}
/**
* Upserts an item and return item properties
*
* @param containerName the container name
* @param object upsert object
* @param <T> type of upsert object
* @return upsert object entity
*/
public <T> T upsertAndReturnEntity(String containerName, T object) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(object, "Upsert object should not be null");
final JsonNode originalItem = prepareToPersistAndConvertToItemProperties(object);
LOGGER.debug("execute upsert item in database {} container {}", this.databaseName,
containerName);
@SuppressWarnings("unchecked") final Class<T> domainType = (Class<T>) object.getClass();
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(domainType, originalItem, options);
final CosmosItemResponse<JsonNode> cosmosItemResponse = cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.upsertItem(originalItem, options)
.publishOn(Schedulers.parallel())
.doOnNext(response -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to upsert item", throwable))
.block();
assert cosmosItemResponse != null;
return toDomainObject(domainType, cosmosItemResponse.getItem());
}
/**
* Find the DocumentQuery, find all the items specified by domain type.
*
* @param domainType the domain type
* @param <T> class type of domain
* @return found results in a List
*/
public <T> Iterable<T> findAll(Class<T> domainType) {
Assert.notNull(domainType, "domainType should not be null");
return findAll(getContainerName(domainType), domainType);
}
/**
* Find the DocumentQuery, find all the items specified by domain type in the given container.
*
* @param containerName the container name
* @param domainType the domain type
* @param <T> class type of domain
* @return found results in a List
*/
public <T> Iterable<T> findAll(String containerName, final Class<T> domainType) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(domainType, "domainType should not be null");
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
return findItems(query, containerName, domainType);
}
@Override
public <T> Iterable<T> findAll(PartitionKey partitionKey, final Class<T> domainType) {
Assert.notNull(partitionKey, "partitionKey should not be null");
Assert.notNull(domainType, "domainType should not be null");
final String containerName = getContainerName(domainType);
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setPartitionKey(partitionKey);
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems("SELECT * FROM r", cosmosQueryRequestOptions, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.map(jsonNode -> toDomainObject(domainType, jsonNode))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable))
.toIterable();
}
/**
* Delete the DocumentQuery, delete all the items in the given container.
*
* @param containerName Container name of database
* @param domainType the domain type
*/
public void deleteAll(@NonNull String containerName, @NonNull Class<?> domainType) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
this.delete(query, domainType, containerName);
}
@Override
public void deleteContainer(@NonNull String containerName) {
Assert.hasText(containerName, "containerName should have text.");
cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.delete()
.publishOn(Schedulers.parallel())
.doOnNext(response -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getDiagnostics(), null);
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete container", throwable))
.block();
}
@Override
public String getContainerName(Class<?> domainType) {
Assert.notNull(domainType, "domainType should not be null");
return CosmosEntityInformation.getInstance(domainType).getContainerName();
}
@Override
public CosmosContainerProperties createContainerIfNotExists(CosmosEntityInformation<?, ?> information) {
final CosmosContainerResponse response = cosmosAsyncClient
.createDatabaseIfNotExists(this.databaseName)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to create database", throwable))
.flatMap(cosmosDatabaseResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosDatabaseResponse.getDiagnostics(), null);
final CosmosContainerProperties cosmosContainerProperties =
new CosmosContainerProperties(information.getContainerName(), information.getPartitionKeyPath());
cosmosContainerProperties.setDefaultTimeToLiveInSeconds(information.getTimeToLive());
cosmosContainerProperties.setIndexingPolicy(information.getIndexingPolicy());
CosmosAsyncDatabase cosmosAsyncDatabase = cosmosAsyncClient
.getDatabase(cosmosDatabaseResponse.getProperties().getId());
Mono<CosmosContainerResponse> cosmosContainerResponseMono;
if (information.getRequestUnit() == null) {
cosmosContainerResponseMono =
cosmosAsyncDatabase.createContainerIfNotExists(cosmosContainerProperties);
} else {
ThroughputProperties throughputProperties =
ThroughputProperties.createManualThroughput(information.getRequestUnit());
cosmosContainerResponseMono =
cosmosAsyncDatabase.createContainerIfNotExists(cosmosContainerProperties,
throughputProperties);
}
return cosmosContainerResponseMono
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to create container",
throwable))
.doOnNext(cosmosContainerResponse ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosContainerResponse.getDiagnostics(), null));
})
.block();
assert response != null;
return response.getProperties();
}
/**
* Deletes the item by id and partition key.
*
* @param containerName Container name of database
* @param id item id
* @param partitionKey the partition key
*/
public void deleteById(String containerName, Object id, PartitionKey partitionKey) {
deleteById(containerName, id, partitionKey, new CosmosItemRequestOptions());
}
/**
* Deletes the entity
*
* @param <T> type class of domain type
* @param containerName the container name
* @param entity the entity object
*/
@Override
public <T> void deleteEntity(String containerName, T entity) {
Assert.notNull(entity, "entity to be deleted should not be null");
@SuppressWarnings("unchecked")
final Class<T> domainType = (Class<T>) entity.getClass();
final JsonNode originalItem = mappingCosmosConverter.writeJsonNode(entity);
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(entity.getClass(), originalItem, options);
deleteItem(originalItem, containerName, domainType);
}
private void deleteById(String containerName, Object id, PartitionKey partitionKey,
CosmosItemRequestOptions options) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
String idToDelete = CosmosUtils.getStringIDValue(id);
LOGGER.debug("execute deleteById in database {} container {}", this.databaseName,
containerName);
if (partitionKey == null) {
partitionKey = PartitionKey.NONE;
}
cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.deleteItem(idToDelete, partitionKey, options)
.publishOn(Schedulers.parallel())
.doOnNext(response ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item",
throwable))
.block();
}
@Override
public <T, ID> Iterable<T> findByIds(Iterable<ID> ids, Class<T> domainType, String containerName) {
Assert.notNull(ids, "Id list should not be null");
Assert.notNull(domainType, "domainType should not be null.");
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
final List<Object> idList = new ArrayList<>();
for (ID id : ids) {
idList.add(CosmosUtils.getStringIDValue(id));
}
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.IN, "id",
Collections.singletonList(idList), Part.IgnoreCaseType.NEVER));
return find(query, domainType, containerName);
}
/**
* Finds the document query items
*
* @param query The representation for query method.
* @param domainType Class of domain
* @param containerName Container name of database
* @param <T> class of domainType
* @return All the found items as List.
*/
public <T> Iterable<T> find(@NonNull CosmosQuery query, @NonNull Class<T> domainType,
String containerName) {
Assert.notNull(query, "DocumentQuery should not be null.");
Assert.notNull(domainType, "domainType should not be null.");
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
return findItems(query, containerName, domainType);
}
/**
* Checks if document query items exist
*
* @param query The representation for query method.
* @param domainType Class of domain
* @param containerName Container name of database
* @param <T> class of domainType
* @return if items exist
*/
public <T> Boolean exists(@NonNull CosmosQuery query, @NonNull Class<T> domainType,
String containerName) {
return this.count(query, containerName) > 0;
}
/**
* Delete the DocumentQuery, need to query the domains at first, then delete the item from the result. The cosmos db
* Sql API do _NOT_ support DELETE query, we cannot add one DeleteQueryGenerator.
*
* @param query The representation for query method.
* @param domainType Class of domain
* @param containerName Container name of database
* @param <T> class of domainType
* @return All the deleted items as List.
*/
@Override
public <T> Iterable<T> delete(@NonNull CosmosQuery query, @NonNull Class<T> domainType,
@NonNull String containerName) {
Assert.notNull(query, "DocumentQuery should not be null.");
Assert.notNull(domainType, "domainType should not be null.");
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
final List<JsonNode> results = findItems(query, containerName);
return results.stream()
.map(item -> deleteItem(item, containerName, domainType))
.collect(Collectors.toList());
}
@Override
public <T> Page<T> findAll(Pageable pageable, Class<T> domainType, String containerName) {
final CosmosQuery query =
new CosmosQuery(Criteria.getInstance(CriteriaType.ALL)).with(pageable);
if (pageable.getSort().isSorted()) {
query.with(pageable.getSort());
}
return paginationQuery(query, domainType, containerName);
}
@Override
public <T> Page<T> paginationQuery(CosmosQuery query, Class<T> domainType,
String containerName) {
Assert.isTrue(query.getPageable().getPageSize() > 0,
"pageable should have page size larger than 0");
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
final Pageable pageable = query.getPageable();
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
CosmosAsyncContainer container =
cosmosAsyncClient.getDatabase(this.databaseName).getContainer(containerName);
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
Flux<FeedResponse<JsonNode>> feedResponseFlux;
if (pageable instanceof CosmosPageRequest) {
feedResponseFlux = container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, JsonNode.class)
.byPage(((CosmosPageRequest) pageable).getRequestContinuation(),
pageable.getPageSize());
} else {
feedResponseFlux = container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, JsonNode.class)
.byPage(pageable.getPageSize());
}
final FeedResponse<JsonNode> feedResponse = feedResponseFlux
.publishOn(Schedulers.parallel())
.doOnNext(propertiesFeedResponse ->
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
propertiesFeedResponse.getCosmosDiagnostics(), propertiesFeedResponse))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to query items", throwable))
.next()
.block();
assert feedResponse != null;
final Iterator<JsonNode> it = feedResponse.getResults().iterator();
final List<T> result = new ArrayList<>();
for (int index = 0; it.hasNext()
&& index < pageable.getPageSize(); index++) {
final JsonNode jsonNode = it.next();
if (jsonNode == null) {
continue;
}
final T entity = mappingCosmosConverter.read(domainType, jsonNode);
result.add(entity);
}
final long total = count(query, containerName);
final int contentSize = result.size();
int pageSize;
if (contentSize < pageable.getPageSize()
&& contentSize > 0) {
// If the content size is less than page size,
// this means, cosmosDB is returning less items than page size,
// because of either RU limit, or payload limit
// Set the page size to content size.
pageSize = contentSize;
} else {
pageSize = pageable.getPageSize();
}
final CosmosPageRequest pageRequest = CosmosPageRequest.of(pageable.getOffset(),
pageable.getPageNumber(),
pageSize,
feedResponse.getContinuationToken(),
query.getSort());
return new CosmosPageImpl<>(result, pageRequest, total);
}
@Override
public long count(String containerName) {
Assert.hasText(containerName, "container name should not be empty");
final CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
final Long count = getCountValue(query, containerName);
assert count != null;
return count;
}
@Override
public <T> long count(CosmosQuery query, String containerName) {
Assert.hasText(containerName, "container name should not be empty");
final Long count = getCountValue(query, containerName);
assert count != null;
return count;
}
@Override
public MappingCosmosConverter getConverter() {
return this.mappingCosmosConverter;
}
@Override
public <T> Iterable<T> runQuery(SqlQuerySpec querySpec, Class<?> domainType, Class<T> returnType) {
return getJsonNodeFluxFromQuerySpec(getContainerName(domainType), querySpec, returnType)
.collectList()
.block();
}
private JsonNode prepareToPersistAndConvertToItemProperties(Object object) {
if (cosmosAuditingHandler != null) {
cosmosAuditingHandler.markAudited(object);
}
return mappingCosmosConverter.writeJsonNode(object);
}
private Long getCountValue(CosmosQuery query, String containerName) {
final SqlQuerySpec querySpec = new CountQueryGenerator().generateCosmos(query);
final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setQueryMetricsEnabled(this.queryMetricsEnabled);
return executeQuery(querySpec, containerName, options)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to get count value", throwable))
.doOnNext(response -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getCosmosDiagnostics(), response))
.next()
.map(r -> r.getResults().get(0).asLong())
.block();
}
private Flux<FeedResponse<JsonNode>> executeQuery(SqlQuerySpec sqlQuerySpec,
String containerName,
CosmosQueryRequestOptions options) {
return cosmosAsyncClient.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, options, JsonNode.class)
.byPage();
}
private Flux<JsonNode> findItemsAsFlux(@NonNull CosmosQuery query,
@NonNull String containerName) {
final SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(),
cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable));
}
private <T> Flux<T> getJsonNodeFluxFromQuerySpec(
@NonNull String containerName, SqlQuerySpec sqlQuerySpec, Class<T> classType) {
final CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, classType)
.byPage()
.publishOn(Schedulers.parallel())
.flatMap(cosmosItemFeedResponse -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemFeedResponse.getCosmosDiagnostics(),
cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable));
}
private List<JsonNode> findItems(@NonNull CosmosQuery query,
@NonNull String containerName) {
return findItemsAsFlux(query, containerName)
.collectList()
.block();
}
private <T> Iterable<T> findItems(@NonNull CosmosQuery query,
@NonNull String containerName,
@NonNull Class<T> domainType) {
return findItemsAsFlux(query, containerName)
.map(jsonNode -> toDomainObject(domainType, jsonNode))
.toIterable();
}
private <T> T deleteItem(@NonNull JsonNode jsonNode,
String containerName,
@NonNull Class<T> domainType) {
final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
applyVersioning(domainType, jsonNode, options);
return cosmosAsyncClient
.getDatabase(this.databaseName)
.getContainer(containerName)
.deleteItem(jsonNode, options)
.publishOn(Schedulers.parallel())
.doOnNext(response -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getDiagnostics(), null))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item", throwable))
.flatMap(objectCosmosItemResponse -> Mono.just(toDomainObject(domainType, jsonNode)))
.block();
}
private <T> T toDomainObject(@NonNull Class<T> domainType, JsonNode responseJsonNode) {
return mappingCosmosConverter.read(domainType, responseJsonNode);
}
private void applyVersioning(Class<?> domainType,
JsonNode jsonNode,
CosmosItemRequestOptions options) {
CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
if (entityInformation.isVersioned()) {
options.setIfMatchETag(jsonNode.get(Constants.ETAG_PROPERTY_DEFAULT_NAME).asText());
}
}
}