RntbdRequestManager.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity.rntbd;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.ConflictException;
import com.azure.cosmos.implementation.CosmosError;
import com.azure.cosmos.implementation.ForbiddenException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.LockedException;
import com.azure.cosmos.implementation.MethodNotAllowedException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.PreconditionFailedException;
import com.azure.cosmos.implementation.RequestEntityTooLargeException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RetryWithException;
import com.azure.cosmos.implementation.ServiceUnavailableException;
import com.azure.cosmos.implementation.UnauthorizedException;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.channel.EventLoop;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCounted;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.ThrowableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import static com.azure.cosmos.implementation.HttpConstants.StatusCodes;
import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdClientChannelHealthChecker.Timestamps;
import static com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants.RntbdResponseHeader;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
public final class RntbdRequestManager implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
// region Fields
private static final ClosedChannelException ON_CHANNEL_UNREGISTERED =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "channelUnregistered");
private static final ClosedChannelException ON_CLOSE =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "close");
private static final ClosedChannelException ON_DEREGISTER =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "deregister");
private static final EventExecutor requestExpirationExecutor = new DefaultEventExecutor(new RntbdThreadFactory(
"request-expirator",
true,
Thread.NORM_PRIORITY));
private static final Logger logger = LoggerFactory.getLogger(RntbdRequestManager.class);
private final CompletableFuture<RntbdContext> contextFuture = new CompletableFuture<>();
private final CompletableFuture<RntbdContextRequest> contextRequestFuture = new CompletableFuture<>();
private final ChannelHealthChecker healthChecker;
private final int pendingRequestLimit;
private final ConcurrentHashMap<Long, RntbdRequestRecord> pendingRequests;
private final Timestamps timestamps = new Timestamps();
private boolean closingExceptionally = false;
private CoalescingBufferQueue pendingWrites;
// endregion
public RntbdRequestManager(final ChannelHealthChecker healthChecker, final int pendingRequestLimit) {
checkArgument(pendingRequestLimit > 0, "pendingRequestLimit: %s", pendingRequestLimit);
checkNotNull(healthChecker, "healthChecker");
this.pendingRequests = new ConcurrentHashMap<>(pendingRequestLimit);
this.pendingRequestLimit = pendingRequestLimit;
this.healthChecker = healthChecker;
}
// region ChannelHandler methods
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void handlerAdded(final ChannelHandlerContext context) {
this.traceOperation(context, "handlerAdded");
}
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void handlerRemoved(final ChannelHandlerContext context) {
this.traceOperation(context, "handlerRemoved");
}
// endregion
// region ChannelInboundHandler methods
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void channelActive(final ChannelHandlerContext context) {
this.traceOperation(context, "channelActive");
context.fireChannelActive();
}
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered and has reached the end of its lifetime
* <p>
* This method will only be called after the channel is closed.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void channelInactive(final ChannelHandlerContext context) {
this.traceOperation(context, "channelInactive");
context.fireChannelInactive();
}
/**
* The {@link Channel} of the {@link ChannelHandlerContext} has read a message from its peer.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs.
* @param message The message read.
*/
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
this.traceOperation(context, "channelRead");
try {
if (message.getClass() == RntbdResponse.class) {
try {
this.messageReceived(context, (RntbdResponse) message);
} catch (CorruptedFrameException error) {
this.exceptionCaught(context, error);
} catch (Throwable throwable) {
reportIssue(context, "{} ", message, throwable);
this.exceptionCaught(context, throwable);
}
} else {
final IllegalStateException error = new IllegalStateException(
lenientFormat("expected message of %s, not %s: %s",
RntbdResponse.class,
message.getClass(),
message));
reportIssue(context, "", error);
this.exceptionCaught(context, error);
}
} finally {
if (message instanceof ReferenceCounted) {
boolean released = ((ReferenceCounted) message).release();
reportIssueUnless(released, context, "failed to release message: {}", message);
}
}
}
/**
* The {@link Channel} of the {@link ChannelHandlerContext} has fully consumed the most-recent message read.
* <p>
* If {@link ChannelOption#AUTO_READ} is off, no further attempt to read inbound data from the current
* {@link Channel} will be made until {@link ChannelHandlerContext#read} is called. This leaves time
* for outbound messages to be written.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void channelReadComplete(final ChannelHandlerContext context) {
this.traceOperation(context, "channelReadComplete");
this.timestamps.channelReadCompleted();
context.fireChannelReadComplete();
}
/**
* Constructs a {@link CoalescingBufferQueue} for buffering encoded requests until we have an {@link RntbdRequest}
* <p>
* This method then calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward to the next
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
* <p>
* Sub-classes may override this method to change behavior.
*
* @param context the {@link ChannelHandlerContext} for which the bind operation is made
*/
@Override
public void channelRegistered(final ChannelHandlerContext context) {
this.traceOperation(context, "channelRegistered");
reportIssueUnless(this.pendingWrites == null, context, "pendingWrites: {}", pendingWrites);
this.pendingWrites = new CoalescingBufferQueue(context.channel());
context.fireChannelRegistered();
}
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void channelUnregistered(final ChannelHandlerContext context) {
this.traceOperation(context, "channelUnregistered");
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_CHANNEL_UNREGISTERED);
} else {
logger.debug("{} channelUnregistered exceptionally", context);
}
context.fireChannelUnregistered();
}
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
*/
@Override
public void channelWritabilityChanged(final ChannelHandlerContext context) {
this.traceOperation(context, "channelWritabilityChanged");
context.fireChannelWritabilityChanged();
}
/**
* Processes {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} in the {@link ChannelPipeline}
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
* @param cause Exception caught
*/
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
// TODO: DANOBLE: replace RntbdRequestManager.exceptionCaught with read/write listeners
// Notes:
// ChannelInboundHandler.exceptionCaught is deprecated and--today, prior to deprecation--only catches read--
// i.e., inbound--exceptions.
// Replacements:
// * read listener: unclear as there is no obvious replacement
// * write listener: implemented by RntbdTransportClient.DefaultEndpoint.doWrite
// Links:
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/373213
this.traceOperation(context, "exceptionCaught", cause);
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, cause);
logger.debug("{} closing due to:", context, cause);
context.flush().close();
}
}
/**
* Processes inbound events triggered by channel handlers in the {@link RntbdClientChannelHandler} pipeline
* <p>
* All but inbound request management events are ignored.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager} belongs
* @param event An object representing a user event
*/
@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
this.traceOperation(context, "userEventTriggered", event);
try {
if (event instanceof IdleStateEvent) {
// NOTE: if the connection is killed this may not receive any event
this.healthChecker.isHealthy(context.channel()).addListener((Future<Boolean> future) -> {
final Throwable cause;
if (future.isSuccess()) {
if (future.get()) {
return;
}
cause = UnhealthyChannelException.INSTANCE;
} else {
cause = future.cause();
}
this.exceptionCaught(context, cause);
});
return;
}
if (event instanceof RntbdContext) {
this.contextFuture.complete((RntbdContext) event);
this.removeContextNegotiatorAndFlushPendingWrites(context);
return;
}
if (event instanceof RntbdContextException) {
this.contextFuture.completeExceptionally((RntbdContextException) event);
context.pipeline().flush().close();
return;
}
context.fireUserEventTriggered(event);
} catch (Throwable error) {
reportIssue(context, "{}: ", event, error);
this.exceptionCaught(context, error);
}
}
// endregion
// region ChannelOutboundHandler methods
/**
* Called once a bind operation is made.
*
* @param context the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void bind(final ChannelHandlerContext context, final SocketAddress localAddress, final ChannelPromise promise) {
this.traceOperation(context, "bind", localAddress);
context.bind(localAddress, promise);
}
/**
* Called once a close operation is made.
*
* @param context the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void close(final ChannelHandlerContext context, final ChannelPromise promise) {
this.traceOperation(context, "close");
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_CLOSE);
} else {
logger.debug("{} closed exceptionally", context);
}
final SslHandler sslHandler = context.pipeline().get(SslHandler.class);
if (sslHandler != null) {
try {
// Netty 4.1.36.Final: SslHandler.closeOutbound must be called before closing the pipeline
// This ensures that all SSL engine and ByteBuf resources are released
// This is something that does not occur in the call to ChannelPipeline.close that follows
sslHandler.closeOutbound();
} catch (Exception exception) {
// Netty will throw the following exception here if the outbound SSL connection has been closed already
// javax.net.ssl.SSLException: SSLEngine closed already
// Reducing the noise level here because multiple concurrent closes can happen due to race conditions
// and there is no harm in this case
if (exception instanceof SSLException) {
logger.debug(
"SslException when attempting to close the outbound SSL connection: ",
exception);
} else {
logger.warn(
"Exception when attempting to close the outbound SSL connection: ",
exception);
throw exception;
}
}
}
context.close(promise);
}
/**
* Called once a connect operation is made.
*
* @param context the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void connect(
final ChannelHandlerContext context, final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise
) {
this.traceOperation(context, "connect", remoteAddress, localAddress);
context.connect(remoteAddress, localAddress, promise);
}
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param context the {@link ChannelHandlerContext} for which the deregister operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void deregister(final ChannelHandlerContext context, final ChannelPromise promise) {
this.traceOperation(context, "deregister");
if (!this.closingExceptionally) {
this.completeAllPendingRequestsExceptionally(context, ON_DEREGISTER);
} else {
logger.debug("{} deregistered exceptionally", context);
}
context.deregister(promise);
}
/**
* Called once a disconnect operation is made.
*
* @param context the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void disconnect(final ChannelHandlerContext context, final ChannelPromise promise) {
this.traceOperation(context, "disconnect");
context.disconnect(promise);
}
/**
* Called once a flush operation is made
* <p>
* The flush operation will try to flush out all previous written messages that are pending.
*
* @param context the {@link ChannelHandlerContext} for which the flush operation is made
*/
@Override
public void flush(final ChannelHandlerContext context) {
this.traceOperation(context, "flush");
context.flush();
}
/**
* Intercepts {@link ChannelHandlerContext#read}
*
* @param context the {@link ChannelHandlerContext} for which the read operation is made
*/
@Override
public void read(final ChannelHandlerContext context) {
this.traceOperation(context, "read");
context.read();
}
/**
* Called once a write operation is made
* <p>
* The write operation will send messages through the {@link ChannelPipeline} which are then ready to be flushed
* to the actual {@link Channel}. This will occur when {@link Channel#flush} is called.
*
* @param context the {@link ChannelHandlerContext} for which the write operation is made
* @param message the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
*/
@Override
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) {
this.traceOperation(context, "write", message);
if (message instanceof RntbdRequestRecord) {
final RntbdRequestRecord record = (RntbdRequestRecord) message;
this.timestamps.channelWriteAttempted();
record.setSendingRequestHasStarted();
context.write(this.addPendingRequestRecord(context, record), promise).addListener(completed -> {
record.stage(RntbdRequestRecord.Stage.SENT);
if (completed.isSuccess()) {
this.timestamps.channelWriteCompleted();
}
});
return;
}
if (message == RntbdHealthCheckRequest.MESSAGE) {
context.write(RntbdHealthCheckRequest.MESSAGE, promise).addListener(completed -> {
if (completed.isSuccess()) {
this.timestamps.channelPingCompleted();
}
});
return;
}
final IllegalStateException error = new IllegalStateException(lenientFormat("message of %s: %s",
message.getClass(),
message));
reportIssue(context, "", error);
this.exceptionCaught(context, error);
}
// endregion
// region Package private methods
int pendingRequestCount() {
return this.pendingRequests.size();
}
Optional<RntbdContext> rntbdContext() {
return Optional.of(this.contextFuture.getNow(null));
}
CompletableFuture<RntbdContextRequest> rntbdContextRequestFuture() {
return this.contextRequestFuture;
}
boolean hasRequestedRntbdContext() {
return this.contextRequestFuture.getNow(null) != null;
}
boolean hasRntbdContext() {
return this.contextFuture.getNow(null) != null;
}
RntbdChannelState getChannelState(final int demand) {
reportIssueUnless(this.hasRequestedRntbdContext(), this, "Direct TCP context request was not issued");
final int limit = this.hasRntbdContext() ? this.pendingRequestLimit : Math.min(this.pendingRequestLimit, demand);
if (this.pendingRequests.size() < limit) {
return RntbdChannelState.ok(this.pendingRequests.size());
}
if (this.hasRntbdContext()) {
return RntbdChannelState.pendingLimit(this.pendingRequests.size());
} else {
return RntbdChannelState.contextNegotiationPending((this.pendingRequests.size()));
}
}
void pendWrite(final ByteBuf out, final ChannelPromise promise) {
this.pendingWrites.add(out, promise);
}
Timestamps snapshotTimestamps() {
return new Timestamps(this.timestamps);
}
// endregion
// region Private methods
private RntbdRequestRecord addPendingRequestRecord(final ChannelHandlerContext context, final RntbdRequestRecord record) {
return this.pendingRequests.compute(record.transportRequestId(), (id, current) -> {
reportIssueUnless(current == null, context, "id: {}, current: {}, request: {}", record);
record.pendingRequestQueueSize(pendingRequests.size());
final Timeout pendingRequestTimeout = record.newTimeout(timeout -> {
// We don't wish to complete on the timeout thread, but rather on a thread doled out by our executor
requestExpirationExecutor.execute(record::expire);
});
record.whenComplete((response, error) -> {
this.pendingRequests.remove(id);
pendingRequestTimeout.cancel();
});
return record;
});
}
private void completeAllPendingRequestsExceptionally(
final ChannelHandlerContext context, final Throwable throwable
) {
reportIssueUnless(!this.closingExceptionally, context, "", throwable);
this.closingExceptionally = true;
if (this.pendingWrites != null && !this.pendingWrites.isEmpty()) {
// an expensive call that fires at least one exceptionCaught event
this.pendingWrites.releaseAndFailAll(context, throwable);
}
if (this.pendingRequests.isEmpty()) {
return;
}
if (!this.contextRequestFuture.isDone()) {
this.contextRequestFuture.completeExceptionally(throwable);
}
if (!this.contextFuture.isDone()) {
this.contextFuture.completeExceptionally(throwable);
}
final int count = this.pendingRequests.size();
Exception contextRequestException = null;
String phrase = null;
if (this.contextRequestFuture.isCompletedExceptionally()) {
try {
this.contextRequestFuture.get();
} catch (final CancellationException error) {
phrase = "RNTBD context request write cancelled";
contextRequestException = error;
} catch (final Exception error) {
phrase = "RNTBD context request write failed";
contextRequestException = error;
} catch (final Throwable error) {
phrase = "RNTBD context request write failed";
contextRequestException = new ChannelException(error);
}
} else if (this.contextFuture.isCompletedExceptionally()) {
try {
this.contextFuture.get();
} catch (final CancellationException error) {
phrase = "RNTBD context request read cancelled";
contextRequestException = error;
} catch (final Exception error) {
phrase = "RNTBD context request read failed";
contextRequestException = error;
} catch (final Throwable error) {
phrase = "RNTBD context request read failed";
contextRequestException = new ChannelException(error);
}
} else {
phrase = "closed exceptionally";
}
final String message = lenientFormat("%s %s with %s pending requests", context, phrase, count);
final Exception cause;
if (throwable instanceof ClosedChannelException) {
cause = contextRequestException == null
? (ClosedChannelException) throwable
: contextRequestException;
} else {
cause = throwable instanceof Exception
? (Exception) throwable
: new ChannelException(throwable);
}
for (RntbdRequestRecord record : this.pendingRequests.values()) {
final Map<String, String> requestHeaders = record.args().serviceRequest().getHeaders();
final String requestUri = record.args().physicalAddress().toString();
final GoneException error = new GoneException(message, cause, null, requestUri);
BridgeInternal.setRequestHeaders(error, requestHeaders);
record.completeExceptionally(error);
}
}
/**
* This method is called for each incoming message of type {@link RntbdResponse} to complete a request.
*
* @param context {@link ChannelHandlerContext} to which this {@link RntbdRequestManager request manager} belongs.
* @param response the {@link RntbdResponse message} received.
*/
private void messageReceived(final ChannelHandlerContext context, final RntbdResponse response) {
final Long transportRequestId = response.getTransportRequestId();
if (transportRequestId == null) {
reportIssue(context, "response ignored because its transportRequestId is missing: {}", response);
return;
}
final RntbdRequestRecord requestRecord = this.pendingRequests.get(transportRequestId);
if (requestRecord == null) {
logger.debug("response {} ignored because its requestRecord is missing: {}", transportRequestId, response);
return;
}
requestRecord.responseLength(response.getMessageLength());
requestRecord.stage(RntbdRequestRecord.Stage.RECEIVED);
final HttpResponseStatus status = response.getStatus();
final UUID activityId = response.getActivityId();
final int statusCode = status.code();
if ((HttpResponseStatus.OK.code() <= statusCode && statusCode < HttpResponseStatus.MULTIPLE_CHOICES.code()) ||
statusCode == HttpResponseStatus.NOT_MODIFIED.code()) {
final StoreResponse storeResponse = response.toStoreResponse(this.contextFuture.getNow(null));
requestRecord.complete(storeResponse);
} else {
// Map response to a CosmosException
final CosmosException cause;
// ..Fetch required header values
final long lsn = response.getHeader(RntbdResponseHeader.LSN);
final String partitionKeyRangeId = response.getHeader(RntbdResponseHeader.PartitionKeyRangeId);
// ..Create Error instance
final CosmosError error = response.hasPayload()
? new CosmosError(RntbdObjectMapper.readTree(response))
: new CosmosError(Integer.toString(statusCode), status.reasonPhrase(), status.codeClass().name());
// ..Map RNTBD response headers to HTTP response headers
final Map<String, String> responseHeaders = response.getHeaders().asMap(
this.rntbdContext().orElseThrow(IllegalStateException::new), activityId
);
// ..Create CosmosException based on status and sub-status codes
final String resourceAddress = requestRecord.args().physicalAddress() != null ?
requestRecord.args().physicalAddress().toString() : null;
switch (status.code()) {
case StatusCodes.BADREQUEST:
cause = new BadRequestException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.CONFLICT:
cause = new ConflictException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.FORBIDDEN:
cause = new ForbiddenException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.GONE:
final int subStatusCode = Math.toIntExact(response.getHeader(RntbdResponseHeader.SubStatus));
switch (subStatusCode) {
case SubStatusCodes.COMPLETING_SPLIT:
cause = new PartitionKeyRangeIsSplittingException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case SubStatusCodes.COMPLETING_PARTITION_MIGRATION:
cause = new PartitionIsMigratingException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case SubStatusCodes.NAME_CACHE_IS_STALE:
cause = new InvalidPartitionException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case SubStatusCodes.PARTITION_KEY_RANGE_GONE:
cause = new PartitionKeyRangeGoneException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
default:
GoneException goneExceptionFromService =
new GoneException(error, lsn, partitionKeyRangeId, responseHeaders);
goneExceptionFromService.setIsBasedOn410ResponseFromService();
cause = goneExceptionFromService;
break;
}
break;
case StatusCodes.INTERNAL_SERVER_ERROR:
cause = new InternalServerErrorException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.LOCKED:
cause = new LockedException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.METHOD_NOT_ALLOWED:
cause = new MethodNotAllowedException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.NOTFOUND:
cause = new NotFoundException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.PRECONDITION_FAILED:
cause = new PreconditionFailedException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.REQUEST_ENTITY_TOO_LARGE:
cause = new RequestEntityTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.REQUEST_TIMEOUT:
Exception inner = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders);
cause = new GoneException(resourceAddress, error, lsn, partitionKeyRangeId, responseHeaders, inner);
break;
case StatusCodes.RETRY_WITH:
cause = new RetryWithException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.SERVICE_UNAVAILABLE:
cause = new ServiceUnavailableException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.TOO_MANY_REQUESTS:
cause = new RequestRateTooLargeException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
case StatusCodes.UNAUTHORIZED:
cause = new UnauthorizedException(error, lsn, partitionKeyRangeId, responseHeaders);
break;
default:
cause = BridgeInternal.createCosmosException(resourceAddress, status.code(), error, responseHeaders);
break;
}
BridgeInternal.setResourceAddress(cause, resourceAddress);
requestRecord.completeExceptionally(cause);
}
}
private void removeContextNegotiatorAndFlushPendingWrites(final ChannelHandlerContext context) {
final RntbdContextNegotiator negotiator = context.pipeline().get(RntbdContextNegotiator.class);
negotiator.removeInboundHandler();
negotiator.removeOutboundHandler();
if (!this.pendingWrites.isEmpty()) {
this.pendingWrites.writeAndRemoveAll(context);
context.flush();
}
}
private static void reportIssue(final Object subject, final String format, final Object... args) {
RntbdReporter.reportIssue(logger, subject, format, args);
}
private static void reportIssueUnless(
final boolean predicate, final Object subject, final String format, final Object... args
) {
RntbdReporter.reportIssueUnless(logger, predicate, subject, format, args);
}
private void traceOperation(final ChannelHandlerContext context, final String operationName, final Object... args) {
logger.trace("{}\n{}\n{}", operationName, context, args);
}
// endregion
// region Types
private static final class UnhealthyChannelException extends ChannelException {
static final UnhealthyChannelException INSTANCE = new UnhealthyChannelException();
private UnhealthyChannelException() {
super("health check failed");
}
@Override
public Throwable fillInStackTrace() {
return this;
}
}
// endregion
}