ReactorProvider.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.amqp.implementation;
import com.azure.core.amqp.implementation.handler.CustomIOHandler;
import com.azure.core.amqp.implementation.handler.ReactorHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;
import java.io.IOException;
import java.util.Objects;
public class ReactorProvider {
private final ClientLogger logger = new ClientLogger(ReactorProvider.class);
private final Object lock = new Object();
private Reactor reactor;
private ReactorDispatcher reactorDispatcher;
public Reactor getReactor() {
synchronized (lock) {
return reactor;
}
}
public ReactorDispatcher getReactorDispatcher() {
synchronized (lock) {
return reactorDispatcher;
}
}
/**
* Creates a reactor and replaces the existing instance of it.
*
* @param connectionId Identifier for Reactor.
* @return The newly created reactor instance.
* @throws IOException If the service could not create a Reactor instance.
*/
public Reactor createReactor(String connectionId, int maxFrameSize) throws IOException {
final CustomIOHandler globalHandler = new CustomIOHandler(connectionId);
final ReactorHandler reactorHandler = new ReactorHandler(connectionId);
return createReactor(maxFrameSize, globalHandler, reactorHandler);
}
/**
* Creates a new reactor with the given reactor handler and IO handler.
*
* @param globalHandler The global handler for reactor instance. Useful for logging events that were missed.
* @param baseHandlers Handler for reactor instance. Usually: {@link ReactorHandler}
* @return A new reactor instance.
*/
private Reactor createReactor(final int maxFrameSize, final Handler globalHandler,
final BaseHandler... baseHandlers) throws IOException {
Objects.requireNonNull(baseHandlers);
Objects.requireNonNull(globalHandler);
if (maxFrameSize <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException("'maxFrameSize' must be a positive number."));
}
final ReactorOptions reactorOptions = new ReactorOptions();
reactorOptions.setMaxFrameSize(maxFrameSize);
reactorOptions.setEnableSaslByDefault(true);
final Reactor reactor = Proton.reactor(reactorOptions, baseHandlers);
reactor.setGlobalHandler(globalHandler);
final ReactorDispatcher dispatcher = new ReactorDispatcher(reactor);
synchronized (lock) {
this.reactor = reactor;
this.reactorDispatcher = dispatcher;
}
return this.reactor;
}
}