// BlobBatchHelper.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.storage.blob.batch;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.implementation.models.ServicesSubmitBatchResponse;
import com.azure.storage.blob.models.BlobStorageException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/*
* This class contains helper methods for dealing with batch requests.
*/
class BlobBatchHelper {
/*
* Newline characters used in HTTP
*/
static final String HTTP_NEWLINE = "\r\n";
/*
* This pattern matches finding the "Content-Id" of the batch response.
*/
private static final Pattern CONTENT_ID_PATTERN = Pattern
.compile("Content-ID:\\s?(\\d+)", Pattern.CASE_INSENSITIVE);
/*
* This pattern matches finding the status code of the batch response.
*/
private static final Pattern STATUS_CODE_PATTERN = Pattern
.compile("HTTP\\/\\d\\.\\d\\s?(\\d+)\\s?\\w+", Pattern.CASE_INSENSITIVE);
/*
* This pattern matches finding the 'application/http' portion of the body.
*/
private static final Pattern APPLICATION_HTTP_PATTERN = Pattern
.compile("application\\/http", Pattern.CASE_INSENSITIVE);
// This method connects the batch response values to the individual batch operations based on their Content-Id
static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchOperationInfo,
ServicesSubmitBatchResponse batchResponse, boolean throwOnAnyFailure, ClientLogger logger) {
/*
* Content-Type will contain the boundary for each batch response. The expected format is:
* "Content-Type: multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed"
*/
String contentType = batchResponse.getDeserializedHeaders().getContentType();
// Split on the boundary [ "multipart/mixed; boundary", "batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed"]
String[] boundaryPieces = contentType.split("=", 2);
if (boundaryPieces.length == 1) {
return Mono.error(logger
.logExceptionAsError(new IllegalStateException("Response doesn't contain a boundary.")));
}
String boundary = boundaryPieces[1];
return FluxUtil.collectBytesInByteBufferStream(batchResponse.getValue())
.flatMap(byteArrayBody -> Mono.fromRunnable(() -> {
String body = new String(byteArrayBody, StandardCharsets.UTF_8);
List<BlobStorageException> exceptions = new ArrayList<>();
String[] subResponses = body.split("--" + boundary);
if (subResponses.length == 3 && batchOperationInfo.getOperationCount() != 1) {
String[] exceptionSections = subResponses[1].split(HTTP_NEWLINE + HTTP_NEWLINE);
int statusCode = getStatusCode(exceptionSections[1], logger);
HttpHeaders headers = getHttpHeaders(exceptionSections[1]);
throw logger.logExceptionAsError(new BlobStorageException(headers.getValue("x-ms-error-code"),
createHttpResponse(batchResponse.getRequest(), statusCode, headers, body), body));
}
// Split the batch response body into batch operation responses.
for (String subResponse : subResponses) {
// This is a split value that isn't a response.
if (!APPLICATION_HTTP_PATTERN.matcher(subResponse).find()) {
continue;
}
// The batch operation response will be delimited by two new lines.
String[] subResponseSections = subResponse.split(HTTP_NEWLINE + HTTP_NEWLINE);
// The first section will contain batching metadata.
BlobBatchOperationResponse<?> batchOperationResponse =
getBatchOperation(batchOperationInfo, subResponseSections[0], logger);
// The second section will contain status code and header information.
batchOperationResponse.setStatusCode(getStatusCode(subResponseSections[1], logger));
batchOperationResponse.setHeaders(getHttpHeaders(subResponseSections[1]));
// The third section will contain the body.
if (subResponseSections.length > 2) {
// The body is optional and may not exist.
setBodyOrAddException(batchOperationResponse, subResponseSections[2], exceptions, logger);
}
}
if (throwOnAnyFailure && exceptions.size() != 0) {
throw logger.logExceptionAsError(new BlobBatchStorageException("Batch had operation failures.",
createHttpResponse(batchResponse), exceptions));
}
new SimpleResponse<>(batchResponse, null);
}));
}
private static BlobBatchOperationResponse<?> getBatchOperation(BlobBatchOperationInfo batchOperationInfo,
String responseBatchInfo, ClientLogger logger) {
Matcher contentIdMatcher = CONTENT_ID_PATTERN.matcher(responseBatchInfo);
int contentId;
if (contentIdMatcher.find() && contentIdMatcher.groupCount() >= 1) {
contentId = Integer.parseInt(contentIdMatcher.group(1));
} else {
throw logger.logExceptionAsError(
new IllegalStateException("Batch operation response doesn't contain a 'Content-Id' header."));
}
return batchOperationInfo.getBatchRequest(contentId).setResponseReceived();
}
private static int getStatusCode(String responseMetadata, ClientLogger logger) {
Matcher statusCodeMatcher = STATUS_CODE_PATTERN.matcher(responseMetadata);
if (statusCodeMatcher.find()) {
return Integer.parseInt(statusCodeMatcher.group(1));
} else {
throw logger.logExceptionAsError(new IllegalStateException("Unable to parse response status code."));
}
}
private static HttpHeaders getHttpHeaders(String responseMetadata) {
HttpHeaders headers = new HttpHeaders();
for (String line : responseMetadata.split(HTTP_NEWLINE)) {
if (CoreUtils.isNullOrEmpty(line) || (line.startsWith("HTTP") && !line.contains(":"))) {
continue;
}
String[] headerPieces = line.split(":\\s*", 2);
if (headerPieces.length == 1) {
headers.put(headerPieces[0], null);
} else {
headers.put(headerPieces[0], headerPieces[1]);
}
}
return headers;
}
private static void setBodyOrAddException(BlobBatchOperationResponse<?> batchOperationResponse,
String responseBody, List<BlobStorageException> exceptions, ClientLogger logger) {
/*
* Currently no batching operations will return a success body, they will only return a body on an exception.
* For now this will only construct the exception and throw if it should throw on an error.
*/
BlobStorageException exception = new BlobStorageException(responseBody,
batchOperationResponse.asHttpResponse(responseBody), responseBody);
logger.logExceptionAsError(exception);
batchOperationResponse.setException(exception);
exceptions.add(exception);
}
static HttpResponse createHttpResponse(HttpRequest request, int statusCode, HttpHeaders headers, String body) {
return new HttpResponse(request) {
@Override
public int getStatusCode() {
return statusCode;
}
@Override
public String getHeaderValue(String name) {
return headers.getValue(name);
}
@Override
public HttpHeaders getHeaders() {
return headers;
}
@Override
public Flux<ByteBuffer> getBody() {
return Flux.just(ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
}
@Override
public Mono<byte[]> getBodyAsByteArray() {
return Mono.just(body.getBytes(StandardCharsets.UTF_8));
}
@Override
public Mono<String> getBodyAsString() {
return Mono.just(body);
}
@Override
public Mono<String> getBodyAsString(Charset charset) {
return getBodyAsByteArray().map(body -> new String(body, charset));
}
};
}
private static HttpResponse createHttpResponse(ServicesSubmitBatchResponse response) {
return new HttpResponse(response.getRequest()) {
@Override
public int getStatusCode() {
return response.getStatusCode();
}
@Override
public String getHeaderValue(String name) {
return response.getHeaders().getValue(name);
}
@Override
public HttpHeaders getHeaders() {
return response.getHeaders();
}
@Override
public Flux<ByteBuffer> getBody() {
return response.getValue();
}
@Override
public Mono<byte[]> getBodyAsByteArray() {
return FluxUtil.collectBytesInByteBufferStream(getBody());
}
@Override
public Mono<String> getBodyAsString() {
return getBodyAsByteArray().map(body -> new String(body, StandardCharsets.UTF_8));
}
@Override
public Mono<String> getBodyAsString(Charset charset) {
return getBodyAsByteArray().map(body -> new String(body, charset));
}
};
}
}