RntbdClientChannelPool.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity.rntbd;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint.Config;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.AttributeKey;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ThrowableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssueUnless;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState;
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
/**
* A {@link ChannelPool} implementation that enforces a maximum number of concurrent direct TCP Cosmos connections.
*
* RntbdClientChannelPool: Actors
* - acquire (RntbdServiceEndpoint): acquire a channel to use
* - release (RntbdServiceEndpoint): channel usage is complete and returning it back to pool
* - Channel.closeChannel() Future: Event handling notifying the channel termination to refresh bookkeeping
* - acquisitionTimeoutTimer: channel acquisition time-out handler
* - monitoring (through RntbdServiceEndpoint): get monitoring metrics
*
* Behaviors/Expectations:
* - Bounds:
* - max requests in-flight per channelPool: MAX_CHANNELS_PER_ENDPOINT * MAX_REQUESTS_ENDPOINT (NOT A GUARANTEE)
* - AvailableChannels.size() + AcquiredChannels.size() + (connections in connecting state, i.e., connecting.get()) <= MAX_CHANNELS_PER_ENDPOINT
* - PendingAcquisition queue default-size: Max(10_000, MAX_CHANNELS_PER_ENDPOINT * MAX_REQUESTS_ENDPOINT)
* - ChannelPool executor included event-loop task: MAX_CHANNELS_PER_ENDPOINT * MAX_REQUESTS_ENDPOINT + newInFlightAcquisitions (not yet in pendingAcquisitionQueue)
* - newInFlightAcquisitions: expected to be very short-lived. Hard-bound by ADMISSION_CONTROL (upstream in RntbdServiceEndpoint)
* - NewChannel vs ReUseChannel:
* - NewChannels are serially created (reasonable current state, possible future change, upstream please DON'T TAKE any dependency)
* - Will re-use an existing channel when possible (with MAX_REQUESTS_ENDPOINT attempt not GUARANTEED)
* - Channel usage fairness: fairness is attempted but not guaranteed
* - When loadFactor is > 90%, fairness is attempted by selecting Channel with less concurrency
* - Otherwise no guarantees on fairness per channel with-in bounds of MAX_REQUESTS_ENDPOINT. I.e. some channel might have high request concurrency compared to others
* - Channel serving guarantees:
* - Ordered delivery is not guaranteed (by-design)
* - Fairness is attempted but not a guarantee
* - [UNRELATED TO CHANNEL-POOL] [CURRENT DESIGN]: RntbdServiceEndpoint.write releases Channel before its usage -> acquisition order and channel user order might differ.
* - AcquisitionTimeout: if an acquisition can't be served within the expected time, it fails gracefully
* - Metrics: are approximations and might be in-consistent(by-design) as well
* - EventLoop
* - ChannelPool executor might be shared across ChannelPools or Channel
*
* Design Notes:
* - channelPool.eventLoop ({@link #executor}): executes on one and the same thread, serially
* - Each channelPool gets an EventLoop (selection is round-robin)
* - Schedule only when it can be served immediately
* - Updates and reads that depend on "strong consistency" - like whether to create a new connection or not.
* - Updates to below data structures should be done only when inside eventLoop
* - {@link #acquiredChannels}
* - {@link #availableChannels}
* - AcquisitionTimeout handling:
* - A global single threaded scheduler
* - [***] Each channel independently schedules acquisitionTimeout handlers
* - touches {@link #pendingAcquisitions}, which might impact fairness
* - RntbdServiceEndpoint.write:
* - Promise<Channel> might AcquisitionTimeout
* - RntbdServiceEndpoint.writeWhenConnected
* - releaseToPool immediately -> unblocks next acquisition if-any
* - **Uses Channel even after release**, in channelEventLoop [Not a functional issue but to be noted]
* - It is possible that the acquisition order differs from the ChannelWrite order
* - MAX_REQUESTS_ENDPOINT: Truth managed by RntbdRequestManager in Channel.Pipeline
* - RequestManager only known when the Channel process them.
* - In-flight scheduled ones are unknown -> it's a SOFT BOUND
*
*/
@JsonSerialize(using = RntbdClientChannelPool.JsonSerializer.class)
public final class RntbdClientChannelPool implements ChannelPool {
// TODO: moderakh setup proper retry in higher stack for the exceptions here
// Shared exception instances used to fail promises; each is created once through ThrowableUtil.unknownStackTrace.
private static final TimeoutException ACQUISITION_TIMEOUT = ThrowableUtil.unknownStackTrace(
new TimeoutException("acquisition took longer than the configured maximum time"),
RntbdClientChannelPool.class, "<init>");
private static final ClosedChannelException CHANNEL_CLOSED_ON_ACQUIRE = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), RntbdClientChannelPool.class, "acquire");
private static final IllegalStateException POOL_CLOSED_ON_ACQUIRE = ThrowableUtil.unknownStackTrace(
new ChannelAcquisitionException("service endpoint was closed while acquiring a channel"),
RntbdClientChannelPool.class, "acquire");
private static final IllegalStateException POOL_CLOSED_ON_RELEASE = ThrowableUtil.unknownStackTrace(
new ChannelAcquisitionException("service endpoint was closed while releasing a channel"),
RntbdClientChannelPool.class, "release");
// Attribute key under which the owning pool is attached when connecting (see acquireChannel) and cleared again
// in closeChannel.
private static final AttributeKey<RntbdClientChannelPool> POOL_KEY = AttributeKey.newInstance(
RntbdClientChannelPool.class.getName());
private static final IllegalStateException TOO_MANY_PENDING_ACQUISITIONS = ThrowableUtil.unknownStackTrace(
new ChannelAcquisitionException("too many outstanding acquire operations"),
RntbdClientChannelPool.class, "acquire");
// Shared (static) executor on which doClose performs the blocking channel.close() calls.
private static final EventExecutor closer = new DefaultEventExecutor(new RntbdThreadFactory(
"channel-pool-closer",
true,
Thread.NORM_PRIORITY));
// Shared (static) executor on which each pool's acquisitionTimeoutTask is scheduled at a fixed rate.
private static final EventExecutor pendingAcquisitionExpirationExecutor = new DefaultEventExecutor(new RntbdThreadFactory(
"pending-acquisition-expirator",
true,
Thread.NORM_PRIORITY));
// NOTE(review): not referenced in the visible part of this file; presumably used by newTimeout -- confirm.
private static final HashedWheelTimer acquisitionAndIdleEndpointDetectionTimer =
new HashedWheelTimer(new RntbdThreadFactory(
"channel-acquisition-timer",
true,
Thread.NORM_PRIORITY));
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelPool.class);
// Taken from config.connectionAcquisitionTimeoutInNanos(); a value <= 0 disables the acquisition timeout task.
private final long acquisitionTimeoutInNanos;
// null when acquisitionTimeoutInNanos <= 0.
private final Runnable acquisitionTimeoutTask;
private final PooledByteBufAllocatorMetric allocatorMetric;
private final Bootstrap bootstrap;
private final RntbdServiceEndpoint endpoint;
// The event loop on which pool bookkeeping is expected to run (see ensureInEventLoop).
private final EventExecutor executor;
private final ChannelHealthChecker healthChecker;
// private final ScheduledFuture<?> idleStateDetectionScheduledFuture;
private final int maxChannels;
// Currently always Integer.MAX_VALUE (assigned in the constructor).
private final int maxPendingAcquisitions;
private final int maxRequestsPerChannel;
private final ChannelPoolHandler poolHandler;
// Currently always true (assigned in the constructor).
private final boolean releaseHealthCheck;
// Because state from these fields can be requested on any thread...
private final AtomicReference<Timeout> acquisitionAndIdleEndpointDetectionTimeout = new AtomicReference<>();
private final ConcurrentHashMap<Channel, Channel> acquiredChannels = new ConcurrentHashMap<>();
private final Deque<Channel> availableChannels = new ConcurrentLinkedDeque<>();
private final AtomicBoolean closed = new AtomicBoolean();
// Set to true by acquireChannel (compareAndSet) before it starts a connect attempt, so at most one connect is
// started at a time; counted as one channel by channels(boolean).
private final AtomicBoolean connecting = new AtomicBoolean();
// Pending acquisitions ordered by the expiry time of their promise (earliest expiry first).
private final Queue<AcquireListener> pendingAcquisitions = new PriorityBlockingQueue<>(
100,
Comparator.comparingLong((task) -> task.originalPromise.getExpiryTimeInNanos()));
// null when no acquisition timeout task was scheduled (acquisitionTimeoutTask == null).
private final ScheduledFuture<?> pendingAcquisitionExpirationFuture;
/**
* Initializes a newly created {@link RntbdClientChannelPool} instance.
* <p>
* Delegates to the private constructor, passing a new {@link RntbdClientChannelHealthChecker} built from
* {@code config}.
*
* @param endpoint the {@link RntbdServiceEndpoint} this channel pool is created for.
* @param bootstrap the {@link Bootstrap} that is used for connections.
* @param config the {@link Config} that is used for the channel pool instance created.
*/
RntbdClientChannelPool(final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, final Config config) {
this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config));
}
/**
* Initializes a newly created {@link RntbdClientChannelPool} instance with an explicit health checker.
* <p>
* Besides storing its arguments, this constructor:
* <ul>
* <li>takes this pool's {@link #executor} from the bootstrap's event loop group,</li>
* <li>clones the bootstrap and installs a {@link ChannelInitializer} on the clone that forwards each new channel
* to {@link ChannelPoolHandler#channelCreated},</li>
* <li>calls {@code newTimeout} with the idle endpoint timeout and the request timer resolution, and</li>
* <li>when the configured acquisition timeout is positive, schedules {@link #acquisitionTimeoutTask} at a fixed
* rate (period = the acquisition timeout) on the shared pending-acquisition expiration executor.</li>
* </ul>
*
* @param endpoint the {@link RntbdServiceEndpoint} this channel pool is created for.
* @param bootstrap the {@link Bootstrap} that is used for connections.
* @param config the {@link Config} that is used for the channel pool instance created.
* @param healthChecker the health checker used by this pool and by its channel handler.
*/
private RntbdClientChannelPool(
final RntbdServiceEndpoint endpoint,
final Bootstrap bootstrap,
final Config config,
final RntbdClientChannelHealthChecker healthChecker) {
checkNotNull(endpoint, "expected non-null endpoint");
checkNotNull(bootstrap, "expected non-null bootstrap");
checkNotNull(config, "expected non-null config");
checkNotNull(healthChecker, "expected non-null healthChecker");
this.endpoint = endpoint;
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker);
// The pool's executor is a single event loop picked from the bootstrap's event loop group
this.executor = bootstrap.config().group().next();
this.healthChecker = healthChecker;
// Clone the bootstrap and install an initializer on the clone that hands every new channel to the pool handler
this.bootstrap = bootstrap.clone().handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(final Channel channel) throws Exception {
checkState(channel.eventLoop().inEventLoop());
RntbdClientChannelPool.this.poolHandler.channelCreated(channel);
}
});
// TODO (DANOBLE) Consider moving or removing this.allocatorMetric
// The metric is redundant in the scope of this class and should be pulled up to RntbdServiceEndpoint or
// entirely removed.
this.acquisitionTimeoutInNanos = config.connectionAcquisitionTimeoutInNanos();
this.allocatorMetric = config.allocator().metric();
this.maxChannels = config.maxChannelsPerEndpoint();
this.maxRequestsPerChannel = config.maxRequestsPerChannel();
// The pending acquisition queue is effectively unbounded
this.maxPendingAcquisitions = Integer.MAX_VALUE;
this.releaseHealthCheck = true;
// No timeout task is created when the configured acquisition timeout is zero or negative
this.acquisitionTimeoutTask = acquisitionTimeoutInNanos <= 0 ? null : new AcquireTimeoutTask(this) {
/**
* Fails a request due to a channel acquisition timeout.
*
* @param task a {@link AcquireListener channel acquisition task} that has timed out.
*/
@Override
public void onTimeout(AcquireListener task) {
task.originalPromise.setFailure(ACQUISITION_TIMEOUT);
RntbdChannelAcquisitionTimeline.startNewEvent(
task.originalPromise.getChannelAcquisitionTimeline(),
RntbdChannelAcquisitionEventType.PENDING_TIME_OUT);
}
};
newTimeout(endpoint, config.idleEndpointTimeoutInNanos(), config.requestTimerResolutionInNanos());
// Run the timeout task periodically, using the acquisition timeout as both initial delay and period;
// the future stays null when there is no timeout task.
if (this.acquisitionTimeoutTask != null) {
this.pendingAcquisitionExpirationFuture =
pendingAcquisitionExpirationExecutor.scheduleAtFixedRate(
this.acquisitionTimeoutTask,
this.acquisitionTimeoutInNanos,
this.acquisitionTimeoutInNanos,
TimeUnit.NANOSECONDS);
} else {
this.pendingAcquisitionExpirationFuture = null;
}
// NOTE(review): the commented-out code below is a disabled idle-state detection task; it is kept as-is.
// this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
// () -> {
// final long elapsedTimeInNanos = System.nanoTime() - endpoint.lastRequestNanoTime();
//
// if (idleEndpointTimeoutInNanos - elapsedTimeInNanos <= 0) {
// if (logger.isDebugEnabled()) {
// logger.debug(
// "{} closing endpoint due to inactivity (elapsedTime: {} > idleEndpointTimeout: {})",
// endpoint,
// Duration.ofNanos(elapsedTimeInNanos),
// Duration.ofNanos(idleEndpointTimeoutInNanos));
// }
// endpoint.close();
// return;
// }
//
// this.runTasksInPendingAcquisitionQueue();
//
// }, requestTimerResolutionInNanos, requestTimerResolutionInNanos, TimeUnit.NANOSECONDS);
}
// region Accessors
/**
 * Counts the channels this {@link RntbdClientChannelPool pool} currently accounts for.
 * <p>
 * The count is the sum of the acquired channels, the available channels and one more if a connection attempt is
 * in progress. When an exact value is required, the call is expected to be made on the pool's {@link #executor};
 * a violation of that expectation is reported through {@link #ensureInEventLoop}. From any other thread the value
 * is only an approximation because {@link #acquire} and {@link #release} operations may be running concurrently.
 *
 * @param approximationAcceptable {@code true} if an approximate count is good enough, in which case no event-loop
 *                                check is performed.
 * @return the current channel count.
 */
public int channels(boolean approximationAcceptable) {
    if (!approximationAcceptable) {
        ensureInEventLoop();
    }
    final int acquiredCount = this.acquiredChannels.size();
    final int availableCount = this.availableChannels.size();
    final int connectingCount = this.connecting.get() ? 1 : 0;
    return acquiredCount + availableCount + connectingCount;
}
/**
* Gets the current acquired channel count.
* <p>
* NOTE: the value is read without any event-loop check, so it is only suitable for metrics.
*
* @return the number of channels currently tracked as acquired.
*/
public int channelsAcquiredMetrics() {
return this.acquiredChannels.size();
}
/**
* Gets the current available channel count.
* <p>
* NOTE: this only provides an approximation for metrics; the value is read without any event-loop check.
*
* @return the number of channels currently tracked as available.
*/
public int channelsAvailableMetrics() {
return this.availableChannels.size();
}
/**
 * Gets the number of connections that are currently being established.
 * <p>
 * The pool tracks connection attempts with a single flag, so the result is either {@code 0} or {@code 1}.
 *
 * @return {@code 1} if a connection attempt is in progress; {@code 0} otherwise.
 */
public int attemptingToConnectMetrics() {
    if (this.connecting.get()) {
        return 1;
    }
    return 0;
}
/**
* Gets the current number of tasks queued on this pool's {@link #executor}.
* <p>
* NOTE: this only provides an approximation for metrics. The value is whatever
* {@code RntbdUtils.tryGetExecutorTaskQueueSize} reports for the executor.
*
* @return the current number of tasks in the executor's task queue.
*/
public int executorTaskQueueMetrics() {
return RntbdUtils.tryGetExecutorTaskQueueSize(this.executor);
}
/**
* Indicates whether this {@link RntbdClientChannelPool pool} is closed.
* <p>
* The flag is set by {@link #close} before the pool's channels are actually closed.
*
* @return {@code true} if this {@link RntbdClientChannelPool pool} is closed or {@code false} otherwise.
*/
public boolean isClosed() {
return this.closed.get();
}
/**
* Gets the maximum number of channels that will be allocated to this {@link RntbdClientChannelPool pool}.
* <p>
* No more than {@code maxChannels} channels will ever be pooled. The value is taken from
* {@code Config.maxChannelsPerEndpoint()} at construction time.
*
* @return the maximum number of channels that will be allocated to this {@link RntbdClientChannelPool pool}.
*/
public int maxChannels() {
return this.maxChannels;
}
/**
* Gets the maximum number of requests that will be left pending on any {@link Channel channel} in the {@link
* RntbdClientChannelPool pool}.
* <p>
* Healthy channels with fewer than {@code maxRequestsPerChannel} requests pending are considered to be serviceable
* channels. Unhealthy channels are regularly removed from the pool and--depending on load--may be replaced with new
* channels later.
*
* @return the maximum number of requests that will be left pending on any {@link Channel channel} in this
* {@link RntbdClientChannelPool pool}.
*/
public int maxRequestsPerChannel() {
return this.maxRequestsPerChannel;
}
/**
* Gets the {@link SocketAddress address} of the replica served by this {@link RntbdClientChannelPool pool}.
* <p>
* The address is read from the configuration of this pool's {@link Bootstrap}.
*
* @return the {@link SocketAddress address} of the replica served by this {@link RntbdClientChannelPool pool}.
*/
public SocketAddress remoteAddress() {
return this.bootstrap.config().remoteAddress();
}
/**
* Gets the number of pending (asynchronous) channel acquisitions.
* <p>
* Pending acquisitions map to requests that have not yet been dispatched to a channel. Such requests are said to be
* queued. Hence the count of the number of pending channel acquisitions is called the {@code
* requestQueueLength}.
*
* @return the number of pending channel acquisitions.
*/
public int requestQueueLength() {
return this.pendingAcquisitions.size();
}
/**
* Gets the direct memory usage reported by the allocator metric captured at construction time.
*
* @return the value of {@link PooledByteBufAllocatorMetric#usedDirectMemory()}.
*/
public long usedDirectMemory() {
return this.allocatorMetric.usedDirectMemory();
}
/**
* Gets the heap memory usage reported by the allocator metric captured at construction time.
*
* @return the value of {@link PooledByteBufAllocatorMetric#usedHeapMemory()}.
*/
public long usedHeapMemory() {
return this.allocatorMetric.usedHeapMemory();
}
// endregion
// region Methods
/**
 * Acquires a {@link Channel channel} from the current {@link RntbdClientChannelPool pool}.
 * <p>
 * A new promise is created on an event loop of the bootstrap's group and stamped with an expiry time of
 * "now + acquisition timeout"; the request is then handed to {@link #acquire(Promise)}.
 *
 * @return a {@link Promise promise} to be notified when the operation completes. If the operation fails, {@code
 * channel} will be closed automatically.
 *
 * <p><strong>
 * It is important to {@link #release} every {@link Channel channel} acquired from the pool, even when the {@link
 * Channel channel} is closed explicitly.</strong>
 */
@Override
public Future<Channel> acquire() {
    final ChannelPromiseWithExpiryTime expiringPromise = new ChannelPromiseWithExpiryTime(
        this.bootstrap.config().group().next().newPromise(),
        System.nanoTime() + this.acquisitionTimeoutInNanos);
    return this.acquire(expiringPromise);
}
/**
 * Acquires a {@link Channel channel} from the current {@link RntbdClientChannelPool pool}, recording acquisition
 * events on the supplied timeline.
 * <p>
 * A new promise is created on an event loop of the bootstrap's group, stamped with an expiry time of
 * "now + acquisition timeout" and associated with {@code channelAcquisitionTimeline}; the request is then handed
 * to {@link #acquire(Promise)}.
 *
 * @param channelAcquisitionTimeline the timeline attached to the promise that is created for this acquisition.
 *
 * @return a {@link Promise promise} to be notified when the operation completes.
 */
public Future<Channel> acquire(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
    final ChannelPromiseWithExpiryTime expiringPromise = new ChannelPromiseWithExpiryTime(
        this.bootstrap.config().group().next().newPromise(),
        System.nanoTime() + this.acquisitionTimeoutInNanos,
        channelAcquisitionTimeline);
    return this.acquire(expiringPromise);
}
/**
 * Acquires a {@link Channel channel} from the current {@link RntbdClientChannelPool pool}.
 * <p>
 * If {@code promise} does not already carry an expiry time it is wrapped in one that expires at
 * "now + acquisition timeout". The acquisition is then dispatched as follows:
 * <ul>
 * <li>on the pool's {@link #executor} thread, it is attempted immediately;</li>
 * <li>on any other thread, when more than 1000 acquisitions are already pending, the request is added directly
 * to the pending acquisition queue;</li>
 * <li>otherwise the attempt is scheduled on the pool's {@link #executor}.</li>
 * </ul>
 * Any {@link Throwable} raised while dispatching is used to fail the promise.
 *
 * @param promise a {@link Promise promise} to be notified when the operation completes.
 *
 * @return the promise (with expiry time) that will be completed. If the operation fails, {@code channel} will be
 * closed automatically.
 *
 * <p><strong>
 * It is important to {@link #release} every {@link Channel channel} acquired from the pool, even when the {@link
 * Channel channel} is closed explicitly.</strong>
 *
 * @throws IllegalStateException if this {@link RntbdClientChannelPool} is closed.
 */
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
    this.throwIfClosed();

    final ChannelPromiseWithExpiryTime expiringPromise;
    if (promise instanceof ChannelPromiseWithExpiryTime) {
        expiringPromise = (ChannelPromiseWithExpiryTime) promise;
    } else {
        expiringPromise =
            new ChannelPromiseWithExpiryTime(promise, System.nanoTime() + acquisitionTimeoutInNanos);
    }

    try {
        if (this.executor.inEventLoop()) {
            this.acquireChannel(expiringPromise);
        } else if (pendingAcquisitions.size() > 1000) {
            // A large backlog already exists: enqueue directly rather than scheduling another executor task
            addTaskToPendingAcquisitionQueue(expiringPromise);
        } else {
            this.executor.execute(() -> this.acquireChannel(expiringPromise));
        }
    } catch (Throwable cause) {
        expiringPromise.setFailure(cause);
    }

    return expiringPromise;
}
/**
 * Closes this {@link RntbdClientChannelPool pool}.
 * <p>
 * The first call flips the {@link #closed} flag and runs {@link #doClose} on the pool's {@link #executor},
 * blocking the caller until that task completes when called from another thread. Every call also cancels the
 * periodic pending-acquisition expiration task, if one was scheduled.
 */
@Override
public void close() {
    if (this.closed.compareAndSet(false, true)) {
        if (this.executor.inEventLoop()) {
            this.doClose();
        } else {
            this.executor.submit(this::doClose).awaitUninterruptibly(); // block until complete
        }
    }
    // The expiration future is null when no acquisition timeout is configured, so it must be null-checked
    if (this.pendingAcquisitionExpirationFuture != null) {
        this.pendingAcquisitionExpirationFuture.cancel(false);
    }
}
/**
* Releases a {@link Channel channel} back to the current {@link RntbdClientChannelPool pool}.
* <p>
* Creates a new promise on the channel's event loop and delegates to {@link #release(Channel, Promise)}.
*
* @param channel a {@link Channel channel} to release back to the {@link RntbdClientChannelPool channel pool}.
*
* @return asynchronous result of the operation. If the operation fails, {@code channel} will be closed
* automatically.
*/
@Override
public Future<Void> release(final Channel channel) {
return this.release(channel, channel.eventLoop().newPromise());
}
/**
* Releases a {@link Channel channel} back to the current {@link RntbdClientChannelPool pool}.
* <p>
* The release itself ({@code releaseChannel}) runs on the channel's event loop and completes an intermediate
* promise that belongs to the pool's {@link #executor}. When that intermediate promise completes:
* <ul>
* <li>if the pool is closed, {@code promise} is failed and the channel is closed;</li>
* <li>if the release succeeded, the pending acquisition queue is run and {@code promise} succeeds;</li>
* <li>if the release failed, the pending acquisition queue is run unless the cause is an
* {@link IllegalArgumentException}, and {@code promise} is failed with the cause.</li>
* </ul>
* This method does not check whether the pool is closed up front because a channel may be released back to the
* pool during close.
*
* @param channel a {@link Channel channel} to release back to the {@link RntbdClientChannelPool channel pool}.
* @param promise a {@link Promise promise} to be notified once the release is successful; failed otherwise.
*
* @return a reference to {@code promise}. If the operation fails, {@code channel} will be closed automatically.
*/
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
// We do not call this.throwIfClosed because a channel may be released back to the pool during close
checkNotNull(channel, "expected non-null channel");
checkNotNull(promise, "expected non-null promise");
Promise<Void> anotherPromise = this.executor.newPromise(); // ensures we finish in our executor's event loop
try {
// The release is performed on the channel's own event loop
final EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
this.releaseChannel(channel, anotherPromise);
} else {
loop.execute(() -> this.releaseChannel(channel, anotherPromise));
}
} catch (Throwable cause) {
// Dispatching the release failed: close the channel and fail the intermediate promise on the pool's executor
if (this.executor.inEventLoop()) {
this.closeChannelAndFail(channel, cause, anotherPromise);
} else {
this.executor.submit(() -> this.closeChannelAndFail(channel, cause, anotherPromise));
}
}
// Translate the outcome of the intermediate promise into the caller's promise
anotherPromise.addListener((FutureListener<Void>) future -> {
this.ensureInEventLoop();
if (this.isClosed()) {
// We have no choice but to close the channel
promise.setFailure(POOL_CLOSED_ON_RELEASE);
this.closeChannel(channel);
return;
}
if (future.isSuccess()) {
// A channel was returned to the pool: give pending acquisitions a chance to run
this.runTasksInPendingAcquisitionQueue();
promise.setSuccess(null);
} else {
// TODO (DANOBLE) Is this check for an IllegalArgumentException required?
// Explain here, if so; otherwise remove the check.
final Throwable cause = future.cause();
if (!(cause instanceof IllegalArgumentException)) {
this.runTasksInPendingAcquisitionQueue();
}
promise.setFailure(cause);
}
});
return promise;
}
/**
* Returns a string representation of this pool produced by {@code RntbdObjectMapper.toString}.
*
* @return a string representation of this {@link RntbdClientChannelPool pool}.
*/
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}
// endregion
// region Privates
/**
* Acquires a serviceable channel from the {@link RntbdClientChannelPool pool}.
* <p>
* Expected to run on the pool's {@link #executor}. If the pool is closed the promise is failed immediately.
* Otherwise the steps below are tried in order and the first one that applies ends the call:
* <ol>
* <li>{@code pollChannel} is asked for a candidate; a non-null candidate is acquired (subject to a health
* check).</li>
* <li>If fewer than {@link #maxChannels} channels exist ({@link #channels} &lt; {@link #maxChannels()}) and no
* other connection attempt is in progress, a new channel is connected and the promise is completed from the
* connect result.</li>
* <li>If {@link #maxChannels} channels exist and the load factor is above 90%, the available channel with the
* lowest {@link RntbdRequestManager#pendingRequestCount()} whose channel state is OK is acquired.</li>
* <li>If {@link #maxChannels} channels exist and the load factor is at most 90%, the first available channel
* whose channel state is OK is acquired.</li>
* </ol>
* If none of these steps yields a channel, the promise is added to {@link #pendingAcquisitions}. Pending
* acquisitions are run again whenever a channel is released (see {@link #release}); those that are not served in
* time are handled by the {@link #acquisitionTimeoutTask}, which runs every {@link #acquisitionTimeoutInNanos}
* nanoseconds.
* <p>
* Any {@link Throwable} raised along the way is used to fail the promise.
*
* @param promise the promise of a {@link Channel channel}.
*
* @see #getChannelState(Channel)
* @see AcquireTimeoutTask
*/
private void acquireChannel(final ChannelPromiseWithExpiryTime promise) {
this.ensureInEventLoop();
reportIssueUnless(logger, promise != null, this, "Channel promise should not be null");
RntbdChannelAcquisitionTimeline channelAcquisitionTimeline = promise.getChannelAcquisitionTimeline();
if (this.isClosed()) {
promise.setFailure(POOL_CLOSED_ON_ACQUIRE);
return;
}
try {
Channel candidate = this.pollChannel(channelAcquisitionTimeline);
if (candidate != null) {
// Fulfill this request with our candidate, assuming it's healthy
// If our candidate is unhealthy, notifyChannelHealthCheck will call us again
doAcquireChannel(promise, candidate);
return;
}
// make sure to retrieve the actual channel count to avoid establishing more
// TCP connections than allowed.
final int channelCount = this.channels(false);
if (channelCount < this.maxChannels) {
// Only one connection attempt is started at a time; if one is already in progress this request falls
// through to the pending acquisition queue below
if (this.connecting.compareAndSet(false, true)) {
// Fulfill this request with a new channel, assuming we can connect one
// If our connection attempt fails, notifyChannelConnect will call us again
final Promise<Channel> anotherPromise = this.newChannelPromiseForToBeEstablishedChannel(promise);
RntbdChannelAcquisitionTimeline.startNewEvent(
channelAcquisitionTimeline,
RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL);
final ChannelFuture future = this.bootstrap.clone().attr(POOL_KEY, this).connect();
if (future.isDone()) {
this.safeNotifyChannelConnect(future, anotherPromise);
} else {
future.addListener(ignored -> this.safeNotifyChannelConnect(future, anotherPromise));
}
return;
}
} else if (this.computeLoadFactor() > 0.90D) {
// All channels are swamped and we'll pick the one with the lowest pending request count
long pendingRequestCountMin = Long.MAX_VALUE;
for (Channel channel : this.availableChannels) {
final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);
if (manager == null) {
logger.debug("Channel({} --> {}) closed", channel, this.remoteAddress());
} else {
final long pendingRequestCount = manager.pendingRequestCount();
// we accept the risk of reusing the channel even if more than maxPendingRequests are
// queued - by picking the channel with the least number of outstanding requests we load
// balance reasonably
if (pendingRequestCount < pendingRequestCountMin) {
RntbdChannelState channelState = this.getChannelState(channel);
RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
if (channelState.isOk()) {
pendingRequestCountMin = pendingRequestCount;
candidate = channel;
}
}
}
}
// The candidate is only used if it could actually be removed from the available channels
if (candidate != null && this.availableChannels.remove(candidate)) {
this.doAcquireChannel(promise, candidate);
return;
}
} else {
for (Channel channel : this.availableChannels) {
// we pick the first available channel to avoid the additional cost of load balancing
// as long as the load is lower than the load factor threshold above.
RntbdChannelState channelState = this.getChannelState(channel);
RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
if (channelState.isOk()) {
if (this.availableChannels.remove(channel)) {
this.doAcquireChannel(promise, channel);
return;
}
}
}
}
// No channel could be provided right now: park the request until one becomes available or it times out
this.addTaskToPendingAcquisitionQueue(promise);
} catch (Throwable cause) {
promise.tryFailure(cause);
}
}
/**
 * Queues a request for a {@link Channel channel} so that it can be fulfilled later.
 * <p>
 * Queued requests are run whenever a channel is released, which ensures that pending requests for channels are
 * fulfilled as soon as possible. The promise is failed right away when the queue already holds
 * {@link #maxPendingAcquisitions} entries or when the queue rejects the new entry.
 *
 * @param promise a {@link Promise promise} that will be completed when a {@link Channel channel} is acquired or an
 * error is encountered.
 *
 * @see #runTasksInPendingAcquisitionQueue
 */
private void addTaskToPendingAcquisitionQueue(ChannelPromiseWithExpiryTime promise) {
    if (logger.isDebugEnabled()) {
        logger.debug("{}, {}, {}, {}, {}, {}",
            Instant.now(),
            this.remoteAddress(),
            this.channels(true),
            this.channelsAcquiredMetrics(),
            this.channelsAvailableMetrics(),
            this.requestQueueLength());
    }

    // Guard: the queue is already at its configured limit
    if (this.pendingAcquisitions.size() >= this.maxPendingAcquisitions) {
        promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
        return;
    }

    final AcquireListener pendingTask = new AcquireListener(this, promise);
    final boolean enqueued = this.pendingAcquisitions.offer(pendingTask);

    // Guard: the queue refused the task
    if (!enqueued) {
        promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
        return;
    }

    RntbdChannelAcquisitionTimeline.startNewEvent(
        promise.getChannelAcquisitionTimeline(),
        RntbdChannelAcquisitionEventType.ADD_TO_PENDING_QUEUE);
}
/**
* Closes a {@link Channel channel} and removes it from the {@link RntbdClientChannelPool pool}.
* <p>
* Expected to run on the pool's {@link #executor}. The channel is removed from both the acquired and the available
* channel collections and its pool attribute is cleared before the channel is closed.
*
* @param channel the {@link Channel channel} to close and remove from the {@link RntbdClientChannelPool pool}.
*/
private void closeChannel(final Channel channel) {
this.ensureInEventLoop();
this.acquiredChannels.remove(channel);
this.availableChannels.remove(channel);
channel.attr(POOL_KEY).set(null);
channel.close();
}
/**
* Closes a {@link Channel channel}, removes it from the pool and then fails the given promise.
* <p>
* Expected to run on the pool's {@link #executor}. The promise is failed with {@code tryFailure}, so a promise
* that has already completed is left untouched.
*
* @param channel the {@link Channel channel} to close and remove from the pool.
* @param cause the failure to report through {@code promise}.
* @param promise the promise to fail.
*/
private void closeChannelAndFail(final Channel channel, final Throwable cause, final Promise<?> promise) {
this.ensureInEventLoop();
this.closeChannel(channel);
promise.tryFailure(cause);
}
/**
 * Computes how heavily the pool's channels are loaded.
 * <p>
 * The load factor is the total number of pending requests across the counted channels divided by their combined
 * capacity ({@code channelCount * maxRequestsPerChannel}). Available channels without an
 * {@link RntbdRequestManager} in their pipeline are logged and left out of the count; acquired channels are always
 * counted, contributing zero pending requests when they have no {@link RntbdRequestManager}.
 * <p>
 * Expected to run on the pool's {@link #executor}.
 *
 * @return the load factor, or {@code 1.0} when no channel was counted.
 */
private double computeLoadFactor() {
    // TODO: moderakh improve logic and use in acquire?
    ensureInEventLoop();

    long pendingRequestCountTotal = 0L;
    long channelCount = 0;

    for (Channel channel : this.availableChannels) {
        final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);
        if (manager == null) {
            logger.debug("Channel({}) connection lost", channel);
            continue;
        }
        pendingRequestCountTotal += manager.pendingRequestCount();
        channelCount++;
    }

    for (Channel channel : this.acquiredChannels.values()) {
        final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);
        if (manager != null) {
            pendingRequestCountTotal += manager.pendingRequestCount();
        }
        channelCount++;
    }

    return channelCount > 0 ? (double) pendingRequestCountTotal / (channelCount * this.maxRequestsPerChannel) : 1D;
}
/**
 * Marks a candidate channel as acquired and starts its health check.
 * <p>
 * Expected to run on the pool's {@link #executor}. The candidate is recorded in {@link #acquiredChannels} and the
 * health check is run on the candidate's own event loop, either directly or as a scheduled task.
 *
 * @param promise the promise of a {@link Channel channel} that the candidate is meant to fulfill.
 * @param candidate the {@link Channel channel} selected for this acquisition.
 */
private void doAcquireChannel(final ChannelPromiseWithExpiryTime promise, final Channel candidate) {
    this.ensureInEventLoop();
    acquiredChannels.put(candidate, candidate);

    final ChannelPromiseWithExpiryTime healthCheckedPromise =
        this.newChannelPromiseForAvailableChannel(promise, candidate);
    final EventLoop channelLoop = candidate.eventLoop();

    if (!channelLoop.inEventLoop()) {
        channelLoop.execute(() -> this.doChannelHealthCheck(candidate, healthCheckedPromise));
        return;
    }
    this.doChannelHealthCheck(candidate, healthCheckedPromise);
}
/**
 * Runs the health check for a channel that is being acquired and forwards the outcome.
 * <p>
 * Must be called on the channel's event loop. The outcome is passed to {@code notifyChannelHealthCheck}, either
 * immediately when the health check has already completed or from a listener once it does.
 *
 * @param channel the {@link Channel channel} to check.
 * @param promise the promise to be completed based on the outcome of the health check.
 */
private void doChannelHealthCheck(final Channel channel, final ChannelPromiseWithExpiryTime promise) {
    checkState(channel.eventLoop().inEventLoop());

    final Future<Boolean> healthFuture = this.healthChecker.isHealthy(channel);

    if (!healthFuture.isDone()) {
        healthFuture.addListener((FutureListener<Boolean>) completed ->
            this.notifyChannelHealthCheck(completed, channel, promise));
        return;
    }
    this.notifyChannelHealthCheck(healthFuture, channel, promise);
}
/**
 * Runs the health check for a channel that is being released and forwards the outcome.
 * <p>
 * Must be called on the channel's event loop. The outcome is passed to {@code releaseAndOfferChannelIfHealthy},
 * either immediately when the health check has already completed or from a listener once it does. Any
 * {@link Throwable} raised here causes the channel to be closed and the promise to be failed on the pool's
 * {@link #executor}.
 *
 * @param channel the {@link Channel channel} being released.
 * @param promise the promise to be completed when the release has been processed.
 */
private void doChannelHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) {
    try {
        checkState(channel.eventLoop().inEventLoop());

        final Future<Boolean> healthFuture = this.healthChecker.isHealthy(channel);

        if (!healthFuture.isDone()) {
            healthFuture.addListener(ignored ->
                this.releaseAndOfferChannelIfHealthy(channel, promise, healthFuture));
            return;
        }
        this.releaseAndOfferChannelIfHealthy(channel, promise, healthFuture);
    } catch (Throwable error) {
        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> this.closeChannelAndFail(channel, error, promise));
            return;
        }
        this.closeChannelAndFail(channel, error, promise);
    }
}
/**
 * Performs the actual shutdown of this {@link RntbdClientChannelPool pool}.
 * <p>
 * Expected to run on the pool's {@link #executor}. The method:
 * <ol>
 * <li>cancels and clears the registered acquisition/idle-endpoint detection timeout, if any;</li>
 * <li>fails every pending acquisition with a {@link ClosedChannelException};</li>
 * <li>submits a task to the pool's {@link #executor} that drains all acquired and available channels and hands
 * them to the shared closer executor, which closes them one by one.</li>
 * </ol>
 */
private void doClose() {
    this.ensureInEventLoop();

    this.acquisitionAndIdleEndpointDetectionTimeout.getAndUpdate(timeout -> {
        // No timeout may be registered; guard against a NullPointerException
        if (timeout != null) {
            timeout.cancel();
        }
        return null;
    });

    // TODO (DANOBLE) this.idleStateDetectionScheduledFuture.cancel(true);

    if (logger.isDebugEnabled()) {
        logger.debug("{} closing with {} pending channel acquisitions", this, this.requestQueueLength());
    }

    for (; ; ) {
        final AcquireListener task = this.pendingAcquisitions.poll();
        if (task == null) {
            break;
        }
        // tryFailure: a promise that is already complete (for example, cancelled by its owner) must not abort
        // the shutdown with an IllegalStateException
        task.originalPromise.tryFailure(new ClosedChannelException());
    }

    // NOTE: the channel bookkeeping below runs on this.executor, but the blocking channel.close() calls are
    // dispatched to another thread--the closer thread--to ensure we will not block this.executor.

    this.executor.submit(() -> {

        // TODO: moderakh how can we ensure no one else is creating connections right now ???
        //  validate race condition
        ensureInEventLoop();

        this.availableChannels.addAll(this.acquiredChannels.values());
        this.acquiredChannels.clear();

        List<Channel> channelList = new ArrayList<>();

        for (; ; ) {
            // will remove from available channels
            final Channel channel = this.pollChannel(null);
            if (channel == null) {
                break;
            }
            channelList.add(channel);
        }

        assert this.acquiredChannels.isEmpty() && this.availableChannels.isEmpty();

        closer.submit(() -> {
            for (Channel channel : channelList) {
                // block and ignore errors reported back from channel.close
                channel.close().awaitUninterruptibly();
            }
        });

    }).addListener(closeFuture -> {
        if (!closeFuture.isSuccess()) {
            logger.error("[{}] close failed due to ", this, closeFuture.cause());
        } else {
            logger.debug("[{}] closed", this);
        }
    });
}
/**
 * Reports an issue when the calling thread is not running in this pool's {@link #executor}.
 */
private void ensureInEventLoop() {
    final boolean inEventLoop = this.executor.inEventLoop();
    reportIssueUnless(
        logger,
        inEventLoop,
        this,
        "expected to be in event loop {}, not thread {}",
        this.executor,
        Thread.currentThread());
}
/**
 * Creates a new {@link Channel channel} {@link Promise promise} for an already established channel.
 * <p>
 * The new promise is created from the {@link Channel#eventLoop() event loop} of {@code candidate} rather than
 * from the {@link RntbdClientChannelPool pool}'s {@link EventExecutor executor}.
 *
 * @param promise the original acquisition promise; see {@link #createNewChannelPromise} for how it is wrapped.
 * @param candidate the available channel whose event loop creates the new promise.
 *
 * @return a newly created {@link Promise promise} carrying the expiry time and channel acquisition timeline of
 * {@code promise}.
 */
private ChannelPromiseWithExpiryTime newChannelPromiseForAvailableChannel(
    final ChannelPromiseWithExpiryTime promise,
    final Channel candidate) {

    return createNewChannelPromise(promise, candidate.eventLoop());
}
/**
 * Creates a new {@link Channel channel} {@link Promise promise} for a channel that has yet to be established.
 * <p>
 * No channel event loop is available at this point, so the new promise is created from the
 * {@link RntbdClientChannelPool pool}'s {@link EventExecutor executor}.
 *
 * @param promise the original acquisition promise; see {@link #createNewChannelPromise} for how it is wrapped.
 *
 * @return a newly created {@link Promise promise} carrying the expiry time and channel acquisition timeline of
 * {@code promise}.
 */
private ChannelPromiseWithExpiryTime newChannelPromiseForToBeEstablishedChannel(
    final ChannelPromiseWithExpiryTime promise) {

    return createNewChannelPromise(promise, this.executor);
}
/**
 * Wraps an acquisition promise in a new promise created by the given executor.
 * <p>
 * An {@link AcquireListener}, already marked as acquired, is attached to the new promise and holds
 * {@code promise} as the original promise it works with.
 *
 * @param promise the original acquisition promise; must not be {@code null}.
 * @param eventLoop the executor that creates the new promise.
 *
 * @return a {@link ChannelPromiseWithExpiryTime} around the new promise with the expiry time and channel
 * acquisition timeline of {@code promise}.
 */
private ChannelPromiseWithExpiryTime createNewChannelPromise(
    final ChannelPromiseWithExpiryTime promise,
    final EventExecutor eventLoop) {

    checkNotNull(promise, "expected non-null promise");

    final Promise<Channel> channelPromise = eventLoop.newPromise();
    channelPromise.addListener(new AcquireListener(this, promise).acquired());

    return new ChannelPromiseWithExpiryTime(
        channelPromise,
        promise.getExpiryTimeInNanos(),
        promise.getChannelAcquisitionTimeline());
}
/**
 * Schedules the next acquisition/idle-endpoint detection tick and stores the resulting timeout in
 * {@link #acquisitionAndIdleEndpointDetectionTimeout}.
 * <p>
 * When the tick fires it:
 * <ol>
 * <li>closes {@code endpoint} and stops if the idle endpoint check is enabled and the time since the
 * endpoint's last request has reached {@code idleEndpointTimeoutInNanos},</li>
 * <li>otherwise, when requests are queued, runs {@link #runTasksInPendingAcquisitionQueue} on the pool's
 * executor, and</li>
 * <li>schedules the next tick.</li>
 * </ol>
 *
 * @param endpoint the endpoint checked for inactivity.
 * @param idleEndpointTimeoutInNanos the idle endpoint timeout; a value of {@code 0} disables the check.
 * @param requestTimerResolutionInNanos the delay until the tick fires.
 */
private void newTimeout(
    final RntbdServiceEndpoint endpoint,
    final long idleEndpointTimeoutInNanos,
    final long requestTimerResolutionInNanos) {

    this.acquisitionAndIdleEndpointDetectionTimeout.set(acquisitionAndIdleEndpointDetectionTimer.newTimeout(
        (Timeout timeout) -> {

            if (idleEndpointTimeoutInNanos != 0) {

                final long idleTimeInNanos = System.nanoTime() - endpoint.lastRequestNanoTime();

                if (idleEndpointTimeoutInNanos - idleTimeInNanos <= 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "{} closing endpoint due to inactivity (elapsedTime: {} > idleEndpointTimeout: {})",
                            endpoint,
                            Duration.ofNanos(idleTimeInNanos),
                            Duration.ofNanos(idleEndpointTimeoutInNanos));
                    }
                    endpoint.close();
                    return;  // the endpoint is closed: do not schedule another tick
                }

            } else if (logger.isDebugEnabled()) {
                logger.debug("Idle endpoint check is disabled");
            }

            if (this.requestQueueLength() > 0) {
                // Work off the pending acquisitions first and schedule the next tick once that is done.
                this.executor.submit(this::runTasksInPendingAcquisitionQueue).addListener(completed -> {
                    reportIssueUnless(logger, completed.isSuccess(), this, "failed due to ", completed.cause());
                    this.newTimeout(endpoint, idleEndpointTimeoutInNanos, requestTimerResolutionInNanos);
                });
                return;
            }

            this.newTimeout(endpoint, idleEndpointTimeoutInNanos, requestTimerResolutionInNanos);

        }, requestTimerResolutionInNanos, TimeUnit.NANOSECONDS));
}
/**
 * Calls {@link #notifyChannelConnect} on the pool's executor, directly when the calling thread is already in
 * the executor's event loop and as a submitted task otherwise.
 *
 * @param future the completed connect future.
 * @param promise the channel promise to complete.
 */
private void safeNotifyChannelConnect(final ChannelFuture future, final Promise<Channel> promise) {
    if (!this.executor.inEventLoop()) {
        this.executor.submit(() -> notifyChannelConnect(future, promise));
        return;
    }
    notifyChannelConnect(future, promise);
}
/**
 * Calls {@code closeChannel} on the pool's executor, directly when the calling thread is already in the
 * executor's event loop and as a submitted task otherwise.
 *
 * @param channel the channel to close.
 */
private void safeCloseChannel(final Channel channel) {
    if (!this.executor.inEventLoop()) {
        this.executor.submit(() -> this.closeChannel(channel));
        return;
    }
    this.closeChannel(channel);
}
/**
 * Completes an attempt to establish a new channel.
 * <p>
 * On a successful connect this method registers a close listener on the channel, notifies the pool handler
 * that the channel was acquired, completes {@code promise} with the channel, and records the channel in
 * {@link #acquiredChannels}. If the pool handler throws, the channel is closed and {@code promise} is failed.
 * If {@code promise} was already completed in the meantime, the channel is closed.
 * <p>
 * On a failed connect {@code promise} is failed with the cause of {@code future}.
 * <p>
 * In every case the {@link #connecting} flag is reset and, when {@code promise} is a
 * {@link ChannelPromiseWithExpiryTime}, an {@code ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE} event is started on
 * its channel acquisition timeline.
 * <p>
 * This method is expected to be called on the pool's executor; see {@link #safeNotifyChannelConnect}.
 *
 * @param future the completed connect future.
 * @param promise the channel promise to complete.
 */
private void notifyChannelConnect(final ChannelFuture future, final Promise<Channel> promise) {

    ensureInEventLoop();
    reportIssueUnless(logger, this.connecting.get(), this, "connecting: false");

    try {
        if (future.isSuccess()) {

            final Channel channel = future.channel();

            // Whenever the channel closes, run the pool's close handling for it on the pool's executor.
            channel.closeFuture().addListener((ChannelFuture f) -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Channel to endpoint {} is closed. " +
                            "isInAvailableChannels={}, " +
                            "isInAcquiredChannels={}, " +
                            "isOnChannelEventLoop={}, " +
                            "isActive={}, " +
                            "isOpen={}, " +
                            "isRegistered={}, " +
                            "isWritable={}, " +
                            "threadName={}",
                        channel.remoteAddress(),
                        availableChannels.contains(channel),
                        // Channels are stored under themselves as key (see the compute call below), so a key
                        // lookup answers the membership question without scanning all values.
                        acquiredChannels.containsKey(channel),
                        channel.eventLoop().inEventLoop(),
                        channel.isActive(),
                        channel.isOpen(),
                        channel.isRegistered(),
                        channel.isWritable(),
                        Thread.currentThread().getName()
                    );
                }
                this.safeCloseChannel(channel);
            });

            try {
                this.poolHandler.channelAcquired(channel);
            } catch (Throwable error) {
                this.closeChannelAndFail(channel, error, promise);
                return;
            }

            if (promise.trySuccess(channel)) {

                if (logger.isDebugEnabled()) {
                    logger.debug("established a channel local {}, remote {}", channel.localAddress(), channel.remoteAddress());
                }

                this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> {
                    reportIssueUnless(logger, acquiredChannel == null, this,
                        "Channel({}) to be acquired has already been acquired",
                        channel);
                    reportIssueUnless(logger, !this.availableChannels.remove(channel), this,
                        "Channel({}) to be acquired is still in the list of available channels",
                        channel);
                    return channel;
                });

            } else {

                if (logger.isDebugEnabled()) {
                    logger.debug("notifyChannelConnect promise.trySuccess(channel)=false");
                }

                // Promise was completed in the meantime (like cancelled), just close the channel
                this.closeChannel(channel);
            }

        } else {

            if (logger.isDebugEnabled()) {
                logger.debug("notifyChannelConnect future was not successful");
            }

            promise.tryFailure(future.cause());
        }
    } finally {
        if (promise instanceof ChannelPromiseWithExpiryTime) {
            RntbdChannelAcquisitionTimeline.startNewEvent(
                ((ChannelPromiseWithExpiryTime) promise).getChannelAcquisitionTimeline(),
                RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE
            );
        }
        // The connect attempt is over, whatever its outcome.
        this.connecting.set(false);
    }
}
/**
 * Handles the outcome of the health check of a channel that is being acquired.
 * <p>
 * A healthy channel is bound to this pool, announced to the pool handler, and used to complete
 * {@code promise}. If any of these steps throws, the channel is closed and {@code promise} is failed on the
 * pool's executor. For an unhealthy channel, or a health check that did not succeed, the channel is closed
 * and a new acquisition is started for {@code promise}, also on the pool's executor.
 * <p>
 * This method must be called on the channel's event loop.
 *
 * @param future the completed health check.
 * @param channel the channel that was checked.
 * @param promise the acquisition promise.
 */
private void notifyChannelHealthCheck(
    final Future<Boolean> future,
    final Channel channel,
    final ChannelPromiseWithExpiryTime promise) {

    checkState(channel.eventLoop().inEventLoop());

    final boolean healthy = future.isSuccess() && future.getNow();

    if (!healthy) {
        // Give up on this channel and try to acquire another one.
        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> {
                this.closeChannel(channel);
                this.acquireChannel(promise);
            });
            return;
        }
        this.closeChannel(channel);
        this.acquireChannel(promise);
        return;
    }

    try {
        channel.attr(POOL_KEY).set(this);
        this.poolHandler.channelAcquired(channel);
        promise.setSuccess(channel);
    } catch (Throwable cause) {
        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> this.closeChannelAndFail(channel, cause, promise));
        } else {
            this.closeChannelAndFail(channel, cause, promise);
        }
    }
}
/**
 * Puts a {@link Channel} back into the collection of available channels.
 * <p>
 * Maintainers: this method is not synchronized. Thread safety comes from calling it serially on the pool's
 * single-threaded EventExecutor, so it need not (and should not) be synchronized.
 *
 * @param channel the {@link Channel} to add to the available channels.
 *
 * @return {@code true}, if the {@link Channel} was added; otherwise {@code false}.
 */
private boolean offerChannel(final Channel channel) {
    this.ensureInEventLoop();
    final boolean accepted = this.availableChannels.offer(channel);
    return accepted;
}
/**
 * Return the {@link RntbdChannelState} of a channel.
 * <p>
 * A serviceable channel is one that is open, has an {@link RntbdContext RNTBD context}, and has fewer than {@link
 * #maxRequestsPerChannel} requests in its pipeline. An inactive channel will not have a {@link RntbdRequestManager
 * request manager}. Hence, this method first checks that the channel's request manager is non-null.
 *
 * @param channel the channel to check; must not be {@code null}.
 *
 * @return {@link RntbdChannelState#NULL_REQUEST_MANAGER} if the channel's pipeline has no request manager,
 * {@link RntbdChannelState#CLOSED} if the channel is not open, and otherwise the state reported by the channel's
 * request manager for the per-channel request limit.
 */
private RntbdChannelState getChannelState(Channel channel) {

    checkNotNull(channel, "Channel cannot be null");

    final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);

    if (manager == null) {
        return RntbdChannelState.NULL_REQUEST_MANAGER;
    }

    if (!channel.isOpen()) {
        return RntbdChannelState.CLOSED;
    }

    // The state of a channel depends on the number of requests allowed per channel, not on the number of
    // acquisitions that may be pending on the pool.
    return manager.getChannelState(this.maxRequestsPerChannel());
}
/**
 * Takes a {@link Channel} out of the available channels so that it can be reused.
 * <p>
 * When the pool is closed, the first available channel is returned without a state check. Otherwise each
 * available channel is examined at most once per call, and the first one whose {@link RntbdChannelState} is OK
 * is returned. Channels that are examined and not chosen are put back at the end of the available channels,
 * except that inactive channels encountered after the first one are not put back.
 * <p>
 * Maintainers: this method is not synchronized. Thread safety comes from calling it serially on the pool's
 * single-threaded EventExecutor, so it need not (and should not) be synchronized.
 *
 * @param channelAcquisitionTimeline the {@link RntbdChannelAcquisitionTimeline} on which the poll is recorded.
 * @return a value of {@code null}, if no {@link Channel} is ready to be reused
 *
 * @see #acquire(Promise)
 */
private Channel pollChannel(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {

    ensureInEventLoop();

    final RntbdPollChannelEvent pollEvent = RntbdChannelAcquisitionTimeline.startNewPollEvent(
        channelAcquisitionTimeline,
        this.availableChannels.size(),
        this.acquiredChannels.size());

    final Channel head = this.availableChannels.pollFirst();

    if (head == null) {
        return null;  // nothing is available
    }

    if (this.isClosed()) {
        return head;  // doClose drains the available channels through this method
    }

    // A channel is only serviceable when fewer than the maximum number of requests are queued on it
    final RntbdChannelState headState = this.getChannelState(head);
    RntbdChannelAcquisitionEvent.addDetail(pollEvent, headState);

    if (headState.isOk()) {
        return head;
    }

    // Put the head back as a non-null sentinel: the search stops as soon as it comes around again
    this.availableChannels.offer(head);

    Channel candidate = this.availableChannels.pollFirst();

    while (candidate != head) {

        assert candidate != null : "impossible";

        if (candidate.isActive()) {
            // A channel is only serviceable when fewer than the maximum number of requests are queued on it
            final RntbdChannelState candidateState = this.getChannelState(candidate);
            RntbdChannelAcquisitionEvent.addDetail(pollEvent, candidateState);

            if (candidateState.isOk()) {
                return candidate;
            }

            this.availableChannels.offer(candidate);
        }

        candidate = this.availableChannels.pollFirst();
    }

    // No channel is checked more than once in a single call: return the sentinel to the available channels
    this.availableChannels.offer(head);
    return null;
}
/**
 * Moves a {@link Channel channel} from the acquired channels back to the available channels of the
 * {@link RntbdClientChannelPool pool}.
 *
 * @param channel the channel to put back to the pool.
 * @param promise a promise to fulfill when the operation completes. If the channel cannot be offered back to the
 * pool or an error is raised, {@code channel} is closed and {@code promise} is failed.
 */
private void releaseAndOfferChannel(final Channel channel, final Promise<Void> promise) {

    this.ensureInEventLoop();

    try {
        // NOTE: This check is defense in-depth. Removing a channel from acquiredChannels only fails when
        // releaseChannel is called concurrently on the same channel instance.
        //
        // The channel is looked up in acquiredChannels optimistically, so the same channel may be retrieved
        // more than once before the switch to the event loop thread removes it here. Make sure the channel is
        // moved back to availableChannels only once.
        final boolean wasAcquired = this.acquiredChannels.remove(channel) != null;

        if (!wasAcquired) {
            logger.warn(
                "Unexpected race condition - releaseChannel called twice for the same channel [{} -> {}]",
                channel.id(),
                this.remoteAddress());
            promise.setSuccess(null);
            return;
        }

        if (!this.offerChannel(channel)) {
            final IllegalStateException error = new ChannelAcquisitionException(lenientFormat(
                "cannot offer channel back to pool because the pool is at capacity (%s)\n %s\n %s",
                this.maxChannels,
                this,
                channel));
            this.closeChannelAndFail(channel, error, promise);
            return;
        }

        this.poolHandler.channelReleased(channel);
        promise.setSuccess(null);

    } catch (Throwable error) {
        this.closeChannelAndFail(channel, error, promise);
    }
}
/**
 * Adds a {@link Channel channel} back to the {@link RntbdClientChannelPool pool} only if the channel is healthy.
 * <p>
 * A healthy channel is released and offered back to the pool on the pool's executor. An unhealthy channel is
 * reported to the pool handler as released and then closed on the pool's executor; {@code promise} is completed
 * successfully in that case. A health check that did not complete successfully is treated like an unhealthy
 * channel.
 *
 * @param channel the {@link Channel channel} to put back to the {@link RntbdClientChannelPool pool}.
 * @param promise offer operation promise.
 * @param future a completed health check: contains a value of {@code true}, if {@code channel} is healthy;
 * {@code false} otherwise.
 */
private void releaseAndOfferChannelIfHealthy(
    final Channel channel,
    final Promise<Void> promise,
    final Future<Boolean> future) {

    // A failed health check has no result: getNow() returns null and unboxing it would throw a
    // NullPointerException. On the listener path nothing catches that exception, and promise would never
    // complete.
    final boolean isHealthy = future.isSuccess() && Boolean.TRUE.equals(future.getNow());

    if (isHealthy) {
        // Channel is healthy so...
        if (this.executor.inEventLoop()) {
            this.releaseAndOfferChannel(channel, promise);
        } else {
            this.executor.submit(() -> this.releaseAndOfferChannel(channel, promise));
        }
    } else {
        // Channel is unhealthy so just close and release it
        try {
            this.poolHandler.channelReleased(channel);
        } catch (Throwable error) {
            logger.debug("[{}] pool handler failed due to ", this, error);
        } finally {
            if (this.executor.inEventLoop()) {
                this.closeChannel(channel);
            } else {
                this.executor.submit(() -> this.closeChannel(channel));
            }
            promise.setSuccess(null);
        }
    }
}
/**
 * Releases a {@link Channel channel} back to the {@link RntbdClientChannelPool pool}.
 * <p>
 * This method must be called on the channel's event loop. It clears the channel's pool attribute and then,
 * depending on {@link #releaseHealthCheck}, either runs a health check on the channel first or releases and
 * offers it back to the pool right away on the pool's executor.
 *
 * @param channel a {@link Channel channel} to be released to the current {@link RntbdClientChannelPool pool}.
 * @param promise a promise that completes when {@code channel} is released.
 * <p>
 * If {@code channel} was not acquired from the current {@link RntbdClientChannelPool pool}, it is closed and {@code
 * promise} is failed with an {@link IllegalStateException}.
 */
private void releaseChannel(final Channel channel, final Promise<Void> promise) {

    checkState(channel.eventLoop().inEventLoop());

    final ChannelPool owner = channel.attr(POOL_KEY).getAndSet(null);
    final boolean acquired = this.acquiredChannels.get(channel) != null;

    if (!acquired || owner != this) {

        final IllegalStateException error = new IllegalStateException(lenientFormat(
            "%s cannot be released because it was not acquired by this pool: %s",
            RntbdObjectMapper.toJson(channel),
            this));

        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> this.closeChannelAndFail(channel, error, promise));
        } else {
            this.closeChannelAndFail(channel, error, promise);
        }
        return;
    }

    try {
        if (this.releaseHealthCheck) {
            this.doChannelHealthCheckOnRelease(channel, promise);
            return;
        }
        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> this.releaseAndOfferChannel(channel, promise));
        } else {
            this.releaseAndOfferChannel(channel, promise);
        }
    } catch (Throwable cause) {
        if (!this.executor.inEventLoop()) {
            this.executor.submit(() -> this.closeChannelAndFail(channel, cause, promise));
        } else {
            this.closeChannelAndFail(channel, cause, promise);
        }
    }
}
/**
 * Runs tasks from the pending acquisition queue.
 * <p>
 * At least one task is run, if there is one, and at most as many tasks as there were available channels when
 * this method was called. The run ends early when the queue is empty. Every task taken from
 * {@link #pendingAcquisitions} is marked as acquired and its original promise is passed to {@link #acquire}.
 */
private void runTasksInPendingAcquisitionQueue() {

    this.ensureInEventLoop();

    // NOTE: this is potentially unfair with respect to task scheduling: a task taken from the head of
    // pendingAcquisitions can end up at the end of the queue again if no channel can be acquired for it.

    for (int budget = Math.max(1, this.availableChannels.size()); budget > 0; budget--) {

        // translate a pending acquisition item to a task
        final AcquireListener pending = this.pendingAcquisitions.poll();

        if (pending == null) {
            break;
        }

        pending.acquired();
        this.acquire(pending.originalPromise);
    }
}
/**
 * Verifies with {@code checkState} that this pool is not closed.
 */
private void throwIfClosed() {
    final boolean open = !this.isClosed();
    checkState(open, "%s is closed", this);
}
// endregion
// region Types
/**
 * A listener on a channel promise that makes sure a channel is ready to receive requests before it completes
 * the original acquisition promise.
 */
private static class AcquireListener implements FutureListener<Channel> {

    private final ChannelPromiseWithExpiryTime originalPromise;
    private final RntbdClientChannelPool pool;
    private boolean acquired;

    AcquireListener(RntbdClientChannelPool pool, ChannelPromiseWithExpiryTime originalPromise) {
        this.originalPromise = originalPromise;
        this.pool = pool;
    }

    /**
     * @return {@code true} if {@link #acquired()} has been called on this listener; {@code false} otherwise.
     */
    public final boolean isAcquired() {
        return this.acquired;
    }

    /**
     * Marks this listener as acquired. Calling this method more than once has no further effect.
     *
     * @return this listener.
     */
    public final AcquireListener acquired() {
        this.acquired = true;
        return this;
    }

    /**
     * Completes the original promise for a successfully obtained channel.
     * <p>
     * This method must be called on the channel's event loop. The original promise is failed when the channel
     * is not active. It is completed right away when the channel's request manager has already requested an
     * RNTBD context. Otherwise a health check request is written to the channel and the original promise is
     * completed with the outcome of that write.
     *
     * @param channel the channel that was obtained.
     */
    private void doOperationComplete(Channel channel) {

        checkState(channel.eventLoop().inEventLoop());

        if (!channel.isActive()) {
            this.fail(CHANNEL_CLOSED_ON_ACQUIRE);
            return;
        }

        final ChannelPipeline pipeline = channel.pipeline();
        checkState(pipeline != null, "expected non-null channel pipeline");

        final RntbdRequestManager requestManager = pipeline.get(RntbdRequestManager.class);
        checkState(requestManager != null, "expected non-null request manager");

        if (requestManager.hasRequestedRntbdContext()) {
            this.originalPromise.setSuccess(channel);
            return;
        }

        channel.writeAndFlush(RntbdHealthCheckRequest.MESSAGE).addListener(completed -> {

            if (!completed.isSuccess()) {
                final Throwable cause = completed.cause();
                logger.warn("Channel({}) health check request failed due to:", channel, cause);
                this.fail(cause);
                return;
            }

            reportIssueUnless(
                logger,
                this.acquired && requestManager.hasRntbdContext(),
                this,
                "acquired: {}, rntbdContext: {}",
                this.acquired,
                requestManager.rntbdContext());

            this.originalPromise.setSuccess(channel);
        });
    }

    /**
     * Ensures that a channel in the {@link RntbdClientChannelPool pool} is ready to receive requests.
     * <p>
     * A Direct TCP channel is ready to receive requests when it is active and has an {@link RntbdContext}.
     * This method sends a health check request on a channel without an {@link RntbdContext} to force:
     * <ol>
     * <li>SSL negotiation</li>
     * <li>RntbdContextRequest, answered with an RntbdContext</li>
     * <li>RntbdHealthCheckRequest, answered with an acknowledgement</li>
     * </ol>
     * When the pool is already closed, a successfully obtained channel is closed and {@link #originalPromise}
     * is failed.
     *
     * @param future a channel {@link Future future}.
     * <p>
     * {@link #originalPromise} is completed asynchronously when this method determines that the channel is ready
     * to receive requests or an error is encountered.
     */
    @Override
    public final void operationComplete(Future<Channel> future) {

        if (this.pool.isClosed()) {
            if (future.isSuccess()) {
                // The pool is closed, so there is no choice but to close the channel
                future.getNow().close();
            }
            this.originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE);
            return;
        }

        if (!future.isSuccess()) {
            logger.warn("channel acquisition failed due to ", future.cause());
            this.fail(future.cause());
            return;
        }

        // Ensure that the channel is active and ready to receive requests, on the channel's own event loop
        final Channel channel = future.getNow();

        if (!channel.eventLoop().inEventLoop()) {
            channel.eventLoop().execute(() -> doOperationComplete(channel));
            return;
        }

        doOperationComplete(channel);
    }

    /**
     * @return the expiry time of the original promise in nanoseconds.
     */
    public long getAcquisitionTimeoutInNanos() {
        return this.originalPromise.getExpiryTimeInNanos();
    }

    /**
     * Fails the original promise and then runs the pool's pending acquisition tasks on the pool's executor.
     *
     * @param cause the reason for the failure.
     */
    private void fail(Throwable cause) {

        this.originalPromise.setFailure(cause);

        if (!this.pool.executor.inEventLoop()) {
            this.pool.executor.submit(this.pool::runTasksInPendingAcquisitionQueue);
            return;
        }

        this.pool.runTasksInPendingAcquisitionQueue();
    }
}
/**
 * A task that removes expired entries from the {@link RntbdClientChannelPool#pendingAcquisitions} of a pool and
 * passes each of them to {@link #onTimeout}.
 */
private static abstract class AcquireTimeoutTask implements Runnable {

    private final RntbdClientChannelPool pool;

    public AcquireTimeoutTask(RntbdClientChannelPool pool) {
        this.pool = pool;
    }

    /**
     * Called for each expired task that was removed from the pending acquisitions.
     *
     * @param task the expired task.
     */
    public abstract void onTimeout(AcquireListener task);

    /**
     * Runs the {@link #onTimeout} method on each expired task in {@link
     * RntbdClientChannelPool#pendingAcquisitions}.
     * <p>
     * Tasks are taken from the queue one at a time. The first task that has not expired is put back and ends
     * the run.
     */
    @Override
    public final void run() {

        if (logger.isDebugEnabled()) {
            logger.debug("Starting the AcquireTimeoutTask to clean for endpoint [{}].", this.pool.remoteAddress());
        }

        final long now = System.nanoTime();
        AcquireListener candidate;

        while ((candidate = this.pool.pendingAcquisitions.poll()) != null) {

            // Compare nanoTime values by subtraction as described in the System.nanoTime documentation
            // See:
            // * https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
            // * https://github.com/netty/netty/issues/3705
            final boolean expired = candidate.getAcquisitionTimeoutInNanos() - now <= 0;

            if (expired) {
                this.onTimeout(candidate);
                continue;
            }

            if (!this.pool.pendingAcquisitions.offer(candidate)) {
                logger.error("Unexpected failure when returning the removed task"
                        + " to pending acquisition queue. current size [{}]",
                    this.pool.pendingAcquisitions.size());
            }

            break;
        }
    }
}
/**
 * Serializes an {@link RntbdClientChannelPool} as a JSON object with its remote address, its closed flag, a
 * {@code configuration} object, and a {@code state} object.
 */
static final class JsonSerializer extends StdSerializer<RntbdClientChannelPool> {

    private static final long serialVersionUID = -8688539496437151693L;

    JsonSerializer() {
        super(RntbdClientChannelPool.class);
    }

    @Override
    public void serialize(
        final RntbdClientChannelPool value,
        final JsonGenerator generator,
        final SerializerProvider provider) throws IOException {

        // The cast comes first so that nothing is written when the health checker has an unexpected type
        final RntbdClientChannelHealthChecker checker = (RntbdClientChannelHealthChecker) value.healthChecker;

        generator.writeStartObject();
        generator.writeStringField("remoteAddress", value.remoteAddress().toString());
        generator.writeBooleanField("isClosed", value.isClosed());
        writeConfiguration(value, checker, generator);
        writeState(value, generator);
        generator.writeEndObject();
    }

    /** Writes the {@code configuration} object: channel limits and health check timings. */
    private static void writeConfiguration(
        final RntbdClientChannelPool value,
        final RntbdClientChannelHealthChecker checker,
        final JsonGenerator generator) throws IOException {

        generator.writeObjectFieldStart("configuration");
        generator.writeNumberField("maxChannels", value.maxChannels());
        generator.writeNumberField("maxRequestsPerChannel", value.maxRequestsPerChannel());
        generator.writeNumberField("idleConnectionTimeout", checker.idleConnectionTimeoutInNanos());
        generator.writeNumberField("readDelayLimit", checker.readDelayLimitInNanos());
        generator.writeNumberField("writeDelayLimit", checker.writeDelayLimitInNanos());
        generator.writeEndObject();
    }

    /** Writes the {@code state} object: channel counts and the request queue length. */
    private static void writeState(
        final RntbdClientChannelPool value,
        final JsonGenerator generator) throws IOException {

        generator.writeObjectFieldStart("state");
        generator.writeNumberField("channelsAcquired", value.channelsAcquiredMetrics());
        generator.writeNumberField("channelsAvailable", value.channelsAvailableMetrics());
        generator.writeNumberField("requestQueueLength", value.requestQueueLength());
        generator.writeEndObject();
    }
}
/**
 * An {@link IllegalStateException} that does not record a stack trace.
 */
private static class ChannelAcquisitionException extends IllegalStateException {

    private static final long serialVersionUID = -6011782222645074949L;

    /**
     * @param message the detail message.
     */
    public ChannelAcquisitionException(final String message) {
        super(message);
    }

    /**
     * Skips filling in the stack trace.
     *
     * @return this exception.
     */
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }
}
// endregion
}