ChangeFeedEncryptionProcessorBuilder.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.encryption;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Helper class to build a encryption supported {@link ChangeFeedProcessor} instance.
 *
 */
public class ChangeFeedEncryptionProcessorBuilder {

    private String hostName ;
    private ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private Consumer<List<JsonNode>> encryptionConsumer;
    private CosmosEncryptionAsyncContainer feedContainer = null;
    private CosmosAsyncContainer leaseContainer = null;

    /**
     * Helper class to build a encryption supported {@link ChangeFeedProcessor} instance.
     *
     */
    public ChangeFeedEncryptionProcessorBuilder() {
    }

    /**
     * Sets the host name.
     *
     * @param hostName the name to be used for the host. When using multiple hosts, each host must have a unique
     * name.
     * @return current Builder.
     */
    public ChangeFeedEncryptionProcessorBuilder hostName(String hostName) {
        this.hostName = hostName;
        return this;
    }

    /**
     * Sets an existing {@link CosmosEncryptionAsyncContainer} to be used to read from the monitored container.
     *
     * @param feedContainer the instance of {@link CosmosEncryptionAsyncContainer} to be used.
     * @return current Builder.
     */
    public ChangeFeedEncryptionProcessorBuilder feedContainer(CosmosEncryptionAsyncContainer feedContainer) {
        this.feedContainer = feedContainer;

        return this;
    }

    /**
     * Sets an existing {@link CosmosAsyncContainer} to be used to read from the leases container.
     *
     * @param leaseContainer the instance of {@link CosmosAsyncContainer} to use.
     * @return current Builder.
     */
    public ChangeFeedEncryptionProcessorBuilder leaseContainer(CosmosAsyncContainer leaseContainer) {
        this.leaseContainer = leaseContainer;

        return this;
    }

    /**
     * Sets a consumer function which will be called to process changes.
     *
     * @param consumer the {@link Consumer} to call for handling the feeds.
     * @return current Builder.
     */
    public ChangeFeedEncryptionProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer) {
        this.encryptionConsumer = jsonNodes -> {
            List<Mono<JsonNode>> objectNodeMonoList =
                jsonNodes.stream().map(jsonNode -> {
                    if (jsonNode.isObject()) {
                        return feedContainer.decryptResponseNode((ObjectNode) jsonNode);
                    } else {
                        throw new IllegalStateException("Current operation not supported in change feed encryption");
                    }
                }).collect(Collectors.toList());
            Flux.concat(objectNodeMonoList).publishOn(Schedulers.boundedElastic()).
                collectList().doOnSuccess(consumer).block(); //TODO: https://github.com/Azure/azure-sdk-for-java/issues/23738
        };

        return this;
    }

    /**
     * Sets the {@link ChangeFeedProcessorOptions} to be used.
     * Unless specifically set the default values that will be used are:
     * <ul>
     * <li>maximum items per page or FeedResponse: 100</li>
     * <li>lease renew interval: 17 seconds</li>
     * <li>lease acquire interval: 13 seconds</li>
     * <li>lease expiration interval: 60 seconds</li>
     * <li>feed poll delay: 5 seconds</li>
     * <li>maximum scale count: unlimited</li>
     * </ul>
     *
     * @param changeFeedProcessorOptions the change feed processor options to use.
     * @return current Builder.
     */
    public ChangeFeedEncryptionProcessorBuilder options(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;

        return this;
    }

    /**
     * Builds a new instance of the {@link ChangeFeedProcessor} with the specified configuration.
     *
     * @return an instance of {@link ChangeFeedProcessor}.
     */
    public ChangeFeedProcessor buildChangeFeedProcessor() {
        ChangeFeedProcessorBuilder changeFeedProcessorBuilder = new ChangeFeedProcessorBuilder()
            .hostName(this.hostName)
            .feedContainer(this.feedContainer.getCosmosAsyncContainer())
            .leaseContainer(this.leaseContainer)
            .handleChanges(this.encryptionConsumer)
            .options(this.changeFeedProcessorOptions);

        return changeFeedProcessorBuilder.buildChangeFeedProcessor();
    }
}