CloudEventTracingPipelinePolicy.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventgrid.implementation;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.tracing.TracerProxy;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import com.azure.core.models.CloudEvent;
/**
* This pipeline policy should be added after OpenTelemetryPolicy in the http pipeline.
*
* It checks whether the {@link HttpRequest} headers have "traceparent" or "tracestate" and whether the serialized
* http body json string for a list of {@link CloudEvent} instances has place holders
* {@link Constants#TRACE_PARENT_PLACEHOLDER} or {@link Constants#TRACE_STATE_PLACEHOLDER}.
* The place holders will be replaced by the value from headers if the headers have "traceparent" or "tracestate",
* or be removed if the headers don't have.
*
* The place holders won't exist in the json string if the {@link TracerProxy#isTracingEnabled()} returns false.
*/
public final class CloudEventTracingPipelinePolicy implements HttpPipelinePolicy {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
final HttpRequest request = context.getHttpRequest();
final HttpHeader contentType = request.getHeaders().get(Constants.CONTENT_TYPE);
StringBuilder bodyStringBuilder = new StringBuilder();
if (TracerProxy.isTracingEnabled() && contentType != null
&& Constants.CLOUD_EVENT_CONTENT_TYPE.equals(contentType.getValue())) {
return request.getBody().map(byteBuffer -> bodyStringBuilder.append(new String(byteBuffer.array(),
StandardCharsets.UTF_8)))
.then(Mono.fromCallable(() -> replaceTracingPlaceHolder(request, bodyStringBuilder)))
.then(next.process());
} else {
return next.process();
}
}
/**
*
* @param request The {@link HttpRequest}, whose body will be mutated by replacing traceparent and tracestate
* placeholders.
* @param bodyStringBuilder The {@link StringBuilder} that contains the full HttpRequest body string.
* @return The new body string with the place holders replaced (if header has tracing)
* or removed (if header no tracing).
*/
static String replaceTracingPlaceHolder(HttpRequest request, StringBuilder bodyStringBuilder) {
final int traceParentPlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_PARENT_PLACEHOLDER);
if (traceParentPlaceHolderIndex >= 0) { // There is "traceparent" placeholder in body, replace it.
final HttpHeader traceparentHeader = request.getHeaders().get(Constants.TRACE_PARENT);
bodyStringBuilder.replace(traceParentPlaceHolderIndex,
Constants.TRACE_PARENT_PLACEHOLDER.length() + traceParentPlaceHolderIndex,
traceparentHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_PARENT, traceparentHeader.getValue())
: "");
}
final int traceStatePlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_STATE_PLACEHOLDER);
if (traceStatePlaceHolderIndex >= 0) { // There is "tracestate" placeholder in body, replace it.
final HttpHeader tracestateHeader = request.getHeaders().get(Constants.TRACE_STATE);
bodyStringBuilder.replace(traceStatePlaceHolderIndex,
Constants.TRACE_STATE_PLACEHOLDER.length() + traceStatePlaceHolderIndex,
tracestateHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_STATE, tracestateHeader.getValue())
: "");
}
String newBodyString = bodyStringBuilder.toString();
request.setHeader(Constants.CONTENT_LENGTH, String.valueOf(newBodyString.length()));
request.setBody(newBodyString);
return newBodyString;
}
}