DefaultServiceBusTopicClientFactory.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.integration.servicebus.factory;
import com.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.management.servicebus.ServiceBusNamespace;
import com.microsoft.azure.management.servicebus.Topic;
import com.microsoft.azure.servicebus.IMessageSender;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.TopicClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.azure.spring.cloud.context.core.util.Memoizer;
import com.azure.spring.cloud.context.core.util.Tuple;
import org.springframework.util.StringUtils;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Default implementation of {@link ServiceBusTopicClientFactory}.
* Client will be cached to improve performance
*
* @author Warren Zhu
*/
public class DefaultServiceBusTopicClientFactory extends AbstractServiceBusSenderFactory
implements ServiceBusTopicClientFactory {
private static final String SUBSCRIPTION_PATH = "%s/subscriptions/%s";
private final BiFunction<String, String, ISubscriptionClient> subscriptionClientCreator =
Memoizer.memoize(this::createSubscriptionClient);
private final Function<String, ? extends IMessageSender> sendCreator = Memoizer.memoize(this::createTopicClient);
public DefaultServiceBusTopicClientFactory(String connectionString) {
super(connectionString);
}
private ISubscriptionClient createSubscriptionClient(String topicName, String subscription) {
if (resourceManagerProvider != null && StringUtils.hasText(namespace)) {
ServiceBusNamespace serviceBusNamespace =
resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(namespace);
Topic topic = resourceManagerProvider.getServiceBusTopicManager()
.getOrCreate(Tuple.of(serviceBusNamespace, topicName));
resourceManagerProvider.getServiceBusTopicSubscriptionManager().getOrCreate(Tuple.of(topic, subscription));
}
String subscriptionPath = String.format(SUBSCRIPTION_PATH, topicName, subscription);
try {
return new SubscriptionClient(new ConnectionStringBuilder(connectionString, subscriptionPath),
ReceiveMode.PEEKLOCK);
} catch (InterruptedException | ServiceBusException e) {
throw new ServiceBusRuntimeException("Failed to create service bus subscription client", e);
}
}
private IMessageSender createTopicClient(String topicName) {
if (resourceManagerProvider != null && StringUtils.hasText(namespace)) {
ServiceBusNamespace serviceBusNamespace =
resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(namespace);
resourceManagerProvider.getServiceBusTopicManager().getOrCreate(Tuple.of(serviceBusNamespace, topicName));
}
try {
return new TopicClient(new ConnectionStringBuilder(connectionString, topicName));
} catch (InterruptedException | ServiceBusException e) {
throw new ServiceBusRuntimeException("Failed to create service bus topic client", e);
}
}
@Override
public ISubscriptionClient getOrCreateSubscriptionClient(String topic, String subscription) {
return this.subscriptionClientCreator.apply(topic, subscription);
}
@Override
public IMessageSender getOrCreateSender(String name) {
return this.sendCreator.apply(name);
}
}