// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity.rntbd;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.ConflictException;
import com.azure.cosmos.implementation.CosmosError;
import com.azure.cosmos.implementation.ForbiddenException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.LockedException;
import com.azure.cosmos.implementation.MethodNotAllowedException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.PreconditionFailedException;
import com.azure.cosmos.implementation.RequestEntityTooLargeException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RetryWithException;
import com.azure.cosmos.implementation.ServiceUnavailableException;
import com.azure.cosmos.implementation.UnauthorizedException;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.channel.EventLoop;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCounted;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.ThrowableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import static com.azure.cosmos.implementation.HttpConstants.StatusCodes;
import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdClientChannelHealthChecker.Timestamps;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants.RntbdResponseHeader;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
// Channel handler that sits on both the inbound and outbound side of the pipeline and tracks the request records
// that are pending on a channel (see pendingRequests below).
public final class RntbdRequestManager implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
// region Fields
// Shared ClosedChannelException instances, one per shutdown path. Each one is handed to
// completeAllPendingRequestsExceptionally by the method named in its unknownStackTrace call.
private static final ClosedChannelException ON_CHANNEL_UNREGISTERED =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "channelUnregistered");
private static final ClosedChannelException ON_CLOSE =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "close");
private static final ClosedChannelException ON_DEREGISTER =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "deregister");
// NOTE(review): the remainder of this initializer is not visible in this view; presumably this is the executor
// referred to by the comment in addPendingRequestRecord -- confirm
private static final EventExecutor requestExpirationExecutor = new DefaultEventExecutor(new RntbdThreadFactory(
private static final Logger logger = LoggerFactory.getLogger(RntbdRequestManager.class);
// Completed by userEventTriggered: normally with an RntbdContext event, exceptionally with an RntbdContextException
private final CompletableFuture<RntbdContext> contextFuture = new CompletableFuture<>();
// Exposed to callers through rntbdContextRequestFuture()
private final CompletableFuture<RntbdContextRequest> contextRequestFuture = new CompletableFuture<>();
// Consulted by userEventTriggered when an IdleStateEvent arrives
private final ChannelHealthChecker healthChecker;
// Upper bound on pending requests applied by getChannelState; validated as > 0 by the constructor
private final int pendingRequestLimit;
// Request records keyed by transport request id
private final ConcurrentHashMap<Long, RntbdRequestRecord> pendingRequests;
// Source instance passed to the Timestamps constructor in snapshotTimestamps()
private final Timestamps timestamps = new Timestamps();
// Set to true by completeAllPendingRequestsExceptionally; checked by the shutdown paths before calling it
private boolean closingExceptionally = false;
// Assigned in channelRegistered; null before that point
private CoalescingBufferQueue pendingWrites;
// endregion
public RntbdRequestManager(final ChannelHealthChecker healthChecker, final int pendingRequestLimit) {
// Argument validation: the limit must be strictly positive and the health checker must be non-null
checkArgument(pendingRequestLimit > 0, "pendingRequestLimit: %s", pendingRequestLimit);
checkNotNull(healthChecker, "healthChecker");
// pendingRequestLimit is used as the map's initial capacity
this.pendingRequests = new ConcurrentHashMap<>(pendingRequestLimit);
this.pendingRequestLimit = pendingRequestLimit;
this.healthChecker = healthChecker;
// region ChannelHandler methods
/**
 * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void handlerAdded(final ChannelHandlerContext context) {
// Emits a trace log entry for this lifecycle callback
this.traceOperation(context, "handlerAdded");
/**
 * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
 * anymore.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void handlerRemoved(final ChannelHandlerContext context) {
// Emits a trace log entry for this lifecycle callback
this.traceOperation(context, "handlerRemoved");
// endregion
// region ChannelInboundHandler methods
/**
 * The {@link Channel} of the {@link ChannelHandlerContext} is now active.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelActive(final ChannelHandlerContext context) {
// Emits a trace log entry for this inbound event
this.traceOperation(context, "channelActive");
/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered and has reached the end of its lifetime.
 * <p>
 * This method will only be called after the channel is closed.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelInactive(final ChannelHandlerContext context) {
// Emits a trace log entry for this inbound event
this.traceOperation(context, "channelInactive");
/**
 * The {@link Channel} of the {@link ChannelHandlerContext} has read a message from its peer.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs.
 * @param message The message read.
 */
public void channelRead(final ChannelHandlerContext context, final Object message) {
this.traceOperation(context, "channelRead");
try {
// Exact class comparison (not instanceof): only RntbdResponse instances are dispatched to messageReceived
if (message.getClass() == RntbdResponse.class) {
try {
this.messageReceived(context, (RntbdResponse) message);
} catch (CorruptedFrameException error) {
// A corrupted frame goes straight to exceptionCaught without an issue report
this.exceptionCaught(context, error);
} catch (Throwable throwable) {
// Any other failure is reported as an issue first and then routed to exceptionCaught
reportIssue(context, "{} ", message, throwable);
this.exceptionCaught(context, throwable);
} else {
// Any other message type is surfaced as an IllegalStateException through the same error path
final IllegalStateException error = new IllegalStateException(
lenientFormat("expected message of %s, not %s: %s",
reportIssue(context, "", error);
this.exceptionCaught(context, error);
} finally {
// Reference-counted messages are released here whether or not processing succeeded
if (message instanceof ReferenceCounted) {
// A false result from release() is reported as an issue
boolean released = ((ReferenceCounted) message).release();
reportIssueUnless(released, context, "failed to release message: {}", message);
/**
 * The {@link Channel} of the {@link ChannelHandlerContext} has fully consumed the most-recent message read.
 * <p>
 * If {@link ChannelOption#AUTO_READ} is off, no further attempt to read inbound data from the current
 * {@link Channel} will be made until {@link ChannelHandlerContext#read} is called. This leaves time
 * for outbound messages to be written.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelReadComplete(final ChannelHandlerContext context) {
// Emits a trace log entry for this inbound event
this.traceOperation(context, "channelReadComplete");
/**
 * Constructs a {@link CoalescingBufferQueue} for buffering encoded requests until we have an {@link RntbdRequest}
 * <p>
 * This method then calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward to the next
 * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
 * <p>
 * Sub-classes may override this method to change behavior.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelRegistered(final ChannelHandlerContext context) {
this.traceOperation(context, "channelRegistered");
// A pre-existing queue is unexpected and is reported as an issue; it is replaced regardless
reportIssueUnless(this.pendingWrites == null, context, "pendingWrites: {}", pendingWrites);
// The queue is bound to this context's channel
this.pendingWrites = new CoalescingBufferQueue(context.channel());
/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelUnregistered(final ChannelHandlerContext context) {
this.traceOperation(context, "channelUnregistered");
// Fail the pending requests only if an earlier shutdown path has not already done so
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_CHANNEL_UNREGISTERED);
} else {
logger.debug("{} channelUnregistered exceptionally", context);
/**
 * Gets called once the writable state of a {@link Channel} changed. You can check the state with
 * {@link Channel#isWritable()}.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 */
public void channelWritabilityChanged(final ChannelHandlerContext context) {
// Emits a trace log entry for this inbound event
this.traceOperation(context, "channelWritabilityChanged");
/**
 * Processes {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} in the {@link ChannelPipeline}.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 * @param cause Exception caught
 */
public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
// TODO: DANOBLE: replace RntbdRequestManager.exceptionCaught with read/write listeners
// Notes:
// ChannelInboundHandler.exceptionCaught is deprecated and--today, prior to deprecation--only catches read--
// i.e., inbound--exceptions.
// Replacements:
// * read listener: unclear as there is no obvious replacement
// * write listener: implemented by RntbdTransportClient.DefaultEndpoint.doWrite
// Links:
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/373213
this.traceOperation(context, "exceptionCaught", cause);
// Only the first failure completes the pending requests; the flag is set by
// completeAllPendingRequestsExceptionally, so later invocations skip this branch
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, cause);
logger.debug("{} closing due to:", context, cause);
/**
 * Processes inbound events triggered by channel handlers in the {@link RntbdClientChannelHandler} pipeline.
 * <p>
 * All but inbound request management events are ignored.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
 * @param event An object representing a user event
 */
public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
this.traceOperation(context, "userEventTriggered", event);
try {
// Idle event: ask the health checker about this channel and react in the listener
if (event instanceof IdleStateEvent) {
// NOTE: if the connection is killed this may not receive any event
this.healthChecker.isHealthy(context.channel()).addListener((Future<Boolean> future) -> {
final Throwable cause;
if (future.isSuccess()) {
// NOTE(review): as shown, cause is assigned directly inside the future.get() branch -- confirm the
// intended branch structure for the healthy vs. unhealthy result
if (future.get()) {
cause = UnhealthyChannelException.INSTANCE;
} else {
// The health check itself failed; propagate its cause
cause = future.cause();
this.exceptionCaught(context, cause);
// Context negotiation succeeded: publish the context to anyone waiting on contextFuture
if (event instanceof RntbdContext) {
this.contextFuture.complete((RntbdContext) event);
// Context negotiation failed: fail contextFuture with the received exception
if (event instanceof RntbdContextException) {
this.contextFuture.completeExceptionally((RntbdContextException) event);
} catch (Throwable error) {
// Anything thrown while handling the event is reported and routed to exceptionCaught
reportIssue(context, "{}: ", event, error);
this.exceptionCaught(context, error);
// endregion
// region ChannelOutboundHandler methods
/**
 * Called once a bind operation is made.
 *
 * @param context the {@link ChannelHandlerContext} for which the bind operation is made
 * @param localAddress the {@link SocketAddress} to which it should bind
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void bind(final ChannelHandlerContext context, final SocketAddress localAddress, final ChannelPromise promise) {
this.traceOperation(context, "bind", localAddress);
// Pass-through: the bind request is forwarded unchanged via the context
context.bind(localAddress, promise);
/**
 * Called once a close operation is made.
 *
 * @param context the {@link ChannelHandlerContext} for which the close operation is made
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void close(final ChannelHandlerContext context, final ChannelPromise promise) {
this.traceOperation(context, "close");
// Fail the pending requests with ON_CLOSE unless an earlier shutdown path has already failed them
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_CLOSE);
} else {
logger.debug("{} closed exceptionally", context);
// The SslHandler is looked up by type; it is null when the pipeline has no SslHandler
final SslHandler sslHandler = context.pipeline().get(SslHandler.class);
if (sslHandler != null) {
try {
// Netty 4.1.36.Final: SslHandler.closeOutbound must be called before closing the pipeline
// This ensures that all SSL engine and ByteBuf resources are released
// This is something that does not occur in the call to ChannelPipeline.close that follows
} catch (Exception exception) {
// Netty will throw the following exception here if the outbound SSL connection has been closed already
// javax.net.ssl.SSLException: SSLEngine closed already
// Reducing the noise level here because multiple concurrent closes can happen due to race conditions
// and there is no harm in this case
if (exception instanceof SSLException) {
"SslException when attempting to close the outbound SSL connection: ",
} else {
"Exception when attempting to close the outbound SSL connection: ",
throw exception;
/**
 * Called once a connect operation is made.
 *
 * @param context the {@link ChannelHandlerContext} for which the connect operation is made
 * @param remoteAddress the {@link SocketAddress} to which it should connect
 * @param localAddress the {@link SocketAddress} which is used as source on connect
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void connect(
final ChannelHandlerContext context, final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise
) {
this.traceOperation(context, "connect", remoteAddress, localAddress);
// Pass-through: the connect request is forwarded unchanged via the context
context.connect(remoteAddress, localAddress, promise);
/**
 * Called once a deregister operation is made from the current registered {@link EventLoop}.
 *
 * @param context the {@link ChannelHandlerContext} for which the deregister operation is made
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void deregister(final ChannelHandlerContext context, final ChannelPromise promise) {
this.traceOperation(context, "deregister");
// Fail the pending requests with ON_DEREGISTER unless an earlier shutdown path has already failed them
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_DEREGISTER);
} else {
logger.debug("{} deregistered exceptionally", context);
/**
 * Called once a disconnect operation is made.
 *
 * @param context the {@link ChannelHandlerContext} for which the disconnect operation is made
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void disconnect(final ChannelHandlerContext context, final ChannelPromise promise) {
// Emits a trace log entry for this outbound operation
this.traceOperation(context, "disconnect");
/**
 * Called once a flush operation is made.
 * <p>
 * The flush operation will try to flush out all previous written messages that are pending.
 *
 * @param context the {@link ChannelHandlerContext} for which the flush operation is made
 */
public void flush(final ChannelHandlerContext context) {
// Emits a trace log entry for this outbound operation
this.traceOperation(context, "flush");
/**
 * Intercepts {@link ChannelHandlerContext#read}.
 *
 * @param context the {@link ChannelHandlerContext} for which the read operation is made
 */
public void read(final ChannelHandlerContext context) {
// Emits a trace log entry for this outbound operation
this.traceOperation(context, "read");
/**
 * Called once a write operation is made.
 * <p>
 * The write operation will send messages through the {@link ChannelPipeline} which are then ready to be flushed
 * to the actual {@link Channel}. This will occur when {@link Channel#flush} is called.
 *
 * @param context the {@link ChannelHandlerContext} for which the write operation is made
 * @param message the message to write
 * @param promise the {@link ChannelPromise} to notify once the operation completes
 */
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) {
this.traceOperation(context, "write", message);
// Request records are registered in pendingRequests (addPendingRequestRecord) and the returned record is written
if (message instanceof RntbdRequestRecord) {
final RntbdRequestRecord record = (RntbdRequestRecord) message;
context.write(this.addPendingRequestRecord(context, record), promise).addListener(completed -> {
if (completed.isSuccess()) {
// The health check message is recognized by identity and written as-is
if (message == RntbdHealthCheckRequest.MESSAGE) {
context.write(RntbdHealthCheckRequest.MESSAGE, promise).addListener(completed -> {
if (completed.isSuccess()) {
// An IllegalStateException is reported as an issue and routed to exceptionCaught
final IllegalStateException error = new IllegalStateException(lenientFormat("message of %s: %s",
reportIssue(context, "", error);
this.exceptionCaught(context, error);
// endregion
// region Package private methods
int pendingRequestCount() {
// Current number of entries in the pendingRequests map
return this.pendingRequests.size();
Optional<RntbdContext> rntbdContext() {
// getNow(null) yields null until contextFuture has completed. Optional.of(null) would throw a
// NullPointerException, so ofNullable is used to hand back an empty Optional in that case; this lets callers
// apply orElseThrow/orElse as the Optional return type implies.
return Optional.ofNullable(this.contextFuture.getNow(null));
CompletableFuture<RntbdContextRequest> rntbdContextRequestFuture() {
// Exposes the future itself (not a copy)
return this.contextRequestFuture;
boolean hasRequestedRntbdContext() {
// True only when contextRequestFuture has already completed with a non-null value
return this.contextRequestFuture.getNow(null) != null;
boolean hasRntbdContext() {
// True only when contextFuture has already completed with a non-null value
return this.contextFuture.getNow(null) != null;
RntbdChannelState getChannelState(final int demand) {
// Being asked for state before the context request was issued is reported as an issue
reportIssueUnless(this.hasRequestedRntbdContext(), this, "Direct TCP context request was not issued");
// With a context in hand the full limit applies; otherwise the limit is additionally capped by demand
final int limit = this.hasRntbdContext() ? this.pendingRequestLimit : Math.min(this.pendingRequestLimit, demand);
if (this.pendingRequests.size() < limit) {
return RntbdChannelState.ok(this.pendingRequests.size());
// The remaining states distinguish between having a context and not having one
if (this.hasRntbdContext()) {
return RntbdChannelState.pendingLimit(this.pendingRequests.size());
} else {
return RntbdChannelState.contextNegotiationPending((this.pendingRequests.size()));
void pendWrite(final ByteBuf out, final ChannelPromise promise) {
// Appends the buffer and its promise to pendingWrites
// NOTE(review): pendingWrites is only assigned in channelRegistered -- confirm callers cannot reach this earlier
this.pendingWrites.add(out, promise);
Timestamps snapshotTimestamps() {
// Builds a new Timestamps from the current one (presumably a copy constructor -- confirm)
return new Timestamps(this.timestamps);
// endregion
// region Private methods
private RntbdRequestRecord addPendingRequestRecord(final ChannelHandlerContext context, final RntbdRequestRecord record) {
// Registers the record in pendingRequests under its transport request id and returns the mapped record
return this.pendingRequests.compute(record.transportRequestId(), (id, current) -> {
// An existing entry for this id is unexpected. The format string names three values, so all three are
// supplied (previously only the record was passed, leaving two placeholders unfilled).
reportIssueUnless(current == null, context, "id: {}, current: {}, request: {}", id, current, record);
final Timeout pendingRequestTimeout = record.newTimeout(timeout -> {
// We don't wish to complete on the timeout thread, but rather on a thread doled out by our executor
record.whenComplete((response, error) -> {
return record;
private void completeAllPendingRequestsExceptionally(
final ChannelHandlerContext context, final Throwable throwable
) {
// Being called a second time is unexpected and is reported as an issue; the flag is then (re)set
reportIssueUnless(!this.closingExceptionally, context, "", throwable);
this.closingExceptionally = true;
if (this.pendingWrites != null && !this.pendingWrites.isEmpty()) {
// an expensive call that fires at least one exceptionCaught event
this.pendingWrites.releaseAndFailAll(context, throwable);
if (this.pendingRequests.isEmpty()) {
if (!this.contextRequestFuture.isDone()) {
if (!this.contextFuture.isDone()) {
final int count = this.pendingRequests.size();
// phrase and contextRequestException describe why context negotiation failed, if it did; the phrase is
// embedded in the message built below
Exception contextRequestException = null;
String phrase = null;
if (this.contextRequestFuture.isCompletedExceptionally()) {
try {
} catch (final CancellationException error) {
phrase = "RNTBD context request write cancelled";
contextRequestException = error;
} catch (final Exception error) {
phrase = "RNTBD context request write failed";
contextRequestException = error;
} catch (final Throwable error) {
// Non-Exception throwables are wrapped so they fit the Exception-typed variable
phrase = "RNTBD context request write failed";
contextRequestException = new ChannelException(error);
} else if (this.contextFuture.isCompletedExceptionally()) {
try {
} catch (final CancellationException error) {
phrase = "RNTBD context request read cancelled";
contextRequestException = error;
} catch (final Exception error) {
phrase = "RNTBD context request read failed";
contextRequestException = error;
} catch (final Throwable error) {
// Non-Exception throwables are wrapped so they fit the Exception-typed variable
phrase = "RNTBD context request read failed";
contextRequestException = new ChannelException(error);
} else {
phrase = "closed exceptionally";
final String message = lenientFormat("%s %s with %s pending requests", context, phrase, count);
final Exception cause;
// For a ClosedChannelException, a context negotiation failure (when there is one) takes precedence as the
// cause; any other throwable is used directly, wrapped in ChannelException if it is not an Exception
if (throwable instanceof ClosedChannelException) {
cause = contextRequestException == null
? (ClosedChannelException) throwable
: contextRequestException;
} else {
cause = throwable instanceof Exception
? (Exception) throwable
: new ChannelException(throwable);
// A GoneException carrying the request headers is built for each pending record
for (RntbdRequestRecord record : this.pendingRequests.values()) {
final Map<String, String> requestHeaders = record.args().serviceRequest().getHeaders();
// NOTE(review): messageReceived null-checks physicalAddress() before calling toString(); this call does
// not -- confirm physicalAddress() cannot be null here
final String requestUri = record.args().physicalAddress().toString();
final GoneException error = new GoneException(message, cause, null, requestUri);
BridgeInternal.setRequestHeaders(error, requestHeaders);
/**
 * This method is called for each incoming message of type {@link RntbdResponse} to complete a request.
 *
 * @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager request manager} belongs.
 * @param response the {@link RntbdResponse message} received.
 */
private void messageReceived(final ChannelHandlerContext context, final RntbdResponse response) {
final Long transportRequestId = response.getTransportRequestId();
// A response without a transport request id cannot be matched to a request; it is reported as an issue
if (transportRequestId == null) {
reportIssue(context, "response ignored because its transportRequestId is missing: {}", response);
// Look up the pending request this response belongs to
final RntbdRequestRecord requestRecord = this.pendingRequests.get(transportRequestId);
if (requestRecord == null) {
logger.debug("response {} ignored because its requestRecord is missing: {}", transportRequestId, response);
final HttpResponseStatus status = response.getStatus();
final UUID activityId = response.getActivityId();
final int statusCode = status.code();
// Status codes in [200, 300) and 304 (not modified) are converted to a StoreResponse
if ((HttpResponseStatus.OK.code() <= statusCode && statusCode < HttpResponseStatus.MULTIPLE_CHOICES.code()) ||
statusCode == HttpResponseStatus.NOT_MODIFIED.code()) {
final StoreResponse storeResponse = response.toStoreResponse(this.contextFuture.getNow(null));
} else {
// Map response to a CosmosException
final CosmosException cause;
// ..Fetch required header values
final long lsn = response.getHeader(RntbdResponseHeader.LSN);
final String partitionKeyRangeId = response.getHeader(RntbdResponseHeader.PartitionKeyRangeId);
// ..Create Error instance
// (from the payload when there is one, otherwise synthesized from the status line)
final CosmosError error = response.hasPayload()
? new CosmosError(RntbdObjectMapper.readTree(response))
: new CosmosError(Integer.toString(statusCode), status.reasonPhrase(), status.codeClass().name());
// ..Map RNTBD response headers to HTTP response headers
// (requires the RNTBD context; orElseThrow raises IllegalStateException on an empty Optional)
final Map<String, String> responseHeaders = response.getHeaders().asMap(
this.rntbdContext().orElseThrow(IllegalStateException::new), activityId
// ..Create CosmosException based on status and sub-status codes
// resourceAddress is null when the request record carries no physical address
final String resourceAddress = requestRecord.args().physicalAddress() != null ?
requestRecord.args().physicalAddress().toString() : null;
switch (status.code()) {
case StatusCodes.BADREQUEST:
cause = new BadRequestException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.CONFLICT:
cause = new ConflictException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.FORBIDDEN:
cause = new ForbiddenException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.GONE:
// GONE is refined further by the SubStatus header
final int subStatusCode = Math.toIntExact(response.getHeader(RntbdResponseHeader.SubStatus));
switch (subStatusCode) {
case SubStatusCodes.COMPLETING_SPLIT:
cause = new PartitionKeyRangeIsSplittingException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new PartitionIsMigratingException(error, lsn, partitionKeyRangeId, responseHeaders);
case SubStatusCodes.NAME_CACHE_IS_STALE:
cause = new InvalidPartitionException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new PartitionKeyRangeGoneException(error, lsn, partitionKeyRangeId, responseHeaders);
GoneException goneExceptionFromService =
new GoneException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = goneExceptionFromService;
cause = new InternalServerErrorException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.LOCKED:
cause = new LockedException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.METHOD_NOT_ALLOWED:
cause = new MethodNotAllowedException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.NOTFOUND:
cause = new NotFoundException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new PreconditionFailedException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new RequestEntityTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.REQUEST_TIMEOUT:
// A request timeout is surfaced as a GoneException with the RequestTimeoutException passed as inner
Exception inner = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new GoneException(resourceAddress, error, lsn, partitionKeyRangeId, responseHeaders, inner);
case StatusCodes.RETRY_WITH:
cause = new RetryWithException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new ServiceUnavailableException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.TOO_MANY_REQUESTS:
cause = new RequestRateTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
case StatusCodes.UNAUTHORIZED:
cause = new UnauthorizedException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = BridgeInternal.createCosmosException(resourceAddress, status.code(), error, responseHeaders);
// The resource address is attached to whichever exception was created
BridgeInternal.setResourceAddress(cause, resourceAddress);
private void removeContextNegotiatorAndFlushPendingWrites(final ChannelHandlerContext context) {
// Looks up the RntbdContextNegotiator in this context's pipeline by type
// NOTE(review): what is done with the negotiator and with non-empty pendingWrites is not visible in this view
final RntbdContextNegotiator negotiator = context.pipeline().get(RntbdContextNegotiator.class);
if (!this.pendingWrites.isEmpty()) {
private static void reportIssue(final Object subject, final String format, final Object... args) {
// Convenience wrapper: delegates to RntbdReporter.reportIssue with this class's logger
RntbdReporter.reportIssue(logger, subject, format, args);
private static void reportIssueUnless(
final boolean predicate, final Object subject, final String format, final Object... args
) {
// Convenience wrapper: delegates to RntbdReporter.reportIssueUnless with this class's logger
RntbdReporter.reportIssueUnless(logger, predicate, subject, format, args);
private void traceOperation(final ChannelHandlerContext context, final String operationName, final Object... args) {
// Logs the operation name, the context, and the argument array at TRACE level, one per line
logger.trace("{}\n{}\n{}", operationName, context, args);
// endregion
// region Types
// Exception type with a single shared INSTANCE; userEventTriggered's health-check listener assigns it as a
// failure cause.
private static final class UnhealthyChannelException extends ChannelException {
static final UnhealthyChannelException INSTANCE = new UnhealthyChannelException();
private UnhealthyChannelException() {
super("health check failed");
// Returns this without delegating to super, so this override captures no stack trace
public Throwable fillInStackTrace() {
return this;
// endregion