JdkHttpResponse.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.http.jdk.httpclient;
import com.azure.core.http.HttpRequest;
import com.azure.core.util.FluxUtil;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Flow;
import static com.azure.core.http.jdk.httpclient.JdkAsyncHttpClient.fromJdkHttpHeaders;
final class JdkHttpResponse extends JdkHttpResponseBase {
private final Flux<ByteBuffer> contentFlux;
private volatile boolean disposed = false;
JdkHttpResponse(final HttpRequest request,
java.net.http.HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) {
super(request, response.statusCode(), fromJdkHttpHeaders(response.headers()));
this.contentFlux = JdkFlowAdapter.flowPublisherToFlux(response.body())
.flatMapSequential(Flux::fromIterable);
}
@Override
public Flux<ByteBuffer> getBody() {
return this.contentFlux.doFinally(signalType -> disposed = true);
}
@Override
public Mono<byte[]> getBodyAsByteArray() {
return FluxUtil.collectBytesInByteBufferStream(getBody());
}
@Override
public void close() {
if (!this.disposed) {
this.disposed = true;
this.contentFlux
.subscribe()
.dispose();
}
}
}