// SynchronousEventSubscriber.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs.implementation;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.WORK_ID_KEY;
/**
 * Subscriber that takes a {@link SynchronousReceiveWork} and publishes events to it in the order received.
 *
 * <p>On subscription it requests the number of events the work item asks for and schedules a timeout task. The
 * subscriber disposes of itself when the work item becomes terminal, when upstream completes or errors, or when the
 * timeout task fires, whichever happens first.</p>
 *
 * <p>{@link #dispose()} can be reached both from the thread delivering signals and from the {@link Timer} thread
 * running the timeout task, so it is guarded to run its clean-up at most once.</p>
 */
public class SynchronousEventSubscriber extends BaseSubscriber<PartitionEvent> {
    private final Timer timer = new Timer();
    // Flipped exactly once by dispose(); guards against the timeout task and a terminal signal both cleaning up.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ClientLogger logger;
    private final SynchronousReceiveWork work;
    private volatile Subscription subscription;

    /**
     * Creates a subscriber that publishes received events to the given work item.
     *
     * @param work Work item that receives the events; its id is attached to every log statement of this subscriber.
     * @throws NullPointerException if {@code work} is null.
     */
    public SynchronousEventSubscriber(SynchronousReceiveWork work) {
        this.work = Objects.requireNonNull(work, "'work' cannot be null.");

        Map<String, Object> loggingContext = new HashMap<>();
        loggingContext.put(WORK_ID_KEY, this.work.getId());
        this.logger = new ClientLogger(SynchronousEventSubscriber.class, loggingContext);
    }

    /**
     * On an initial subscription, requests the number of events the work item asks for and schedules the timeout
     * task that disposes of this subscriber once the work item's timeout elapses.
     *
     * @param subscription Subscription for upstream.
     */
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        if (this.subscription == null) {
            this.subscription = subscription;
        }

        logger.atInfo()
            .addKeyValue("pendingEvents", work.getNumberOfEvents())
            .log("Scheduling receive timeout task.");

        subscription.request(work.getNumberOfEvents());

        // request(n) may deliver events (or a terminal signal) synchronously, which can end in dispose() and a
        // cancelled timer. Scheduling on a cancelled Timer throws IllegalStateException, so skip it in that case.
        if (isDisposed.get()) {
            return;
        }

        try {
            timer.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), work.getTimeout().toMillis());
        } catch (IllegalStateException ignored) {
            // dispose() ran on another thread between the check above and schedule(). The timer is already
            // cancelled and there is nothing left to time out.
            logger.info("Subscriber was disposed before the receive timeout task could be scheduled.");
        }
    }

    /**
     * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose
     * of the subscriber.
     *
     * @param value Event to publish.
     */
    @Override
    protected void hookOnNext(PartitionEvent value) {
        work.next(value);

        if (work.isTerminal()) {
            logger.info("Work completed. Closing Flux and cancelling subscription.");
            dispose();
        }
    }

    /**
     * Disposes of the subscriber when upstream completes.
     */
    @Override
    protected void hookOnComplete() {
        logger.info("Completed. No events to listen to.");
        dispose();
    }

    /**
     * Logs the error, passes it to the work item, and disposes of the subscriber.
     *
     * @param throwable Error signalled by upstream.
     */
    @Override
    protected void hookOnError(Throwable throwable) {
        logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, throwable);
        work.error(throwable);
        dispose();
    }

    /**
     * Completes the work item, cancels the upstream subscription (if one was received) and the timeout timer, then
     * disposes of the underlying subscriber. Only the first invocation performs the clean-up; later invocations
     * return immediately.
     */
    @Override
    public void dispose() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        work.complete();

        // The field stays null until hookOnSubscribe runs; dispose() may legitimately be called before that.
        final Subscription upstream = this.subscription;
        if (upstream != null) {
            upstream.cancel();
        }

        timer.cancel();
        super.dispose();
    }

    /**
     * Timer task that logs the timeout and then runs the supplied dispose action.
     */
    private static class ReceiveTimeoutTask extends TimerTask {
        private final ClientLogger logger;
        private final Runnable onDispose;

        /**
         * @param onDispose Action to run when the timeout elapses.
         * @param logger Logger used to record that the timeout was hit.
         */
        ReceiveTimeoutTask(Runnable onDispose, ClientLogger logger) {
            this.onDispose = onDispose;
            this.logger = logger;
        }

        @Override
        public void run() {
            logger.info("Timeout encountered, disposing of subscriber.");
            onDispose.run();
        }
    }
}