ServiceBusProcessorClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
/**
 * The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient} provides a push-based
 * mechanism that invokes the message processing callback when a message is received or the error handler when an error
 * occurs when receiving messages. A {@link ServiceBusProcessorClient} can be created to process messages for a
 * session-enabled or non session-enabled Service Bus entity. It supports auto-settlement of messages by default.
 *
 * <p><strong>Create and run a processor</strong></p>
 * {@codesnippet com.azure.messaging.servicebus.servicebusprocessorclient#instantiation}
 *
 * <p><strong>Create and run a session-enabled processor</strong></p>
 * {@codesnippet com.azure.messaging.servicebus.servicebusprocessorclient#session-instantiation}
 *
 * @see ServiceBusProcessorClientBuilder
 * @see ServiceBusSessionProcessorClientBuilder
 */
public final class ServiceBusProcessorClient implements AutoCloseable {
    // Delay, in seconds, before the first connection check and between consecutive checks.
    private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;
    private final ClientLogger logger = new ClientLogger(ServiceBusProcessorClient.class);
    // Exactly one of the two builders is non-null, depending on which constructor was used.
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final ServiceBusProcessorClientOptions processorOptions;
    // Subscription of the receiver that is currently in use; null while closed or in the middle of a restart.
    private final AtomicReference<Subscription> receiverSubscription = new AtomicReference<>();
    // Receiver client that is currently in use; null after close() until the next start().
    private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private final TracerProvider tracerProvider;
    // Runs the periodic connection check; created in start() and discarded in close().
    private ScheduledExecutorService scheduledExecutor;

    /**
     * Constructor to create a sessions-enabled processor.
     *
     * @param sessionReceiverBuilder The session processor builder to create new instances of async clients.
     * @param processMessage The message processing callback.
     * @param processError The error handler.
     * @param processorOptions Options to configure this instance of the processor.
     * @throws NullPointerException if any argument is {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder,
        Consumer<ServiceBusReceivedMessageContext> processMessage,
        Consumer<ServiceBusErrorContext> processError,
        ServiceBusProcessorClientOptions processorOptions) {
        this.sessionReceiverBuilder = Objects.requireNonNull(sessionReceiverBuilder,
            "'sessionReceiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
        this.receiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
    }

    /**
     * Constructor to create a processor.
     *
     * @param receiverBuilder The processor builder to create new instances of async clients.
     * @param processMessage The message processing callback.
     * @param processError The error handler.
     * @param processorOptions Options to configure this instance of the processor.
     * @throws NullPointerException if any argument is {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder,
        Consumer<ServiceBusReceivedMessageContext> processMessage,
        Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
        this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(receiverBuilder.buildAsyncClient());
        this.sessionReceiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
    }

    /**
     * Starts the processor in the background. When this method is called, the processor will initiate a message
     * receiver that will invoke the message handler when new messages are available. This method is idempotent (ie.
     * calling {@code start()} again after the processor is already running is a no-op).
     * <p>
     * Calling {@code start()} after calling {@link #stop() stop()} will resume processing messages using the same
     * underlying connection.
     * </p>
     * <p>
     * Calling {@code start()} after calling {@link #close() close()} will start the processor with a new connection.
     * </p>
     */
    public synchronized void start() {
        if (isRunning.getAndSet(true)) {
            logger.info("Processor is already running");
            return;
        }
        // close() discards the receiver client, so a start() that follows it has to build a fresh one.
        if (asyncClient.get() == null) {
            asyncClient.set(createReceiverClient());
        }
        receiveMessages();
        // Start an executor to periodically check if the client's connection is active
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutor.scheduleWithFixedDelay(this::restartIfConnectionClosed,
                SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Stops the message processing for this processor. The receiving links and sessions are kept active and this
     * processor can resume processing messages by calling {@link #start()} again.
     */
    public synchronized void stop() {
        isRunning.set(false);
    }

    /**
     * Stops message processing and closes the processor. The receiving links and sessions are closed and calling
     * {@link #start()} will create a new processing cycle with new links and new sessions.
     */
    @Override
    public synchronized void close() {
        isRunning.set(false);
        Subscription subscription = receiverSubscription.get();
        if (subscription != null) {
            subscription.cancel();
            receiverSubscription.set(null);
        }
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdown();
            scheduledExecutor = null;
        }
        ServiceBusReceiverAsyncClient client = asyncClient.get();
        if (client != null) {
            client.close();
            asyncClient.set(null);
        }
    }

    /**
     * Returns {@code true} if the processor is running. If the processor is stopped or closed, this method returns
     * {@code false}.
     *
     * @return {@code true} if the processor is running; {@code false} otherwise.
     */
    public synchronized boolean isRunning() {
        return isRunning.get();
    }

    /**
     * Builds a new receiver client from whichever builder this processor was constructed with.
     *
     * @return A newly built receiver client.
     */
    private ServiceBusReceiverAsyncClient createReceiverClient() {
        return this.receiverBuilder == null
            ? this.sessionReceiverBuilder.buildAsyncClientForProcessor()
            : this.receiverBuilder.buildAsyncClient();
    }

    /**
     * Periodic task run by the scheduled executor: restarts the message receiver when the current client reports
     * that its connection is closed.
     * <p>
     * No exception is allowed to escape, because {@code scheduleWithFixedDelay} suppresses every subsequent
     * execution once a run throws; a single failed restart would otherwise disable the check for good.
     * </p>
     */
    private void restartIfConnectionClosed() {
        try {
            // close() may null the client while an already-started run of this task is still in flight.
            ServiceBusReceiverAsyncClient client = asyncClient.get();
            if (client != null && client.isConnectionClosed()) {
                restartMessageReceiver(null);
            }
        } catch (Exception ex) {
            logger.warning("Error checking the connection state or restarting the message receiver.", ex);
        }
    }

    /**
     * Requests one more message from the existing subscription if there is one; otherwise subscribes to the current
     * receiver client's message stream and requests the first message.
     * <p>
     * Messages are requested one at a time: each handled message triggers the next request only while the
     * processor is running.
     * </p>
     */
    private synchronized void receiveMessages() {
        Subscription existingSubscription = receiverSubscription.get();
        if (existingSubscription != null) {
            // A subscription survives stop(), so resuming only needs to ask for the next message.
            existingSubscription.request(1);
            return;
        }
        ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
        receiverClient.receiveMessagesWithContext()
            .parallel(processorOptions.getMaxConcurrentCalls())
            .runOn(Schedulers.boundedElastic())
            .subscribe(new Subscriber<ServiceBusMessageContext>() {
                // The subscription handed to this subscriber; used to recognise signals from a replaced receiver.
                private volatile Subscription ownSubscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    ownSubscription = subscription;
                    receiverSubscription.set(subscription);
                    subscription.request(1);
                }

                @Override
                public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
                    if (serviceBusMessageContext.hasError()) {
                        handleError(serviceBusMessageContext.getThrowable());
                    } else {
                        Context processSpanContext = null;
                        try {
                            ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
                                new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);
                            processSpanContext =
                                startProcessTracingSpan(serviceBusMessageContext.getMessage(),
                                    receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
                            if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) {
                                serviceBusMessageContext.getMessage().addContext(SPAN_CONTEXT_KEY, processSpanContext);
                            }
                            processMessage.accept(serviceBusReceivedMessageContext);
                            endProcessTracingSpan(processSpanContext, Signal.complete());
                        } catch (Exception ex) {
                            handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
                            endProcessTracingSpan(processSpanContext, Signal.error(ex));
                            if (!processorOptions.isDisableAutoComplete()) {
                                logger.warning("Error when processing message. Abandoning message.", ex);
                                abandonMessage(serviceBusMessageContext, receiverClient);
                            }
                        }
                    }
                    // A restart may have cleared or replaced the shared subscription while the callback was
                    // running. Only keep pulling while this subscriber is still the active one, which avoids a
                    // NullPointerException and avoids adding demand to a different receiver.
                    Subscription currentSubscription = receiverSubscription.get();
                    if (isRunning.get() && currentSubscription != null && currentSubscription == ownSubscription) {
                        logger.verbose("Requesting 1 more message from upstream");
                        currentSubscription.request(1);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    logger.info("Error receiving messages.", throwable);
                    handleError(throwable);
                    if (isRunning.get()) {
                        restartMessageReceiver(ownSubscription);
                    }
                }

                @Override
                public void onComplete() {
                    logger.info("Completed receiving messages.");
                    if (isRunning.get()) {
                        restartMessageReceiver(ownSubscription);
                    }
                }
            });
    }

    /**
     * Closes the tracing scope stored in the given context and ends the process span.
     * <p>
     * Does nothing when the context is {@code null}, carries no scope, or tracing is disabled. When the scope is not
     * a {@link Closeable}, a warning is logged and neither the scope nor the span is closed.
     * </p>
     *
     * @param processSpanContext Context returned by {@code startProcessTracingSpan}; may be {@code null}.
     * @param signal Outcome of the message processing that is reported when the span ends.
     */
    private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
        if (processSpanContext == null) {
            return;
        }
        Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
        // Disposes of the scope when the trace span closes.
        if (!spanScope.isPresent() || !tracerProvider.isEnabled()) {
            return;
        }
        Object scope = spanScope.get();
        if (scope instanceof Closeable) {
            try {
                ((Closeable) scope).close();
            } catch (IOException ioException) {
                logger.error("endTracingSpan().close() failed with an error {}", ioException);
            } finally {
                // End the span even when closing the scope failed, so the span is not left open.
                tracerProvider.endSpan(processSpanContext, signal);
            }
        } else {
            // Report the class of the scope object itself, not of the Optional that wraps it.
            logger.warning(String.format(Locale.US,
                "Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span",
                scope.getClass()));
        }
    }

    /**
     * Starts a process span for the received message, using the diagnostic id stored in the message's application
     * properties as the source of the parent context.
     *
     * @param receivedMessage The message about to be handed to the message callback.
     * @param entityPath Entity path recorded on the span context.
     * @param fullyQualifiedNamespace Namespace recorded on the span context as the host name.
     * @return The context of the started span, or {@link Context#NONE} when the message carries no diagnostic id or
     *     tracing is disabled.
     */
    private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath,
        String fullyQualifiedNamespace) {
        Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
        if (diagnosticId == null || !tracerProvider.isEnabled()) {
            return Context.NONE;
        }
        Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
        spanContext = spanContext
            .addData(ENTITY_PATH_KEY, entityPath)
            .addData(HOST_NAME_KEY, fullyQualifiedNamespace)
            .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
        // The enqueued time is optional; when present it is recorded as seconds since the epoch.
        spanContext = receivedMessage.getEnqueuedTime() == null
            ? spanContext
            : spanContext.addData(MESSAGE_ENQUEUED_TIME,
                receivedMessage.getEnqueuedTime().toInstant().getEpochSecond());
        return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, ProcessKind.PROCESS);
    }

    /**
     * Abandons the message carried by the given context, blocking until the operation finishes. Failures are
     * logged at verbose level and otherwise ignored (best effort).
     *
     * @param serviceBusMessageContext Context holding the message to abandon.
     * @param receiverClient Client that received the message.
     */
    private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,
        ServiceBusReceiverAsyncClient receiverClient) {
        try {
            receiverClient.abandon(serviceBusMessageContext.getMessage()).block();
        } catch (Exception exception) {
            logger.verbose("Failed to abandon message", exception);
        }
    }

    /**
     * Invokes the error handler with the given error plus the namespace and entity path of the current client.
     * Any exception raised while doing so, including one thrown by the handler itself, is logged at verbose level
     * and ignored.
     *
     * @param throwable The error to report.
     */
    private void handleError(Throwable throwable) {
        try {
            ServiceBusReceiverAsyncClient client = asyncClient.get();
            final String fullyQualifiedNamespace = client.getFullyQualifiedNamespace();
            final String entityPath = client.getEntityPath();
            processError.accept(new ServiceBusErrorContext(throwable, fullyQualifiedNamespace, entityPath));
        } catch (Exception ex) {
            logger.verbose("Error from error handler. Ignoring error.", ex);
        }
    }

    /**
     * Replaces the current receiver client with a newly built one and subscribes to it, provided the processor is
     * still running.
     * <p>
     * The method is synchronized so that restarts requested from different threads, and a concurrent
     * {@link #close()}, cannot interleave with each other.
     * </p>
     *
     * @param requester Subscription of the subscriber asking for the restart, or {@code null} when the request does
     *     not come from a subscriber (the periodic connection check). A request from a subscription that has
     *     already been replaced by a different active subscription is ignored.
     */
    private synchronized void restartMessageReceiver(Subscription requester) {
        if (!isRunning()) {
            return;
        }
        Subscription currentSubscription = receiverSubscription.get();
        if (requester != null && currentSubscription != null && requester != currentSubscription) {
            // Terminal signal from a receiver that was already replaced; the newer receiver must stay in place.
            return;
        }
        receiverSubscription.set(null);
        ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
        if (receiverClient != null) {
            receiverClient.close();
        }
        asyncClient.set(createReceiverClient());
        receiveMessages();
    }
}