// OkHttpAsyncHttpClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.http.okhttp;
import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.ByteString;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
/**
 * {@link HttpClient} implementation backed by OkHttp.
 *
 * <p>Each azure-core request is converted to an {@code okhttp3.Request} (the request body, if any, is aggregated
 * into a single in-memory {@code okio.ByteString}) and dispatched asynchronously through {@code Call.enqueue}.</p>
 */
class OkHttpAsyncHttpClient implements HttpClient {
    /** Shared Mono emitting an empty byte string, used when a request carries no body content. */
    private static final Mono<okio.ByteString> EMPTY_BYTE_STRING_MONO = Mono.just(okio.ByteString.EMPTY);

    private final OkHttpClient httpClient;

    /**
     * Creates the client.
     *
     * @param httpClient the OkHttp client used to create and enqueue calls
     */
    OkHttpAsyncHttpClient(OkHttpClient httpClient) {
        this.httpClient = httpClient;
    }

    /**
     * Sends the request with an empty context.
     *
     * @param request the azure-core request
     * @return a Mono emitting the response
     */
    @Override
    public Mono<HttpResponse> send(HttpRequest request) {
        return send(request, Context.NONE);
    }

    /**
     * Sends the request.
     *
     * <p>When the context holds {@code true} under the key {@code "azure-eagerly-read-response"}, the response body
     * is read completely into memory before the response is emitted.</p>
     *
     * @param request the azure-core request
     * @param context the context of the call
     * @return a Mono emitting the response
     */
    @Override
    public Mono<HttpResponse> send(HttpRequest request, Context context) {
        boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);

        return Mono.create(sink -> sink.onRequest(value -> {
            // MonoSink::onRequest is used so that nothing is sent until the subscriber signals demand
            // (back pressure support).
            //
            // Blocking behavior of the toOkHttpRequest(request).subscribe(...) call below:
            //
            // The okhttp3.Request emitted by toOkHttpRequest(request) is chained from the request body
            // Flux<ByteBuffer>, therefore:
            //   1. If the Flux<ByteBuffer> is synchronous and the caller of send(request) does not apply
            //      subscribeOn, the subscribe call blocks the caller thread.
            //   2. If the Flux<ByteBuffer> is synchronous and the caller of send(request) applies subscribeOn,
            //      the caller thread is not blocked, but the scheduler thread is.
            //   3. If the Flux<ByteBuffer> is asynchronous, the caller thread is not blocked, but the thread
            //      backing the flux is. Any subscribeOn applied to send(request) is ignored in this case.
            toOkHttpRequest(request).subscribe(okHttpRequest -> {
                Call call = httpClient.newCall(okHttpRequest);
                call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
                // Cancelling the subscription cancels the in-flight OkHttp call.
                sink.onCancel(call::cancel);
            }, sink::error);
        }));
    }

    /**
     * Converts the given azure-core request to okhttp request.
     *
     * <p>Headers with a {@code null} value are skipped. GET and HEAD requests are built without a body; every other
     * method gets a body built from the request content (an empty body when the request has none).</p>
     *
     * @param request the azure-core request
     * @return the Mono emitting okhttp request
     */
    private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
        return Mono.just(new okhttp3.Request.Builder())
            .map(rb -> {
                rb.url(request.getUrl());

                // A request without headers simply produces an empty okhttp header set.
                Map<String, String> headers = new HashMap<>();
                if (request.getHeaders() != null) {
                    for (HttpHeader hdr : request.getHeaders()) {
                        if (hdr.getValue() != null) {
                            headers.put(hdr.getName(), hdr.getValue());
                        }
                    }
                }
                return rb.headers(okhttp3.Headers.of(headers));
            })
            .flatMap((Function<Request.Builder, Mono<Request.Builder>>) rb -> {
                if (request.getHttpMethod() == HttpMethod.GET) {
                    return Mono.just(rb.get());
                } else if (request.getHttpMethod() == HttpMethod.HEAD) {
                    return Mono.just(rb.head());
                } else {
                    return toOkHttpRequestBody(request.getBody(), request.getHeaders())
                        .map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody));
                }
            })
            .map(Request.Builder::build);
    }

    /**
     * Create a Mono of okhttp3.RequestBody from the given java.nio.ByteBuffer Flux.
     *
     * @param bbFlux stream of java.nio.ByteBuffer representing request content, or {@code null} for an empty body
     * @param headers the headers associated with the original request; may be {@code null}, in which case the
     * body is created without a content type
     * @return the Mono emitting okhttp3.RequestBody
     */
    private static Mono<RequestBody> toOkHttpRequestBody(Flux<ByteBuffer> bbFlux, HttpHeaders headers) {
        Mono<okio.ByteString> bsMono = bbFlux == null
            ? EMPTY_BYTE_STRING_MONO
            : toByteString(bbFlux);

        return bsMono.map(bs -> {
            // The request may carry no headers at all (toOkHttpRequest tolerates that), so guard against null
            // here as well instead of failing with a NullPointerException.
            String contentType = headers == null ? null : headers.getValue("Content-Type");
            MediaType mediaType = contentType == null ? null : MediaType.parse(contentType);
            return RequestBody.create(bs, mediaType);
        });
    }

    /**
     * Aggregate Flux of java.nio.ByteBuffer to single okio.ByteString.
     *
     * Pooled okio.Buffer type is used to buffer emitted ByteBuffer instances. Content of each ByteBuffer will be
     * written (i.e copied) to the internal okio.Buffer slots. Once the stream terminates, the contents of all slots get
     * copied to one single byte array and okio.ByteString will be created referring this byte array. Finally the
     * initial okio.Buffer will be returned to the pool.
     *
     * @param bbFlux the Flux of ByteBuffer to aggregate
     * @return a mono emitting aggregated ByteString
     * @throws NullPointerException if {@code bbFlux} is null
     */
    private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
        Objects.requireNonNull(bbFlux, "'bbFlux' cannot be null.");
        return Mono.using(okio.Buffer::new,
            buffer -> bbFlux.reduce(buffer, (b, byteBuffer) -> {
                try {
                    b.write(byteBuffer);
                    return b;
                } catch (IOException ioe) {
                    // Surface the checked exception through the reactive error channel.
                    throw Exceptions.propagate(ioe);
                }
            })
            .map(b -> ByteString.of(b.readByteArray())),
            okio.Buffer::clear)
            .switchIfEmpty(EMPTY_BYTE_STRING_MONO);
    }

    /**
     * OkHttp callback that completes the given sink with the response, or with the error that made the call fail.
     */
    private static class OkHttpCallback implements okhttp3.Callback {
        private final MonoSink<HttpResponse> sink;
        private final HttpRequest request;
        private final boolean eagerlyReadResponse;

        /**
         * @param sink the sink to signal with the outcome of the call
         * @param request the azure-core request that produced the call
         * @param eagerlyReadResponse whether the response body is read into memory before emitting the response
         */
        OkHttpCallback(MonoSink<HttpResponse> sink, HttpRequest request, boolean eagerlyReadResponse) {
            this.sink = sink;
            this.request = request;
            this.eagerlyReadResponse = eagerlyReadResponse;
        }

        @Override
        public void onFailure(okhttp3.Call call, IOException e) {
            sink.error(e);
        }

        @Override
        public void onResponse(okhttp3.Call call, okhttp3.Response response) {
            /*
             * Use a buffered response when we are eagerly reading the response from the network and the body isn't
             * null; otherwise use the non-buffering response.
             */
            ResponseBody body = eagerlyReadResponse ? response.body() : null;
            if (body == null) {
                sink.success(new OkHttpResponse(response, request));
                return;
            }

            try {
                byte[] bytes = readAndClose(body);
                sink.success(new BufferedOkHttpResponse(response, request, bytes));
            } catch (IOException ex) {
                // Reading the body bytes may cause an IOException, if it happens propagate it.
                sink.error(ex);
            }
        }

        /**
         * Reads the whole body into a byte array; the body is closed whether or not reading succeeds.
         *
         * @param body the response body to drain
         * @return the body content
         * @throws IOException if reading the body fails
         */
        private static byte[] readAndClose(ResponseBody body) throws IOException {
            try (ResponseBody closing = body) {
                return closing.bytes();
            }
        }
    }
}