DefaultStorageQueueClientFactory.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.storage.queue.factory;

import com.azure.core.http.policy.HttpLogOptions;
import com.azure.storage.queue.QueueAsyncClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueStorageException;
import com.azure.spring.cloud.context.core.util.Memoizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

import java.util.function.Function;

import static com.azure.spring.cloud.context.core.util.Constants.SPRING_INTEGRATION_STORAGE_QUEUE_APPLICATION_ID;

/**
 * Default client factory for Storage Queue.
 */
public class DefaultStorageQueueClientFactory implements StorageQueueClientFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStorageQueueClientFactory.class);
    private final String connectionString;
    private final Function<String, QueueAsyncClient> queueClientCreator = Memoizer.memoize(this::createQueueClient);

    public DefaultStorageQueueClientFactory(@NonNull String connectionString) {
        this.connectionString = connectionString;
    }


    @Override
    public QueueAsyncClient getOrCreateQueueClient(String queueName) {
        return this.queueClientCreator.apply(queueName);
    }

    private QueueAsyncClient createQueueClient(String queueName) {
        final QueueAsyncClient queueClient = new QueueClientBuilder()
            .connectionString(this.connectionString)
            .queueName(queueName)
            .httpLogOptions(new HttpLogOptions().setApplicationId(SPRING_INTEGRATION_STORAGE_QUEUE_APPLICATION_ID))
            .buildAsyncClient();

        queueClient.create()
            .onErrorContinue(QueueStorageException.class, (e, r) -> LOG.error(e.getMessage()))
            .subscribe();

        return queueClient;
    }

}