NettyAsyncHttpResponse.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.http.netty.implementation;
import com.azure.core.http.HttpRequest;
import com.azure.core.util.CoreUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import static com.azure.core.http.netty.implementation.Utility.closeConnection;
import static com.azure.core.http.netty.implementation.Utility.deepCopyBuffer;
/**
* Default HTTP response for Reactor Netty.
*/
public final class NettyAsyncHttpResponse extends NettyAsyncHttpResponseBase {
private final Connection reactorNettyConnection;
private final boolean disableBufferCopy;
public NettyAsyncHttpResponse(HttpClientResponse reactorNettyResponse, Connection reactorNettyConnection,
HttpRequest httpRequest, boolean disableBufferCopy) {
super(reactorNettyResponse, httpRequest);
this.reactorNettyConnection = reactorNettyConnection;
this.disableBufferCopy = disableBufferCopy;
}
@Override
public Flux<ByteBuffer> getBody() {
return bodyIntern().doFinally(ignored -> close())
.map(byteBuf -> this.disableBufferCopy ? byteBuf.nioBuffer() : deepCopyBuffer(byteBuf));
}
@Override
public Mono<byte[]> getBodyAsByteArray() {
return bodyIntern().aggregate().asByteArray().doFinally(ignored -> close());
}
@Override
public Mono<String> getBodyAsString() {
return getBodyAsByteArray().map(bytes -> CoreUtils.bomAwareToString(bytes, getHeaderValue("Content-Type")));
}
@Override
public Mono<String> getBodyAsString(Charset charset) {
return bodyIntern().aggregate().asString(charset).doFinally(ignored -> close());
}
@Override
public Mono<InputStream> getBodyAsInputStream() {
return bodyIntern().aggregate().asInputStream();
}
@Override
public void close() {
closeConnection(reactorNettyConnection);
}
private ByteBufFlux bodyIntern() {
return reactorNettyConnection.inbound().receive();
}
// used for testing only
public Connection internConnection() {
return reactorNettyConnection;
}
}