// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static;
import static;
import static;

 * CosmosAsyncContainer with encryption capabilities.
public class CosmosEncryptionAsyncContainer {
    private final Scheduler encryptionScheduler;
    private final CosmosResponseFactory responseFactory = new CosmosResponseFactory();
    private final CosmosAsyncContainer container;
    private final EncryptionProcessor encryptionProcessor;

    private final CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient;
    ImplementationBridgeHelpers.CosmosItemResponseHelper.CosmosItemResponseBuilderAccessor cosmosItemResponseBuilderAccessor;
    ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.CosmosItemRequestOptionsAccessor cosmosItemRequestOptionsAccessor;
    ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.CosmosQueryRequestOptionsAccessor cosmosQueryRequestOptionsAccessor;
    ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.CosmosChangeFeedRequestOptionsAccessor cosmosChangeFeedRequestOptionsAccessor;
    ImplementationBridgeHelpers.CosmosAsyncContainerHelper.CosmosAsyncContainerAccessor cosmosAsyncContainerAccessor;
    ImplementationBridgeHelpers.CosmosBatchHelper.CosmosBatchAccessor cosmosBatchAccessor;
    ImplementationBridgeHelpers.CosmosBatchResponseHelper.CosmosBatchResponseAccessor cosmosBatchResponseAccessor;
    ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.CosmosBatchOperationResultAccessor cosmosBatchOperationResultAccessor;
    ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor;
    ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor;

    CosmosEncryptionAsyncContainer(CosmosAsyncContainer container,
                                   CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) {
        this.container = container;
        this.cosmosEncryptionAsyncClient = cosmosEncryptionAsyncClient;
        this.encryptionProcessor = new EncryptionProcessor(this.container, cosmosEncryptionAsyncClient);
        this.encryptionScheduler = Schedulers.parallel();
        this.cosmosItemResponseBuilderAccessor =
        this.cosmosItemRequestOptionsAccessor =
        this.cosmosQueryRequestOptionsAccessor =
        this.cosmosChangeFeedRequestOptionsAccessor =
        this.cosmosAsyncContainerAccessor =
        this.cosmosBatchAccessor = ImplementationBridgeHelpers.CosmosBatchHelper.getCosmosBatchAccessor();
        this.cosmosBatchResponseAccessor = ImplementationBridgeHelpers.CosmosBatchResponseHelper.getCosmosBatchResponseAccessor();
        this.cosmosBatchOperationResultAccessor = ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.getCosmosBatchOperationResultAccessor();
        this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor();
        this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor();

    EncryptionProcessor getEncryptionProcessor() {
        return this.encryptionProcessor;

     * Creates an item.
     * <p>
     * After subscription the operation will be performed. The {@link Mono} upon
     * successful completion will contain a single resource response with the
     * created Cosmos item. In case of failure the {@link Mono} will error.
     * @param <T> the type parameter.
     * @param item the Cosmos item represented as a POJO or Cosmos item object.
     * @return an {@link Mono} containing the single resource response with the
     * created Cosmos item or an error.
    public <T> Mono<CosmosItemResponse<T>> createItem(T item) {
        return createItem(item, new CosmosItemRequestOptions());

     * Creates a Cosmos item.
     * @param <T> the type parameter.
     * @param item the item.
     * @param requestOptions the item request options.
     * @return an {@link Mono} containing the single resource response with the created Cosmos item or an error.
    public <T> Mono<CosmosItemResponse<T>> createItem(T item,
                                                      CosmosItemRequestOptions requestOptions) {
        Preconditions.checkNotNull(item, "item");
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();
        byte[] streamPayload = cosmosSerializerToStream(item);
        return createItemHelper(streamPayload, null, requestOptions,(Class<T>) item.getClass(), false );


     * Creates an item.
     * <p>
     * After subscription the operation will be performed. The {@link Mono} upon
     * successful completion will contain a single resource response with the
     * created Cosmos item. In case of failure the {@link Mono} will error.
     * @param <T> the type parameter.
     * @param item the Cosmos item represented as a POJO or Cosmos item object.
     * @param partitionKey the partition key.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the single resource response with the created Cosmos item or an error.
    public <T> Mono<CosmosItemResponse<T>> createItem(T item,
                                                      PartitionKey partitionKey,
                                                      CosmosItemRequestOptions requestOptions) {
        Preconditions.checkNotNull(item, "item");
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();

        Preconditions.checkArgument(partitionKey != null, "partitionKey cannot be null for operations using "
            + "EncryptionContainer.");

        byte[] streamPayload = cosmosSerializerToStream(item);
        return  createItemHelper(streamPayload, partitionKey, requestOptions, (Class<T>) item.getClass(), false);

     * Deletes an item.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response for the deleted item.
     * @param itemId the item id.
     * @param partitionKey the partition key.
     * @return an {@link Mono} containing the Cosmos item resource response.
    public Mono<CosmosItemResponse<Object>> deleteItem(String itemId, PartitionKey partitionKey) {
        return deleteItem(itemId, partitionKey, new CosmosItemRequestOptions());

     * Deletes the item.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response for the deleted item.
     * @param itemId id of the item.
     * @param partitionKey partitionKey of the item.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the Cosmos item resource response.
    public Mono<CosmosItemResponse<Object>> deleteItem(String itemId,
                                                       PartitionKey partitionKey,
                                                       CosmosItemRequestOptions requestOptions) {

        return container.deleteItem(itemId, partitionKey, requestOptions);

     * Deletes the item.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response for the deleted item.
     * @param <T> the type parameter.
     * @param item item to be deleted.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the Cosmos item resource response.
    public <T> Mono<CosmosItemResponse<Object>> deleteItem(T item, CosmosItemRequestOptions requestOptions) {
        return container.deleteItem(item, requestOptions);

     * Deletes all items in the Container with the specified partitionKey value.
     * Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
     * The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response for all the deleted items.
     * @param partitionKey partitionKey of the item.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the Cosmos item resource response.
    @Beta(value = Beta.SinceVersion.V1, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public Mono<CosmosItemResponse<Object>> deleteAllItemsByPartitionKey(PartitionKey partitionKey, CosmosItemRequestOptions requestOptions) {
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();
        return container.deleteAllItemsByPartitionKey(partitionKey, requestOptions);

     * Upserts an item.
     * <p>
     * After subscription the operation will be performed. The {@link Mono} upon
     * successful completion will contain a single resource response with the
     * upserted item. In case of failure the {@link Mono} will error.
     * @param <T> the type parameter.
     * @param item the item represented as a POJO or Item object to upsert.
     * @return an {@link Mono} containing the single resource response with the upserted item or an error.
    public <T> Mono<CosmosItemResponse<T>> upsertItem(T item) {
        return upsertItem(item, new CosmosItemRequestOptions());

     * Upserts an item.
     * <p>
     * After subscription the operation will be performed. The {@link Mono} upon
     * successful completion will contain a single resource response with the
     * upserted item. In case of failure the {@link Mono} will error.
     * @param <T> the type parameter.
     * @param item the item represented as a POJO or Item object to upsert.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the single resource response with the upserted item or an error.
    public <T> Mono<CosmosItemResponse<T>> upsertItem(T item, CosmosItemRequestOptions requestOptions) {
        Preconditions.checkNotNull(item, "item");
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();

        byte[] streamPayload = cosmosSerializerToStream(item);
        return upsertItemHelper(streamPayload, null, requestOptions, (Class<T>) item.getClass(), false);

     * Upserts an item.
     * <p>
     * After subscription the operation will be performed. The {@link Mono} upon
     * successful completion will contain a single resource response with the
     * upserted item. In case of failure the {@link Mono} will error.
     * @param <T> the type parameter.
     * @param item the item represented as a POJO or Item object to upsert.
     * @param partitionKey the partition key.
     * @param requestOptions the request options.
     * @return an {@link Mono} containing the single resource response with the upserted item or an error.
    public <T> Mono<CosmosItemResponse<T>> upsertItem(T item,
                                                      PartitionKey partitionKey,
                                                      CosmosItemRequestOptions requestOptions) {
        Preconditions.checkNotNull(item, "item");
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();

        Preconditions.checkArgument(partitionKey != null, "partitionKey cannot be null for operations using "
            + "EncryptionContainer.");

        byte[] streamPayload = cosmosSerializerToStream(item);
        return upsertItemHelper(streamPayload, partitionKey, requestOptions, (Class<T>) item.getClass(), false);

     * Replaces an item with the passed in item  and encrypts the requested fields.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item.
     * @param <T> the type parameter.
     * @param item the item to replace (containing the item id).
     * @param itemId the item id.
     * @param partitionKey the partition key.
     * @return an {@link Mono} containing the Cosmos item resource response with the replaced item or an error.
    public <T> Mono<CosmosItemResponse<T>> replaceItem(T item, String itemId, PartitionKey partitionKey) {
        return replaceItem(item, itemId, partitionKey, new CosmosItemRequestOptions());

     * Replaces an item with the passed in item  and encrypts the requested fields.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response with the replaced item.
     * @param <T> the type parameter.
     * @param item the item to replace (containing the item id).
     * @param itemId the item id.
     * @param partitionKey the partition key.
     * @param requestOptions the request comosItemRequestOptions.
     * @return an {@link Mono} containing the Cosmos item resource response with the replaced item or an error.
    public <T> Mono<CosmosItemResponse<T>> replaceItem(T item,
                                                       String itemId,
                                                       PartitionKey partitionKey,
                                                       CosmosItemRequestOptions requestOptions) {
        Preconditions.checkNotNull(item, "item");
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();

        Preconditions.checkArgument(partitionKey != null, "partitionKey cannot be null for operations using "
            + "EncryptionContainer.");

        byte[] streamPayload = cosmosSerializerToStream(item);
        return replaceItemHelper(streamPayload, itemId, partitionKey, requestOptions, (Class<T>) item.getClass(), false);

     * Reads an item.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain an item response with the read item.
     * @param <T> the type parameter.
     * @param id the item id.
     * @param partitionKey the partition key.
     * @param classType the item type.
     * @return an {@link Mono} containing the Cosmos item response with the read item or an error.
    public <T> Mono<CosmosItemResponse<T>> readItem(String id, PartitionKey partitionKey, Class<T> classType) {
        return readItem(id, partitionKey, ModelBridgeInternal.createCosmosItemRequestOptions(partitionKey), classType);

     * Reads an item using a configured {@link CosmosItemRequestOptions}.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a Cosmos item response with the read item.
     * @param <T> the type parameter.
     * @param id the item id.
     * @param partitionKey the partition key.
     * @param requestOptions the request {@link CosmosItemRequestOptions}.
     * @param classType the item type.
     * @return an {@link Mono} containing the Cosmos item response with the read item or an error.
    public <T> Mono<CosmosItemResponse<T>> readItem(String id,
                                                    PartitionKey partitionKey,
                                                    CosmosItemRequestOptions requestOptions,
                                                    Class<T> classType) {
        if (requestOptions == null) {
            requestOptions = new CosmosItemRequestOptions();

        Mono<CosmosItemResponse<byte[]>> responseMessageMono = this.readItemHelper(id, partitionKey, requestOptions, false);

        return responseMessageMono.publishOn(encryptionScheduler).flatMap(cosmosItemResponse -> setByteArrayContent(cosmosItemResponse,
            .map(bytes -> this.responseFactory.createItemResponse(cosmosItemResponse, classType)));

     * Query for items in the current container.
     * <p>
     * After subscription the operation will be performed. The {@link CosmosPagedFlux} will
     * contain one or several feed response of the obtained items. In case of
     * failure the {@link CosmosPagedFlux} will error.
     * @param <T> the type parameter.
     * @param query the query.
     * @param classType the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an
     * error.
    public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
        return this.queryItems(new SqlQuerySpec(query), classType);

     * Query for items in the current container using a string.
     * <p>
     * After subscription the operation will be performed. The {@link CosmosPagedFlux} will
     * contain one or several feed response of the obtained items. In case of
     * failure the {@link CosmosPagedFlux} will error.
     * @param <T> the type parameter.
     * @param query the query.
     * @param requestOptions the query request options.
     * @param classType the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an
     * error.
    public <T> CosmosPagedFlux<T> queryItems(String query, CosmosQueryRequestOptions requestOptions,
                                             Class<T> classType) {
        if (requestOptions == null) {
            requestOptions = new CosmosQueryRequestOptions();

        return this.queryItems(new SqlQuerySpec(query), requestOptions, classType);

     * Query for items in the current container using a {@link SqlQuerySpec}.
     * <p>
     * After subscription the operation will be performed. The {@link CosmosPagedFlux} will
     * contain one or several feed response of the obtained items. In case of
     * failure the {@link CosmosPagedFlux} will error.
     * @param <T> the type parameter.
     * @param querySpec the SQL query specification.
     * @param classType the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an
     * error.
    public <T> CosmosPagedFlux<T> queryItems(SqlQuerySpec querySpec, Class<T> classType) {
        return queryItemsHelper(querySpec, new CosmosQueryRequestOptions(), classType, false);

     * Query for items in the current container using a {@link SqlQuerySpec} and {@link CosmosQueryRequestOptions}.
     * <p>
     * After subscription the operation will be performed. The {@link Flux} will
     * contain one or several feed response of the obtained items. In case of
     * failure the {@link CosmosPagedFlux} will error.
     * @param <T> the type parameter.
     * @param query the SQL query specification.
     * @param requestOptions the query request options.
     * @param classType the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an
     * error.
    public <T> CosmosPagedFlux<T> queryItems(SqlQuerySpec query, CosmosQueryRequestOptions requestOptions,
                                             Class<T> classType) {
        if (requestOptions == null) {
            requestOptions = new CosmosQueryRequestOptions();

        return queryItemsHelper(query, requestOptions, classType,false);

     * Query for items in the current container using a {@link SqlQuerySpecWithEncryption}.
     * <p>
     * After subscription the operation will be performed. The {@link CosmosPagedFlux} will contain one or several feed
     * response of the obtained items. In case of failure the {@link CosmosPagedFlux} will error.
     * @param <T>                        the type parameter.
     * @param sqlQuerySpecWithEncryption the sqlQuerySpecWithEncryption.
     * @param options                    the query request options.
     * @param classType                  the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained items or an
     * error.
    public <T> CosmosPagedFlux<T> queryItemsOnEncryptedProperties(SqlQuerySpecWithEncryption sqlQuerySpecWithEncryption,
                                                                  CosmosQueryRequestOptions options,
                                                                  Class<T> classType) {
        if (options == null) {
            options = new CosmosQueryRequestOptions();

        if (EncryptionModelBridgeInternal.getEncryptionParamMap(sqlQuerySpecWithEncryption).size() > 0) {
            List<Mono<Void>> encryptionSqlParameterMonoList = new ArrayList<>();
            for (Map.Entry<String, SqlParameter> entry :
                EncryptionModelBridgeInternal.getEncryptionParamMap(sqlQuerySpecWithEncryption).entrySet()) {
                encryptionSqlParameterMonoList.add(EncryptionModelBridgeInternal.addEncryptionParameterAsync(sqlQuerySpecWithEncryption, entry.getKey(), entry.getValue(), this));
            Mono<List<Void>> listMono = Flux.mergeSequential(encryptionSqlParameterMonoList).collectList();
            Mono<SqlQuerySpec> sqlQuerySpecMono =
                listMono.flatMap(ignoreVoids -> Mono.just(EncryptionModelBridgeInternal.getSqlQuerySpec(sqlQuerySpecWithEncryption)));
            return queryItemsHelperWithMonoSqlQuerySpec(sqlQuerySpecMono, sqlQuerySpecWithEncryption, options, classType, false);
        } else {
            return queryItemsHelper(EncryptionModelBridgeInternal.getSqlQuerySpec(sqlQuerySpecWithEncryption),
                options, classType, false);

     * Query for items in the change feed of the current container using the {@link CosmosChangeFeedRequestOptions}.
     * <p>
     * After subscription the operation will be performed. The {@link Flux} will
     * contain one or several feed response of the obtained items. In case of
     * failure the {@link CosmosPagedFlux} will error.
     * @param <T> the type parameter.
     * @param options the change feed request options.
     * @param classType the class type.
     * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained
     * items or an error.
    @Beta(value = Beta.SinceVersion.V1, warningText =
    public <T> CosmosPagedFlux<T> queryChangeFeed(CosmosChangeFeedRequestOptions options, Class<T> classType) {
        checkNotNull(options, "Argument 'options' must not be null.");
        checkNotNull(classType, "Argument 'classType' must not be null.");

        return queryChangeFeedHelper(options, classType,false);

     * Run patch operations on an Item.
     * <p>
     * After subscription the operation will be performed.
     * The {@link Mono} upon successful completion will contain a single Cosmos item response with the patched item.
     * @param <T> the type parameter.
     * @param itemId the item id.
     * @param partitionKey the partition key.
     * @param cosmosPatchOperations Represents a container having list of operations to be sequentially applied to the referred Cosmos item.
     * @param options the request options.
     * @param itemType the item type.
     * @return an {@link Mono} containing the Cosmos item resource response with the patched item or an error.
    public <T> Mono<CosmosItemResponse<T>> patchItem(
        String itemId,
        PartitionKey partitionKey,
        CosmosPatchOperations cosmosPatchOperations,
        CosmosPatchItemRequestOptions options,
        Class<T> itemType) {

        checkNotNull(itemId, "expected non-null itemId");
        checkNotNull(partitionKey, "expected non-null partitionKey for patchItem");
        checkNotNull(cosmosPatchOperations, "expected non-null cosmosPatchOperations");

        if (options == null) {
            options = new CosmosPatchItemRequestOptions();

        return patchItemHelper(itemId, partitionKey, cosmosPatchOperations, options, itemType);

    private <T> Mono<CosmosItemResponse<T>> patchItemHelper(String itemId,
                                                           PartitionKey partitionKey,
                                                           CosmosPatchOperations cosmosPatchOperations,
                                                           CosmosPatchItemRequestOptions options,
                                                           Class<T> itemType) {
        List<Mono<PatchOperation>> monoList = new ArrayList<>();
        for (PatchOperation patchOperation : this.cosmosPatchOperationsAccessor.getPatchOperations(cosmosPatchOperations)) {
            Mono<PatchOperation> itemPatchOperationMono = null;
            if (patchOperation.getOperationType() == PatchOperationType.REMOVE) {
                itemPatchOperationMono = Mono.just(patchOperation);
            else if (patchOperation.getOperationType() == PatchOperationType.INCREMENT) {
                throw new IllegalArgumentException("Increment patch operation is not allowed for encrypted path");
            else if (patchOperation instanceof PatchOperationCore) {
                JsonNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(((PatchOperationCore)patchOperation).getResource());
                itemPatchOperationMono =
                    encryptionProcessor.encryptPatchNode(objectNode, ((PatchOperationCore)patchOperation).getPath()).map(encryptedObjectNode -> {
                        return new PatchOperationCore<>(
        Mono<List<PatchOperation>> encryptedPatchOperationsListMono =
        CosmosPatchItemRequestOptions finalRequestOptions = options;

        CosmosPatchOperations encryptedCosmosPatchOperations = CosmosPatchOperations.create();

        return encryptedPatchOperationsListMono.flatMap(patchOperations -> {
            return patchItemInternalHelper(itemId, partitionKey, encryptedCosmosPatchOperations, finalRequestOptions,itemType, false);

    @SuppressWarnings("unchecked") // Casting cosmosItemResponse to CosmosItemResponse<byte[]> from CosmosItemResponse<T>
    private <T> Mono<CosmosItemResponse<T>> patchItemInternalHelper(String itemId,
                                                                    PartitionKey partitionKey,
                                                                    CosmosPatchOperations encryptedCosmosPatchOperations,
                                                                    CosmosPatchItemRequestOptions requestOptions,
                                                                    Class<T> itemType,
                                                                    boolean isRetry) {

        return this.container.patchItem(itemId, partitionKey, encryptedCosmosPatchOperations, requestOptions, itemType).publishOn(encryptionScheduler).
            flatMap(cosmosItemResponse -> setByteArrayContent((CosmosItemResponse<byte[]>) cosmosItemResponse,
                this.encryptionProcessor.decrypt(this.cosmosItemResponseBuilderAccessor.getByteArrayContent((CosmosItemResponse<byte[]>) cosmosItemResponse)))
                .map(bytes -> this.responseFactory.createItemResponse((CosmosItemResponse<byte[]>) cosmosItemResponse,
                    itemType))).onErrorResume(exception -> {
                if (!isRetry && exception instanceof CosmosException) {
                    final CosmosException cosmosException = (CosmosException) exception;
                    if (isIncorrectContainerRid(cosmosException)) {
                        return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                            (Mono.defer(() -> patchItemInternalHelper(itemId, partitionKey, encryptedCosmosPatchOperations, requestOptions, itemType, true)));
                return Mono.error(exception);

     * Get the CosmosEncryptionAsyncClient
     * @return encrypted cosmosAsyncClient
    CosmosEncryptionAsyncClient getCosmosEncryptionAsyncClient() {
        return cosmosEncryptionAsyncClient;

     * Gets the CosmosAsyncContainer
     * @return cosmos container
    public CosmosAsyncContainer getCosmosAsyncContainer() {
        return container;

    <T> byte[] cosmosSerializerToStream(T item) {
        return EncryptionUtils.serializeJsonToByteArray(EncryptionUtils.getSimpleObjectMapper(), item);

    ItemDeserializer getItemDeserializer() {
        return CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase()).getItemDeserializer();

    Mono<JsonNode> decryptResponseNode(
        JsonNode jsonNode) {

        if (jsonNode == null) {
            return Mono.empty();

        return this.encryptionProcessor.decryptJsonNode(

    private Mono<CosmosItemResponse<byte[]>> setByteArrayContent(CosmosItemResponse<byte[]> rsp,
                                                                 Mono<byte[]> bytesMono) {
        return bytesMono.flatMap(
            bytes -> {
                this.cosmosItemResponseBuilderAccessor.setByteArrayContent(rsp, bytes);
                return Mono.just(rsp);

    private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecryptionTransformer(Class<T> classType,
                                                                                                   boolean isChangeFeed,
                                                                                                   Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
        return func.andThen(flux ->
                    page -> {
                        boolean useEtagAsContinuation = isChangeFeed;
                        boolean isNoChangesResponse = isChangeFeed ?
                            : false;
                        List<Mono<JsonNode>> jsonNodeArrayMonoList =
                            page.getResults().stream().map(jsonNode -> decryptResponseNode(jsonNode)).collect(Collectors.toList());
                        return Flux.concat(jsonNodeArrayMonoList).map(
                            item -> getItemDeserializer().convert(classType, item)
                        ).collectList().map(itemList -> BridgeInternal.createFeedResponseWithQueryMetrics(itemList,

    private Mono<CosmosItemResponse<byte[]>> readItemHelper(String id,
                                                            PartitionKey partitionKey,
                                                            CosmosItemRequestOptions requestOptions,
                                                            boolean isRetry) {
        Mono<CosmosItemResponse<byte[]>> responseMessageMono = this.container.readItem(
            requestOptions, byte[].class);
        return responseMessageMono.onErrorResume(exception -> {
            if (!isRetry && exception instanceof CosmosException) {
                final CosmosException cosmosException = (CosmosException) exception;
                if (isIncorrectContainerRid(cosmosException)) {
                    return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then(Mono.defer(() -> readItemHelper(id, partitionKey, requestOptions, true)
            return Mono.error(exception);

    private <T> Mono<CosmosItemResponse<T>> createItemHelper(byte[] streamPayload,
                                                             PartitionKey partitionKey,
                                                             CosmosItemRequestOptions requestOptions,
                                                             Class<T> itemClass,
                                                             boolean isRetry) {
        return this.encryptionProcessor.encrypt(streamPayload)
            .flatMap(encryptedPayload -> createItemHelper(
                .flatMap(cosmosItemResponse -> setByteArrayContent(cosmosItemResponse,
                    .map(bytes -> this.responseFactory.createItemResponse(cosmosItemResponse,
                        itemClass))).onErrorResume(exception -> {
                    if (!isRetry && exception instanceof CosmosException) {
                        final CosmosException cosmosException = (CosmosException) exception;
                        if (isIncorrectContainerRid(cosmosException)) {
                            return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                                (Mono.defer(() -> createItemHelper(streamPayload, partitionKey, requestOptions,
                                    itemClass, true)));
                    return Mono.error(exception);

    private <T> Mono<CosmosItemResponse<byte[]>> createItemHelper(byte[] encryptedPayload,
                                                                  PartitionKey partitionKey,
                                                                  CosmosItemRequestOptions requestOptions) {
        return partitionKey != null
            ? this.container.createItem(encryptedPayload, partitionKey, requestOptions)
            : this.container.createItem(encryptedPayload, requestOptions);

    private <T> Mono<CosmosItemResponse<T>> upsertItemHelper(byte[] streamPayload,
                                                             PartitionKey partitionKey,
                                                             CosmosItemRequestOptions requestOptions,
                                                             Class<T> itemClass,
                                                             boolean isRetry) {
        return this.encryptionProcessor.encrypt(streamPayload)
            .flatMap(encryptedPayload -> upsertItemHelper(
                .flatMap(cosmosItemResponse -> setByteArrayContent(cosmosItemResponse,
                    .map(bytes -> this.responseFactory.createItemResponse(cosmosItemResponse, itemClass)))
                .onErrorResume(exception -> {
                    if (!isRetry && exception instanceof CosmosException) {
                        final CosmosException cosmosException = (CosmosException) exception;
                        if (isIncorrectContainerRid(cosmosException)) {
                            return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                                (Mono.defer(() -> upsertItemHelper(streamPayload, partitionKey, requestOptions,
                                    itemClass, true)));
                    return Mono.error(exception);

    private <T> Mono<CosmosItemResponse<byte[]>> upsertItemHelper(byte[] encryptedPayload,
                                                             PartitionKey partitionKey,
                                                             CosmosItemRequestOptions requestOptions) {
        return partitionKey != null
            ? this.container.upsertItem(encryptedPayload, partitionKey, requestOptions)
            : this.container.upsertItem(encryptedPayload, requestOptions);

    private <T> Mono<CosmosItemResponse<T>> replaceItemHelper(byte[] streamPayload,
                                                              String itemId,
                                                             PartitionKey partitionKey,
                                                             CosmosItemRequestOptions requestOptions,
                                                             Class<T> itemClass,
                                                             boolean isRetry) {
        return this.encryptionProcessor.encrypt(streamPayload)
            .flatMap(encryptedPayload -> this.container.replaceItem(
                .flatMap(cosmosItemResponse -> setByteArrayContent(cosmosItemResponse,
                    .map(bytes -> this.responseFactory.createItemResponse(cosmosItemResponse, itemClass)))
                .onErrorResume(exception -> {
                    if (!isRetry && exception instanceof CosmosException) {
                        final CosmosException cosmosException = (CosmosException) exception;
                        if (isIncorrectContainerRid(cosmosException)) {
                            return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                                (Mono.defer(() -> replaceItemHelper(streamPayload, itemId, partitionKey, requestOptions,
                                    itemClass, true)));
                    return Mono.error(exception);

    private <T> CosmosPagedFlux<T> queryItemsHelper(SqlQuerySpec sqlQuerySpec,
                                                    CosmosQueryRequestOptions options,
                                                    Class<T> classType,
                                                    boolean isRetry) {
        CosmosQueryRequestOptions finalOptions = options;
        Flux<FeedResponse<T>>  tFlux = CosmosBridgeInternal.queryItemsInternal(container, sqlQuerySpec, options,
            new Transformer<T>() {
                public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
                    return queryDecryptionTransformer(classType, false, func);
            }).byPage().onErrorResume(exception -> {
            if (exception instanceof CosmosException) {
                final CosmosException cosmosException = (CosmosException) exception;
                if (!isRetry && isIncorrectContainerRid(cosmosException)) {
                    return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany(
                        (CosmosPagedFlux.defer(() -> queryItemsHelper(sqlQuerySpec,finalOptions, classType, true).byPage())));
            return Mono.error(exception);

        return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
            setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions);
            return tFlux;

    private <T> CosmosPagedFlux<T> queryChangeFeedHelper(CosmosChangeFeedRequestOptions options,
                                                         Class<T> classType,
                                                         boolean isRetry) {
        CosmosChangeFeedRequestOptions finalOptions = options;
        Flux<FeedResponse<T>> tFlux =
            UtilBridgeInternal.createCosmosPagedFlux(((Transformer<T>) func -> queryDecryptionTransformer(classType,
                func)).transform(cosmosAsyncContainerAccessor.queryChangeFeedInternalFunc(this.container, options,
                JsonNode.class))).byPage().onErrorResume(exception -> {
                if (exception instanceof CosmosException) {
                    final CosmosException cosmosException = (CosmosException) exception;
                    if (!isRetry && isIncorrectContainerRid(cosmosException)) {
                        return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany(
                            (CosmosPagedFlux.defer(() -> queryChangeFeedHelper(finalOptions, classType, true).byPage())));
                return Mono.error(exception);

        return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
            getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions);
            return tFlux;

    private <T> CosmosPagedFlux<T> queryItemsHelperWithMonoSqlQuerySpec(Mono<SqlQuerySpec> sqlQuerySpecMono,
                                                                        SqlQuerySpecWithEncryption sqlQuerySpecWithEncryption,
                                                                        CosmosQueryRequestOptions options,
                                                                        Class<T> classType,
                                                                        boolean isRetry) {

        CosmosQueryRequestOptions finalOptions = options;

        Flux<FeedResponse<T>>  tFlux = CosmosBridgeInternal.queryItemsInternal(container, sqlQuerySpecMono, options,
            new Transformer<T>() {
                public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
                    return queryDecryptionTransformer(classType, false, func);
            }).byPage().onErrorResume(exception -> {
            if (exception instanceof CosmosException) {
                final CosmosException cosmosException = (CosmosException) exception;
                if (!isRetry && isIncorrectContainerRid(cosmosException)) {
                    return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany(
                        (CosmosPagedFlux.defer(() -> queryItemsHelper(EncryptionModelBridgeInternal.getSqlQuerySpec(sqlQuerySpecWithEncryption), finalOptions, classType, true).byPage())));
            return Mono.error(exception);

        return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
            setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions);
            return tFlux;

     * Executes the encrypted transactional batch.
     * @param cosmosBatch Batch having list of operation and partition key which will be executed by this container.
     * @return A Mono response which contains details of execution of the transactional batch.
     * <p>
     * If the transactional batch executes successfully, the value returned by {@link
     * CosmosBatchResponse#getStatusCode} on the response returned will be set to 200}.
     * <p>
     * If an operation within the transactional batch fails during execution, no changes from the batch will be
     * committed and the status of the failing operation is made available by {@link
     * CosmosBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
     * that failed in case of some user error like conflict, not found etc, the response can be enumerated.
     * This returns {@link CosmosBatchOperationResult} instances corresponding to each operation in the
     * transactional batch in the order they were added to the transactional batch.
     * For a result corresponding to an operation within the transactional batch, use
     * {@link CosmosBatchOperationResult#getStatusCode}
     * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
     * another operation within the transactional batch, the value of this field will be 424;
     * for the operation that caused the batch to abort, the value of this field
     * will indicate the cause of failure.
     * <p>
     * If there are issues such as request timeouts, Gone, session not available, network failure
     * or if the service somehow returns 5xx then the Mono will return error instead of CosmosBatchResponse.
     * <p>
     * Use {@link CosmosBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
     * transactional batch succeeded.
    @Beta(value = Beta.SinceVersion.V1, warningText =
    public Mono<CosmosBatchResponse> executeCosmosBatch(CosmosBatch cosmosBatch) {
        return this.executeCosmosBatch(cosmosBatch, new CosmosBatchRequestOptions());

     * Executes the encrypted transactional batch.
     * @param cosmosBatch Batch having list of operation and partition key which will be executed by this container.
     * @param requestOptions Options that apply specifically to batch request.
     * @return A Mono response which contains details of execution of the transactional batch.
     * <p>
     * If the transactional batch executes successfully, the value returned by {@link
     * CosmosBatchResponse#getStatusCode} on the response returned will be set to 200}.
     * <p>
     * If an operation within the transactional batch fails during execution, no changes from the batch will be
     * committed and the status of the failing operation is made available by {@link
     * CosmosBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
     * that failed in case of some user error like conflict, not found etc, the response can be enumerated.
     * This returns {@link CosmosBatchOperationResult} instances corresponding to each operation in the
     * transactional batch in the order they were added to the transactional batch.
     * For a result corresponding to an operation within the transactional batch, use
     * {@link CosmosBatchOperationResult#getStatusCode}
     * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
     * another operation within the transactional batch, the value of this field will be 424;
     * for the operation that caused the batch to abort, the value of this field
     * will indicate the cause of failure.
     * <p>
     * If there are issues such as request timeouts, Gone, session not available, network failure
     * or if the service somehow returns 5xx then the Mono will return error instead of CosmosBatchResponse.
     * <p>
     * Use {@link CosmosBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
     * transactional batch succeeded.
    @Beta(value = Beta.SinceVersion.V1, warningText =
    public Mono<CosmosBatchResponse> executeCosmosBatch(CosmosBatch cosmosBatch, CosmosBatchRequestOptions requestOptions) {
        if (requestOptions == null) {
            requestOptions = new CosmosBatchRequestOptions();

        List<Mono<ItemBatchOperation<?>>> monoList = new ArrayList<>();
        for (ItemBatchOperation<?> itemBatchOperation : this.cosmosBatchAccessor.getOperationsInternal(cosmosBatch)) {
            Mono<ItemBatchOperation<?>> itemBatchOperationMono = null;
            if (itemBatchOperation.getItem() != null) {
                ObjectNode objectNode =
                itemBatchOperationMono =
                    encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> {
                        return new ItemBatchOperation<>(
            } else {
                itemBatchOperationMono =
                        new ItemBatchOperation<>(
        Mono<List<ItemBatchOperation<?>>> encryptedOperationListMono =
        CosmosBatchRequestOptions finalRequestOptions = requestOptions;

        CosmosBatch encryptedCosmosBatch = CosmosBatch.createCosmosBatch(cosmosBatch.getPartitionKeyValue());

        return encryptedOperationListMono.flatMap(itemBatchOperations -> {
            return executeCosmosBatchHelper(encryptedCosmosBatch, finalRequestOptions, false);

    private Mono<CosmosBatchResponse> executeCosmosBatchHelper(CosmosBatch encryptedCosmosBatch,
                                                               CosmosBatchRequestOptions requestOptions,
                                                               boolean isRetry) {
        return this.container.executeCosmosBatch(encryptedCosmosBatch, requestOptions).flatMap(cosmosBatchResponse -> {
            // TODO this should check for BadRequest StatusCode too, requires a service fix to return 400 instead of
            //  -1 which is currently returned inside the body.
            //  Once fixed from service below if condition can be removed, as this is already covered in onErrorResume.
            if (!isRetry && cosmosBatchResponse.getSubStatusCode() == 1024) {
                return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                    (Mono.defer(() -> executeCosmosBatchHelper(encryptedCosmosBatch, requestOptions, true)));

            List<Mono<Void>> decryptMonoList = new ArrayList<>();
            for (CosmosBatchOperationResult cosmosBatchOperationResult :
                this.cosmosBatchResponseAccessor.getResults(cosmosBatchResponse)) {
                ObjectNode objectNode =
                if (objectNode != null) {
                    decryptMonoList.add(encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> {
                        this.cosmosBatchOperationResultAccessor.setResourceObject(cosmosBatchOperationResult, (ObjectNode) jsonNode);
                        return Mono.empty();

            Mono<List<Void>> listMono = Flux.mergeSequential(decryptMonoList).collectList();
            return -> cosmosBatchResponse);
        }).onErrorResume(exception -> {
            if (!isRetry && exception instanceof CosmosException) {
                final CosmosException cosmosException = (CosmosException) exception;
                if (isIncorrectContainerRid(cosmosException)) {
                    return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
                        (Mono.defer(() -> executeCosmosBatchHelper(encryptedCosmosBatch, requestOptions, true)));
            return Mono.error(exception);

    private void setRequestHeaders(CosmosItemRequestOptions requestOptions) {
        this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
        this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());

    private void setRequestHeaders(CosmosQueryRequestOptions requestOptions) {
        this.cosmosQueryRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
        this.cosmosQueryRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());

    private void setRequestHeaders(CosmosChangeFeedRequestOptions requestOptions) {
        this.cosmosChangeFeedRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
        this.cosmosChangeFeedRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());

    private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) {
        this.cosmosBatchRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
        this.cosmosBatchRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());

    boolean isIncorrectContainerRid(CosmosException cosmosException) {
        return cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST &&