BatchResponseParser.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.batch;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState;
public final class BatchResponseParser {
private final static Logger logger = LoggerFactory.getLogger(BatchResponseParser.class);
private final static char HYBRID_V1 = 129;
/** Creates a transactional batch response from a documentServiceResponse.
*
* @param documentServiceResponse the {@link RxDocumentServiceResponse response message}.
* @param request the {@link ServerBatchRequest batch request} that produced {@code message}.
* @param shouldPromoteOperationStatus indicates whether the operation status should be promoted.
*
* @return the {@link CosmosBatchResponse cosmos batch response} created
* from {@link RxDocumentServiceResponse message} when the batch operation completes.
*/
public static CosmosBatchResponse fromDocumentServiceResponse(
final RxDocumentServiceResponse documentServiceResponse,
final ServerBatchRequest request,
final boolean shouldPromoteOperationStatus) {
CosmosBatchResponse response = null;
final byte[] responseContent = documentServiceResponse.getResponseBodyAsByteArray();
if (responseContent != null && responseContent.length > 0) {
response = BatchResponseParser.populateFromResponseContent(documentServiceResponse, request, shouldPromoteOperationStatus);
if (response == null) {
// Convert any payload read failures as InternalServerError
response = ModelBridgeInternal.createCosmosBatchResponse(
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
HttpConstants.SubStatusCodes.UNKNOWN,
"ServerResponseDeserializationFailure",
documentServiceResponse.getResponseHeaders(),
documentServiceResponse.getCosmosDiagnostics());
}
}
int responseStatusCode = documentServiceResponse.getStatusCode();
int responseSubStatusCode = BatchExecUtils.getSubStatusCode(documentServiceResponse.getResponseHeaders());
if (response == null) {
response = ModelBridgeInternal.createCosmosBatchResponse(
responseStatusCode,
responseSubStatusCode,
null,
documentServiceResponse.getResponseHeaders(),
documentServiceResponse.getCosmosDiagnostics());
}
if (response.size() != request.getOperations().size()) {
if (responseStatusCode >= 200 && responseStatusCode <= 299) {
// Server should be guaranteeing number of results equal to operations when
// batch request is successful - so fail as InternalServerError if this is not the case.
response = ModelBridgeInternal.createCosmosBatchResponse(
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
HttpConstants.SubStatusCodes.UNKNOWN,
"Invalid server response",
documentServiceResponse.getResponseHeaders(),
documentServiceResponse.getCosmosDiagnostics());
}
// When the overall response status code is TooManyRequests, propagate the RetryAfter into the individual operations.
Duration retryAfterDuration = Duration.ZERO;
if (responseStatusCode == HttpResponseStatus.TOO_MANY_REQUESTS.code()) {
retryAfterDuration = BatchExecUtils.getRetryAfterDuration(documentServiceResponse.getResponseHeaders());
}
BatchResponseParser.createAndPopulateResults(response, request.getOperations(), retryAfterDuration);
}
checkState(response.size() == request.getOperations().size(),
"Number of responses should be equal to number of operations in request.");
return response;
}
private static CosmosBatchResponse populateFromResponseContent(
final RxDocumentServiceResponse documentServiceResponse,
final ServerBatchRequest request,
final boolean shouldPromoteOperationStatus) {
final List<CosmosBatchOperationResult> results = new ArrayList<>(request.getOperations().size());
final byte[] responseContent = documentServiceResponse.getResponseBodyAsByteArray();
if (responseContent[0] != (byte)HYBRID_V1) {
// Read from a json response body. To enable hybrid row just complete the else part
final ObjectMapper mapper = Utils.getSimpleObjectMapper();
try {
final List<CosmosItemOperation> cosmosItemOperations = request.getOperations();
final ObjectNode[] objectNodes = mapper.readValue(responseContent, ObjectNode[].class);
for (int index = 0; index < objectNodes.length; index++) {
ObjectNode objectInArray = objectNodes[index];
results.add(
BatchResponseParser.createBatchOperationResultFromJson(objectInArray, cosmosItemOperations.get(index)));
}
} catch (IOException ex) {
logger.error("Exception in parsing response", ex);
}
} else {
// TODO(rakkuma): Implement hybrid row response parsing logic here.
// Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856
logger.error("Hybrid row is not implemented right now");
return null;
}
int responseStatusCode = documentServiceResponse.getStatusCode();
int responseSubStatusCode = BatchExecUtils.getSubStatusCode(documentServiceResponse.getResponseHeaders());
// Status code of the exact operation which failed.
if (responseStatusCode == HttpResponseStatus.MULTI_STATUS.code() && shouldPromoteOperationStatus) {
for (CosmosBatchOperationResult result : results) {
if (result.getStatusCode() != HttpResponseStatus.FAILED_DEPENDENCY.code() &&
result.getStatusCode() >= 400) {
responseStatusCode = result.getStatusCode();
responseSubStatusCode = result.getSubStatusCode();
break;
}
}
}
final CosmosBatchResponse response = ModelBridgeInternal.createCosmosBatchResponse(
responseStatusCode,
responseSubStatusCode,
null,
documentServiceResponse.getResponseHeaders(),
documentServiceResponse.getCosmosDiagnostics());
ModelBridgeInternal.addCosmosBatchResultInResponse(response, results);
assert (response.getResults().size() == request.getOperations().size());
return response;
}
/**
* Read batch operation result result.
*
* TODO(rakkuma): Similarly hybrid row result needs to be parsed.
* Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856
*
* @param objectNode having response for a single operation.
*
* @return the result
*/
private static CosmosBatchOperationResult createBatchOperationResultFromJson(
ObjectNode objectNode,
CosmosItemOperation cosmosItemOperation) {
final JsonSerializable jsonSerializable = new JsonSerializable(objectNode);
final int statusCode = jsonSerializable.getInt(BatchRequestResponseConstants.FIELD_STATUS_CODE);
Integer subStatusCode = jsonSerializable.getInt(BatchRequestResponseConstants.FIELD_SUBSTATUS_CODE);
if (subStatusCode == null) {
subStatusCode = HttpConstants.SubStatusCodes.UNKNOWN;
}
Double requestCharge = jsonSerializable.getDouble(BatchRequestResponseConstants.FIELD_REQUEST_CHARGE);
if (requestCharge == null) {
requestCharge = (double) 0;
}
final String eTag = jsonSerializable.getString(BatchRequestResponseConstants.FIELD_ETAG);
final ObjectNode resourceBody = jsonSerializable.getObject(BatchRequestResponseConstants.FIELD_RESOURCE_BODY);
final Integer retryAfterMilliseconds = jsonSerializable.getInt(BatchRequestResponseConstants.FIELD_RETRY_AFTER_MILLISECONDS);
return ModelBridgeInternal.createCosmosBatchResult(
eTag,
requestCharge,
resourceBody,
statusCode,
retryAfterMilliseconds != null ? Duration.ofMillis(retryAfterMilliseconds) : Duration.ZERO,
subStatusCode,
cosmosItemOperation);
}
/**
* Populate results to match number of operations to number of results in case of any error.
*
* @param response The transactionalBatchResponse in which to add the results
* @param operations List of operations for which the wrapper TransactionalBatchResponse is returned.
* @param retryAfterDuration retryAfterDuration.
* */
private static void createAndPopulateResults(final CosmosBatchResponse response,
final List<CosmosItemOperation> operations,
final Duration retryAfterDuration) {
final List<CosmosBatchOperationResult> results = new ArrayList<>(operations.size());
for (CosmosItemOperation cosmosItemOperation : operations) {
results.add(
ModelBridgeInternal.createCosmosBatchResult(
null,
response.getRequestCharge(),
null,
response.getStatusCode(),
retryAfterDuration,
response.getSubStatusCode(),
cosmosItemOperation
));
}
ModelBridgeInternal.addCosmosBatchResultInResponse(response, results);
}
}