RntbdTransportClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint;
import com.azure.cosmos.implementation.guava25.base.Strings;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.micrometer.core.instrument.Tag;
import io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdReporter.reportIssue;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState;
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
@JsonSerialize(using = RntbdTransportClient.JsonSerializer.class)
public class RntbdTransportClient extends TransportClient {
// region Fields
private static final String TAG_NAME = RntbdTransportClient.class.getSimpleName();
private static final AtomicLong instanceCount = new AtomicLong();
private static final Logger logger = LoggerFactory.getLogger(RntbdTransportClient.class);
/**
* NOTE: This context key name has been copied from {link Hooks#KEY_ON_ERROR_DROPPED} which is
* not exposed as public Api but package internal only
*
* A key that can be used to store a sequence-specific {@link Hooks#onErrorDropped(Consumer)}
* hook in a {@link Context}, as a {@link Consumer Consumer<Throwable>}.
*/
private static final String KEY_ON_ERROR_DROPPED = "reactor.onErrorDropped.local";
/**
* This lambda gets injected into the local Reactor Context to react tot he onErrorDropped event and
* log the throwable with DEBUG level instead of the ERROR level used in the default hook.
* This is safe here because we guarantee resource clean-up with the doFinally-lambda
*/
private static final Consumer<? super Throwable> onErrorDropHookWithReduceLogLevel =
throwable -> {
if (logger.isDebugEnabled()) {
logger.debug(
"Extra error - on error dropped - operator called :",
throwable);
}
};
private final AtomicBoolean closed = new AtomicBoolean();
private final RntbdEndpoint.Provider endpointProvider;
private final long id;
private final Tag tag;
// endregion
// region Constructors
/**
* Initializes a newly created {@linkplain RntbdTransportClient} object.
*
* @param configs A {@link Configs} instance containing the {@link SslContext} to be used.
* @param connectionPolicy The {@linkplain ConnectionPolicy connection policy} to be applied.
* @param userAgent The {@linkplain UserAgentContainer user agent} identifying.
* @param addressResolver The address resolver to be used for connection endpoint rediscovery, if connection
* endpoint rediscovery is enabled by {@code connectionPolicy}.
*/
public RntbdTransportClient(
final Configs configs,
final ConnectionPolicy connectionPolicy,
final UserAgentContainer userAgent,
final IAddressResolver addressResolver) {
this(
new Options.Builder(connectionPolicy).userAgent(userAgent).build(),
configs.getSslContext(),
addressResolver);
}
RntbdTransportClient(final RntbdEndpoint.Provider endpointProvider) {
this.endpointProvider = endpointProvider;
this.id = instanceCount.incrementAndGet();
this.tag = RntbdTransportClient.tag(this.id);
}
RntbdTransportClient(
final Options options,
final SslContext sslContext,
final IAddressResolver addressResolver) {
this.endpointProvider = new RntbdServiceEndpoint.Provider(
this,
options,
checkNotNull(sslContext, "expected non-null sslContext"),
addressResolver);
this.id = instanceCount.incrementAndGet();
this.tag = RntbdTransportClient.tag(this.id);
}
// endregion
// region Methods
/**
* {@code true} if this {@linkplain RntbdTransportClient client} is closed.
*
* @return {@code true} if this {@linkplain RntbdTransportClient client} is closed; {@code false} otherwise.
*/
public boolean isClosed() {
return this.closed.get();
}
/**
* Closes this {@linkplain RntbdTransportClient client} and releases all resources associated with it.
*/
@Override
public void close() {
if (this.closed.compareAndSet(false, true)) {
logger.debug("close {}", this);
this.endpointProvider.close();
return;
}
logger.debug("already closed {}", this);
}
/**
* The number of {@linkplain RntbdEndpoint endpoints} allocated to this {@linkplain RntbdTransportClient client}.
*
* @return The number of {@linkplain RntbdEndpoint endpoints} associated with this {@linkplain RntbdTransportClient
* client}.
*/
public int endpointCount() {
return this.endpointProvider.count();
}
public int endpointEvictionCount() {
return this.endpointProvider.evictions();
}
/**
* The integer identity of this {@linkplain RntbdTransportClient client}.
* <p>
* Clients are numbered sequentially based on the order in which they are initialized.
*
* @return The integer identity of this {@linkplain RntbdTransportClient client}.
*/
public long id() {
return this.id;
}
/**
* Issues a Direct TCP request to the specified Cosmos service address asynchronously.
*
* @param addressUri A Cosmos service address.
* @param request The {@linkplain RxDocumentServiceRequest request} to issue.
*
* @return A {@link Mono} of type {@link StoreResponse} that will complete when the Direct TCP request completes.
* I shI
* @throws TransportException if this {@linkplain RntbdTransportClient client} is closed.
*/
@Override
public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocumentServiceRequest request) {
checkNotNull(addressUri, "expected non-null addressUri");
checkNotNull(request, "expected non-null request");
this.throwIfClosed();
final URI address = addressUri.getURI();
final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, address);
final RntbdEndpoint endpoint = this.endpointProvider.get(address);
final RntbdRequestRecord record = endpoint.request(requestArgs);
final Context reactorContext = Context.of(KEY_ON_ERROR_DROPPED, onErrorDropHookWithReduceLogLevel);
final Mono<StoreResponse> result = Mono.fromFuture(record.whenComplete((response, throwable) -> {
record.stage(RntbdRequestRecord.Stage.COMPLETED);
if (request.requestContext.cosmosDiagnostics == null) {
request.requestContext.cosmosDiagnostics = request.createCosmosDiagnostics();
}
if (response != null) {
RequestTimeline timeline = record.takeTimelineSnapshot();
response.setRequestTimeline(timeline);
response.setEndpointStatistics(record.serviceEndpointStatistics());
response.setRntbdResponseLength(record.responseLength());
response.setRntbdRequestLength(record.requestLength());
response.setRequestPayloadLength(request.getContentLength());
response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength());
response.setRntbdPendingRequestSize(record.pendingRequestQueueSize());
response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
}
})).onErrorMap(throwable -> {
Throwable error = throwable instanceof CompletionException ? throwable.getCause() : throwable;
if (!(error instanceof CosmosException)) {
String unexpectedError = RntbdObjectMapper.toJson(error);
reportIssue(logger, endpoint,
"request completed with an unexpected {}: \\{\"record\":{},\"error\":{}}",
error.getClass(),
record,
unexpectedError);
error = new GoneException(
lenientFormat("an unexpected %s occurred: %s", unexpectedError),
address,
error instanceof Exception ? (Exception) error : new RuntimeException(error));
}
assert error instanceof CosmosException;
CosmosException cosmosException = (CosmosException) error;
BridgeInternal.setServiceEndpointStatistics(cosmosException, record.serviceEndpointStatistics());
BridgeInternal.setRntbdRequestLength(cosmosException, record.requestLength());
BridgeInternal.setRntbdResponseLength(cosmosException, record.responseLength());
BridgeInternal.setRequestBodyLength(cosmosException, request.getContentLength());
BridgeInternal.setRequestTimeline(cosmosException, record.takeTimelineSnapshot());
BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize());
BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength());
BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted());
BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());
return cosmosException;
});
return result.doFinally(signalType -> {
// This lambda ensures that a pending Direct TCP request in a reactive stream dropped by an end user or the
// HA layer completes without bubbling up to reactor.core.publisher.Hooks#onErrorDropped as a
// CompletionException error. Pending requests may be left outstanding when, for example, an end user calls
// CosmosAsyncClient#close or the HA layer detects that a partition split has occurred. This code guarantees
// that each pending Mono<StoreResponse> in the stream will run to completion with a new subscriber.
// Consequently the default Hooks#onErrorDropped method will not be called thus preventing distracting error
// messages.
//
// This lambda does not prevent requests that complete exceptionally before the call to this lambda from
// bubbling up to Hooks#onErrorDropped as CompletionException errors. We will still see some onErrorDropped
// messages due to CompletionException errors. Anecdotal evidence shows that this is more likely to be seen
// in low latency environments on Azure cloud. To avoid the onErrorDropped events to get logged in the
// default hook (which logs with level ERROR) we inject a local hook in the Reactor Context to just log it
// as DEBUG level for the lifecycle of this Mono (safe here because we know the onErrorDropped doesn't have
// any functional issues.
//
// One might be tempted to complete a pending request here, but that is ill advised. Testing and
// inspection of the reactor code shows that this does not prevent errors from bubbling up to
// reactor.core.publisher.Hooks#onErrorDropped. Worse than this it has been seen to cause failures in
// the HA layer:
//
// * Calling record.cancel or record.completeExceptionally causes failures in (low-latency) cloud
// environments and all errors bubble up Hooks#onErrorDropped.
//
// * Calling record.complete with a null value causes failures in all environments, depending on the
// operation being performed. In short: many of our tests fail.
if (signalType != SignalType.CANCEL) {
return;
}
result.subscribe(
response -> {
if (logger.isDebugEnabled()) {
logger.debug(
"received response to cancelled request: {\"request\":{},\"response\":{\"type\":{},"
+ "\"value\":{}}}}",
RntbdObjectMapper.toJson(record),
response.getClass().getSimpleName(),
RntbdObjectMapper.toJson(response));
}
},
throwable -> {
if (logger.isDebugEnabled()) {
logger.debug(
"received response to cancelled request: {\"request\":{},\"response\":{\"type\":{},"
+ "\"value\":{}}}",
RntbdObjectMapper.toJson(record),
throwable.getClass().getSimpleName(),
RntbdObjectMapper.toJson(throwable));
}
});
}).subscriberContext(reactorContext);
}
/**
* The key-value pair used to classify and drill into metrics produced by this {@linkplain RntbdTransportClient
* client}.
*
* @return The key-value pair used to classify and drill into metrics collected by this {@linkplain
* RntbdTransportClient client}.
*/
public Tag tag() {
return this.tag;
}
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}
// endregion
// region Privates
private static Tag tag(long id) {
return Tag.of(TAG_NAME, Strings.padStart(Long.toHexString(id).toUpperCase(Locale.ROOT), 4, '0'));
}
private void throwIfClosed() {
if (this.closed.get()) {
throw new TransportException(lenientFormat("%s is closed", this), null);
}
}
// endregion
// region Types
public static final class Options {
private static final int DEFAULT_MIN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT = 10_000;
// region Fields
@JsonProperty()
private final int bufferPageSize;
@JsonProperty()
private final Duration connectionAcquisitionTimeout;
@JsonProperty()
private final boolean connectionEndpointRediscoveryEnabled;
@JsonProperty()
private final Duration connectTimeout;
@JsonProperty()
private final Duration idleChannelTimeout;
@JsonProperty()
private final Duration idleChannelTimerResolution;
@JsonProperty()
private final Duration idleEndpointTimeout;
@JsonProperty()
private final int maxBufferCapacity;
@JsonProperty()
private final int maxChannelsPerEndpoint;
@JsonProperty()
private final int maxRequestsPerChannel;
@JsonProperty()
private final int maxConcurrentRequestsPerEndpointOverride;
@JsonProperty()
private final Duration receiveHangDetectionTime;
@JsonProperty()
private final Duration tcpNetworkRequestTimeout;
@JsonProperty()
private final Duration requestTimerResolution;
@JsonProperty()
private final Duration sendHangDetectionTime;
@JsonProperty()
private final Duration shutdownTimeout;
@JsonProperty()
private final int threadCount;
@JsonIgnore()
private final UserAgentContainer userAgent;
@JsonProperty()
private final boolean channelAcquisitionContextEnabled;
@JsonProperty()
private final int ioThreadPriority;
@JsonProperty()
private final int tcpKeepIntvl;
@JsonProperty()
private final int tcpKeepIdle;
@JsonProperty()
private final boolean preferTcpNative;
// endregion
// region Constructors
@JsonCreator
private Options() {
this(ConnectionPolicy.getDefaultPolicy());
}
private Options(final Builder builder) {
this.bufferPageSize = builder.bufferPageSize;
this.connectionAcquisitionTimeout = builder.connectionAcquisitionTimeout;
this.connectionEndpointRediscoveryEnabled = builder.connectionEndpointRediscoveryEnabled;
this.idleChannelTimeout = builder.idleChannelTimeout;
this.idleChannelTimerResolution = builder.idleChannelTimerResolution;
this.idleEndpointTimeout = builder.idleEndpointTimeout;
this.maxBufferCapacity = builder.maxBufferCapacity;
this.maxChannelsPerEndpoint = builder.maxChannelsPerEndpoint;
this.maxRequestsPerChannel = builder.maxRequestsPerChannel;
this.maxConcurrentRequestsPerEndpointOverride = builder.maxConcurrentRequestsPerEndpointOverride;
this.receiveHangDetectionTime = builder.receiveHangDetectionTime;
this.tcpNetworkRequestTimeout = builder.tcpNetworkRequestTimeout;
this.requestTimerResolution = builder.requestTimerResolution;
this.sendHangDetectionTime = builder.sendHangDetectionTime;
this.shutdownTimeout = builder.shutdownTimeout;
this.threadCount = builder.threadCount;
this.userAgent = builder.userAgent;
this.channelAcquisitionContextEnabled = builder.channelAcquisitionContextEnabled;
this.ioThreadPriority = builder.ioThreadPriority;
this.tcpKeepIntvl = builder.tcpKeepIntvl;
this.tcpKeepIdle = builder.tcpKeepIdle;
this.preferTcpNative = builder.preferTcpNative;
this.connectTimeout = builder.connectTimeout == null
? builder.tcpNetworkRequestTimeout
: builder.connectTimeout;
}
private Options(final ConnectionPolicy connectionPolicy) {
this.bufferPageSize = 8192;
this.connectionAcquisitionTimeout = Duration.ofSeconds(5L);
this.connectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled();
this.connectTimeout = connectionPolicy.getConnectTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
this.idleChannelTimerResolution = Duration.ofMillis(100);
this.idleEndpointTimeout = connectionPolicy.getIdleTcpEndpointTimeout();
this.maxBufferCapacity = 8192 << 10;
this.maxChannelsPerEndpoint = connectionPolicy.getMaxConnectionsPerEndpoint();
this.maxRequestsPerChannel = connectionPolicy.getMaxRequestsPerConnection();
this.maxConcurrentRequestsPerEndpointOverride = -1;
this.receiveHangDetectionTime = Duration.ofSeconds(65L);
this.tcpNetworkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout();
this.requestTimerResolution = Duration.ofMillis(100L);
this.sendHangDetectionTime = Duration.ofSeconds(10L);
this.shutdownTimeout = Duration.ofSeconds(15L);
this.threadCount = connectionPolicy.getIoThreadCountPerCoreFactor() *
Runtime.getRuntime().availableProcessors();
this.userAgent = new UserAgentContainer();
this.channelAcquisitionContextEnabled = false;
this.ioThreadPriority = Thread.NORM_PRIORITY;
this.tcpKeepIntvl = 1; // Configuration for EpollChannelOption.TCP_KEEPINTVL
this.tcpKeepIdle = 30; // Configuration for EpollChannelOption.TCP_KEEPIDLE
this.preferTcpNative = true;
}
// endregion
// region Accessors
public int bufferPageSize() {
return this.bufferPageSize;
}
public Duration connectionAcquisitionTimeout() {
return this.connectionAcquisitionTimeout;
}
public Duration connectTimeout() {
return this.connectTimeout;
}
public Duration idleChannelTimeout() {
return this.idleChannelTimeout;
}
public Duration idleChannelTimerResolution() { return this.idleChannelTimerResolution; }
public Duration idleEndpointTimeout() {
return this.idleEndpointTimeout;
}
public boolean isConnectionEndpointRediscoveryEnabled() {
return this.connectionEndpointRediscoveryEnabled;
}
public int maxBufferCapacity() {
return this.maxBufferCapacity;
}
public int maxChannelsPerEndpoint() {
return this.maxChannelsPerEndpoint;
}
public int maxRequestsPerChannel() {
return this.maxRequestsPerChannel;
}
public int maxConcurrentRequestsPerEndpoint() {
if (this.maxConcurrentRequestsPerEndpointOverride > 0) {
return maxConcurrentRequestsPerEndpointOverride;
}
return Math.max(
DEFAULT_MIN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT,
this.maxChannelsPerEndpoint * this.maxRequestsPerChannel);
}
public Duration receiveHangDetectionTime() {
return this.receiveHangDetectionTime;
}
public Duration tcpNetworkRequestTimeout() {
return this.tcpNetworkRequestTimeout;
}
public Duration requestTimerResolution() {
return this.requestTimerResolution;
}
public Duration sendHangDetectionTime() {
return this.sendHangDetectionTime;
}
public Duration shutdownTimeout() {
return this.shutdownTimeout;
}
public int threadCount() {
return this.threadCount;
}
public UserAgentContainer userAgent() {
return this.userAgent;
}
public boolean isChannelAcquisitionContextEnabled() { return this.channelAcquisitionContextEnabled; }
public int ioThreadPriority() {
checkArgument(
this.ioThreadPriority >= Thread.MIN_PRIORITY && this.ioThreadPriority <= Thread.MAX_PRIORITY,
"Expect ioThread priority between [%s, %s]",
Thread.MIN_PRIORITY,
Thread.MAX_PRIORITY);
return this.ioThreadPriority;
}
public int tcpKeepIntvl() { return this.tcpKeepIntvl; }
public int tcpKeepIdle() { return this.tcpKeepIdle; }
public boolean preferTcpNative() { return this.preferTcpNative; }
// endregion
// region Methods
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}
// endregion
// region Types
/**
* A builder for constructing {@link Options} instances.
*
* <h3>Using system properties to set the default {@link Options} used by an {@link Builder}</h3>
* <p>
* A default options instance is created when the {@link Builder} class is initialized. This instance specifies
* the default options used by every {@link Builder} instance. In priority order the default options instance
* is created from:
* <ol>
* <li>The JSON value of system property {@code azure.cosmos.directTcp.defaultOptions}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptions={\"maxChannelsPerEndpoint\":5,\"maxRequestsPerChannel\":30}}</pre>
* </li>
* <li>The contents of the JSON file located by system property {@code azure.cosmos.directTcp
* .defaultOptionsFile}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptionsFile=/path/to/default/options/file}</pre>
* </li>
* <li>The contents of JSON resource file {@code azure.cosmos.directTcp.defaultOptions.json}.
* <p>Specifically, the resource file is read from this stream:
* <pre>{@code RntbdTransportClient.class.getClassLoader().getResourceAsStream("azure.cosmos.directTcp.defaultOptions.json")}</pre>
* <p>Example: <pre>{@code {
* "bufferPageSize": 8192,
* "connectionEndpointRediscoveryEnabled": false,
* "connectTimeout": "PT5S",
* "idleChannelTimeout": "PT0S",
* "idleEndpointTimeout": "PT1H",
* "maxBufferCapacity": 8388608,
* "maxChannelsPerEndpoint": 130,
* "maxRequestsPerChannel": 30,
* "maxConcurrentRequestsPerEndpointOverride": -1,
* "receiveHangDetectionTime": "PT1M5S",
* "requestTimeout": "PT5S",
* "requestTimerResolution": "PT100MS",
* "sendHangDetectionTime": "PT10S",
* "shutdownTimeout": "PT15S",
* "threadCount": 16
* }}</pre>
* </li>
* </ol>
* <p>JSON value errors are logged and then ignored. If none of the above values are available or all available
* values are in error, the default options instance is created from the private parameterless constructor for
* {@link Options}.
*/
@SuppressWarnings("UnusedReturnValue")
public static class Builder {
// region Fields
private static final String DEFAULT_OPTIONS_PROPERTY_NAME = "azure.cosmos.directTcp.defaultOptions";
private static final Options DEFAULT_OPTIONS;
static {
Options options = null;
try {
final String string = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME);
if (string != null) {
// Attempt to set default options based on the JSON string value of "{propertyName}"
try {
options = RntbdObjectMapper.readValue(string, Options.class);
} catch (IOException error) {
logger.error("failed to parse default Direct TCP options {} due to ", string, error);
}
}
if (options == null) {
final String path = System.getProperty(DEFAULT_OPTIONS_PROPERTY_NAME + "File");
if (path != null) {
// Attempt to load default options from the JSON file on the path specified by
// "{propertyName}File"
try {
options = RntbdObjectMapper.readValue(new File(path), Options.class);
} catch (IOException error) {
logger.error("failed to load default Direct TCP options from {} due to ", path, error);
}
}
}
if (options == null) {
final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";
try (InputStream stream = loader.getResourceAsStream(name)) {
if (stream != null) {
// Attempt to load default options from the JSON resource file "{propertyName}.json"
options = RntbdObjectMapper.readValue(stream, Options.class);
}
} catch (IOException error) {
logger.error("failed to load Direct TCP options from resource {} due to ", name, error);
}
}
} finally {
if (options == null) {
logger.info("Using default Direct TCP options: {}", DEFAULT_OPTIONS_PROPERTY_NAME);
DEFAULT_OPTIONS = new Options(ConnectionPolicy.getDefaultPolicy());
} else {
logger.info("Updated default Direct TCP options from system property {}: {}",
DEFAULT_OPTIONS_PROPERTY_NAME,
options);
DEFAULT_OPTIONS = options;
}
}
}
private int bufferPageSize;
private Duration connectionAcquisitionTimeout;
private boolean connectionEndpointRediscoveryEnabled;
private Duration connectTimeout;
private Duration idleChannelTimeout;
private Duration idleChannelTimerResolution;
private Duration idleEndpointTimeout;
private int maxBufferCapacity;
private int maxChannelsPerEndpoint;
private int maxRequestsPerChannel;
private int maxConcurrentRequestsPerEndpointOverride;
private Duration receiveHangDetectionTime;
private Duration tcpNetworkRequestTimeout;
private Duration requestTimerResolution;
private Duration sendHangDetectionTime;
private Duration shutdownTimeout;
private int threadCount;
private UserAgentContainer userAgent;
private boolean channelAcquisitionContextEnabled;
private int ioThreadPriority;
private int tcpKeepIntvl;
private int tcpKeepIdle;
private boolean preferTcpNative;
// endregion
// region Constructors
public Builder(ConnectionPolicy connectionPolicy) {
this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
this.connectionAcquisitionTimeout = DEFAULT_OPTIONS.connectionAcquisitionTimeout;
this.connectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled();
this.connectTimeout = connectionPolicy.getConnectTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
this.idleChannelTimerResolution = DEFAULT_OPTIONS.idleChannelTimerResolution;
this.idleEndpointTimeout = connectionPolicy.getIdleTcpEndpointTimeout();
this.maxBufferCapacity = DEFAULT_OPTIONS.maxBufferCapacity;
this.maxChannelsPerEndpoint = connectionPolicy.getMaxConnectionsPerEndpoint();
this.maxRequestsPerChannel = connectionPolicy.getMaxRequestsPerConnection();
this.maxConcurrentRequestsPerEndpointOverride =
DEFAULT_OPTIONS.maxConcurrentRequestsPerEndpointOverride;
this.receiveHangDetectionTime = DEFAULT_OPTIONS.receiveHangDetectionTime;
this.tcpNetworkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout();
this.requestTimerResolution = DEFAULT_OPTIONS.requestTimerResolution;
this.sendHangDetectionTime = DEFAULT_OPTIONS.sendHangDetectionTime;
this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
this.threadCount = DEFAULT_OPTIONS.threadCount;
this.userAgent = DEFAULT_OPTIONS.userAgent;
this.channelAcquisitionContextEnabled = DEFAULT_OPTIONS.channelAcquisitionContextEnabled;
this.ioThreadPriority = DEFAULT_OPTIONS.ioThreadPriority;
this.tcpKeepIntvl = DEFAULT_OPTIONS.tcpKeepIntvl;
this.tcpKeepIdle = DEFAULT_OPTIONS.tcpKeepIdle;
this.preferTcpNative = DEFAULT_OPTIONS.preferTcpNative;
}
// endregion
// region Methods
public Builder bufferPageSize(final int value) {
checkArgument(value >= 4096 && (value & (value - 1)) == 0,
"expected value to be a power of 2 >= 4096, not %s",
value);
this.bufferPageSize = value;
return this;
}
public Options build() {
checkState(this.bufferPageSize <= this.maxBufferCapacity,
"expected bufferPageSize (%s) <= maxBufferCapacity (%s)",
this.bufferPageSize,
this.maxBufferCapacity);
return new Options(this);
}
public Builder connectionAcquisitionTimeout(final Duration value) {
checkNotNull(value, "expected non-null value");
this.connectionAcquisitionTimeout = value.compareTo(Duration.ZERO) < 0 ? Duration.ZERO : value;
return this;
}
public Builder connectionEndpointRediscoveryEnabled(final boolean value) {
this.connectionEndpointRediscoveryEnabled = value;
return this;
}
public Builder connectionTimeout(final Duration value) {
checkArgument(value == null || value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.connectTimeout = value;
return this;
}
public Builder idleChannelTimeout(final Duration value) {
checkNotNull(value, "expected non-null value");
this.idleChannelTimeout = value;
return this;
}
public Builder idleChannelTimerResolution(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) <= 0,
"expected positive value, not %s",
value);
this.idleChannelTimerResolution = value;
return this;
}
public Builder idleEndpointTimeout(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.idleEndpointTimeout = value;
return this;
}
public Builder maxBufferCapacity(final int value) {
checkArgument(value > 0 && (value & (value - 1)) == 0,
"expected positive value, not %s",
value);
this.maxBufferCapacity = value;
return this;
}
public Builder maxChannelsPerEndpoint(final int value) {
checkArgument(value > 0, "expected positive value, not %s", value);
this.maxChannelsPerEndpoint = value;
return this;
}
public Builder maxRequestsPerChannel(final int value) {
checkArgument(value > 0, "expected positive value, not %s", value);
this.maxRequestsPerChannel = value;
return this;
}
public Builder maxConcurrentRequestsPerEndpointOverride(final int value) {
checkArgument(value > 0, "expected positive value, not %s", value);
this.maxConcurrentRequestsPerEndpointOverride = value;
return this;
}
public Builder receiveHangDetectionTime(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.receiveHangDetectionTime = value;
return this;
}
public Builder tcpNetworkRequestTimeout(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.tcpNetworkRequestTimeout = value;
return this;
}
public Builder requestTimerResolution(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.requestTimerResolution = value;
return this;
}
public Builder sendHangDetectionTime(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.sendHangDetectionTime = value;
return this;
}
public Builder shutdownTimeout(final Duration value) {
checkArgument(value != null && value.compareTo(Duration.ZERO) > 0,
"expected positive value, not %s",
value);
this.shutdownTimeout = value;
return this;
}
public Builder threadCount(final int value) {
checkArgument(value > 0, "expected positive value, not %s", value);
this.threadCount = value;
return this;
}
public Builder userAgent(final UserAgentContainer value) {
checkNotNull(value, "expected non-null value");
this.userAgent = value;
return this;
}
// endregion
}
// endregion
}
static final class JsonSerializer extends StdSerializer<RntbdTransportClient> {
private static final long serialVersionUID = 1007663695768825670L;
JsonSerializer() {
super(RntbdTransportClient.class);
}
@Override
public void serialize(
final RntbdTransportClient value,
final JsonGenerator generator,
final SerializerProvider provider
) throws IOException {
generator.writeStartObject();
generator.writeNumberField("id", value.id());
generator.writeBooleanField("isClosed", value.isClosed());
generator.writeObjectField("configuration", value.endpointProvider.config());
generator.writeObjectFieldStart("serviceEndpoints");
generator.writeNumberField("count", value.endpointCount());
generator.writeArrayFieldStart("items");
for (final Iterator<RntbdEndpoint> iterator = value.endpointProvider.list().iterator(); iterator.hasNext(); ) {
generator.writeObject(iterator.next());
}
generator.writeEndArray();
generator.writeEndObject();
generator.writeEndObject();
}
}
// endregion
}