TracerProvider.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.TransactionalBatchResponse;
import com.azure.cosmos.implementation.clientTelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clientTelemetry.ReportPayload;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
public class TracerProvider {
private Tracer tracer;
public final static String DB_TYPE_VALUE = "Cosmos";
public final static String DB_TYPE = "db.type";
public final static String DB_INSTANCE = "db.instance";
public final static String DB_URL = "db.url";
public static final String DB_STATEMENT = "db.statement";
public static final String ERROR_MSG = "error.msg";
public static final String ERROR_TYPE = "error.type";
public static final String COSMOS_CALL_DEPTH = "cosmosCallDepth";
public static final String COSMOS_CALL_DEPTH_VAL = "nested";
public static final int ERROR_CODE = 0;
public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB";
public TracerProvider(Tracer tracer) {
this.tracer = tracer;
}
public boolean isEnabled() {
return tracer != null;
}
/**
* For each tracer plugged into the SDK a new tracing span is created.
* <p>
* The {@code context} will be checked for containing information about a parent span. If a parent span is found the
* new span will be added as a child, otherwise the span will be created and added to the context and any downstream
* start calls will use the created span as the parent.
*
* @param context Additional metadata that is passed through the call stack.
* @return An updated context object.
*/
public Context startSpan(String methodName, String databaseId, String endpoint, Context context) {
Context local = Objects.requireNonNull(context, "'context' cannot be null.");
local = local.addData(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME);
local = tracer.start(methodName, local); // start the span and return the started span
if (databaseId != null) {
tracer.setAttribute(TracerProvider.DB_INSTANCE, databaseId, local);
}
tracer.setAttribute(TracerProvider.DB_TYPE, DB_TYPE_VALUE, local);
tracer.setAttribute(TracerProvider.DB_URL, endpoint, local);
tracer.setAttribute(TracerProvider.DB_STATEMENT, methodName, local);
return local;
}
/**
* Given a context containing the current tracing span the span is marked completed with status info from
* {@link Signal}. For each tracer plugged into the SDK the current tracing span is marked as completed.
*
* @param context Additional metadata that is passed through the call stack.
* @param signal The signal indicates the status and contains the metadata we need to end the tracing span.
*/
public <T extends CosmosResponse<? extends Resource>> void endSpan(Context context,
Signal<T> signal,
int statusCode) {
Objects.requireNonNull(context, "'context' cannot be null.");
Objects.requireNonNull(signal, "'signal' cannot be null.");
switch (signal.getType()) {
case ON_COMPLETE:
end(statusCode, null, context);
break;
case ON_ERROR:
Throwable throwable = null;
if (signal.hasError()) {
// The last status available is on error, this contains the thrown error.
throwable = signal.getThrowable();
if (throwable instanceof CosmosException) {
CosmosException exception = (CosmosException) throwable;
statusCode = exception.getStatusCode();
}
}
end(statusCode, throwable, context);
break;
default:
// ON_SUBSCRIBE and ON_NEXT don't have the information to end the span so just return.
break;
}
}
public <T extends CosmosResponse<?>> Mono<T> traceEnabledCosmosResponsePublisher(Mono<T> resultPublisher,
Context context,
String spanName,
String databaseId,
String endpoint) {
return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint,
(T response) -> response.getStatusCode());
}
public Mono<TransactionalBatchResponse> traceEnabledBatchResponsePublisher(Mono<TransactionalBatchResponse> resultPublisher,
Context context,
String spanName,
String databaseId,
String endpoint) {
return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint,
TransactionalBatchResponse::getStatusCode);
}
public <T> Mono<CosmosItemResponse<T>> traceEnabledCosmosItemResponsePublisher(Mono<CosmosItemResponse<T>> resultPublisher,
Context context,
String spanName,
String containerId,
String databaseId,
CosmosAsyncClient client,
ConsistencyLevel consistencyLevel,
OperationType operationType,
ResourceType resourceType) {
return publisherWithClientTelemetry(resultPublisher, context, spanName, containerId, databaseId,
BridgeInternal.getServiceEndpoint(client),
client,
consistencyLevel,
operationType,
resourceType,
CosmosItemResponse::getStatusCode);
}
private <T> Mono<T> traceEnabledPublisher(Mono<T> resultPublisher,
Context context,
String spanName,
String databaseId,
String endpoint,
Function<T, Integer> statusCodeFunc) {
final AtomicReference<Context> parentContext = new AtomicReference<>(Context.NONE);
Optional<Object> callDepth = context.getData(COSMOS_CALL_DEPTH);
final boolean isNestedCall = callDepth.isPresent();
return resultPublisher
.doOnSubscribe(ignoredValue -> {
if (isEnabled() && !isNestedCall) {
parentContext.set(this.startSpan(spanName, databaseId, endpoint,
context));
}
}).doOnSuccess(response -> {
if (isEnabled() && !isNestedCall) {
this.endSpan(parentContext.get(), Signal.complete(), statusCodeFunc.apply(response));
}
}).doOnError(throwable -> {
if (isEnabled() && !isNestedCall) {
this.endSpan(parentContext.get(), Signal.error(throwable), ERROR_CODE);
}
});
}
private <T> Mono<T> publisherWithClientTelemetry(Mono<T> resultPublisher,
Context context,
String spanName,
String containerId,
String databaseId,
String endpoint,
CosmosAsyncClient client,
ConsistencyLevel consistencyLevel,
OperationType operationType,
ResourceType resourceType,
Function<T, Integer> statusCodeFunc) {
Mono<T> tracerMono = traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint, statusCodeFunc);
return tracerMono
.doOnSuccess(response -> {
if (Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client)) && response instanceof CosmosItemResponse) {
@SuppressWarnings("unchecked")
CosmosItemResponse<T> itemResponse = (CosmosItemResponse<T>) response;
fillClientTelemetry(client, itemResponse.getDiagnostics(), itemResponse.getStatusCode(),
ModelBridgeInternal.getPayloadLength(itemResponse), containerId,
databaseId, operationType, resourceType, consistencyLevel,
(float) itemResponse.getRequestCharge());
}
}).doOnError(throwable -> {
if (Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client)) && throwable instanceof CosmosException) {
CosmosException cosmosException = (CosmosException) throwable;
fillClientTelemetry(client, cosmosException.getDiagnostics(), cosmosException.getStatusCode(),
null, containerId,
databaseId, operationType, resourceType, consistencyLevel,
(float) cosmosException.getRequestCharge());
}
});
}
private void end(int statusCode, Throwable throwable, Context context) {
if (throwable != null) {
tracer.setAttribute(TracerProvider.ERROR_MSG, throwable.getMessage(), context);
tracer.setAttribute(TracerProvider.ERROR_TYPE, throwable.getClass().getName(), context);
}
tracer.end(statusCode, throwable, context);
}
public void fillClientTelemetry(CosmosAsyncClient cosmosAsyncClient,
CosmosDiagnostics cosmosDiagnostics,
int statusCode,
Integer objectSize,
String containerId,
String databaseId,
OperationType operationType,
ResourceType resourceType,
ConsistencyLevel consistencyLevel,
float requestCharge) {
ClientTelemetry telemetry = BridgeInternal.getContextClient(cosmosAsyncClient).getClientTelemetry();
ReportPayload reportPayloadLatency = createReportPayload(cosmosAsyncClient, cosmosDiagnostics,
statusCode, objectSize, containerId, databaseId
, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_LATENCY_NAME,
ClientTelemetry.REQUEST_LATENCY_UNIT);
ConcurrentDoubleHistogram latencyHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap().get(reportPayloadLatency);
if (latencyHistogram != null) {
ClientTelemetry.recordValue(latencyHistogram, cosmosDiagnostics.getDuration().toNanos()/1000);
} else {
if (statusCode >= HttpConstants.StatusCodes.MINIMUM_SUCCESS_STATUSCODE && statusCode <= HttpConstants.StatusCodes.MAXIMUM_SUCCESS_STATUSCODE) {
latencyHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_LATENCY_MAX_MICRO_SEC, ClientTelemetry.REQUEST_LATENCY_SUCCESS_PRECISION);
} else {
latencyHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_LATENCY_MAX_MICRO_SEC, ClientTelemetry.REQUEST_LATENCY_FAILURE_PRECISION);
}
latencyHistogram.setAutoResize(true);
ClientTelemetry.recordValue(latencyHistogram, cosmosDiagnostics.getDuration().toNanos()/1000);
telemetry.getClientTelemetryInfo().getOperationInfoMap().put(reportPayloadLatency, latencyHistogram);
}
ReportPayload reportPayloadRequestCharge = createReportPayload(cosmosAsyncClient, cosmosDiagnostics,
statusCode, objectSize, containerId, databaseId
, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_CHARGE_NAME, ClientTelemetry.REQUEST_CHARGE_UNIT);
ConcurrentDoubleHistogram requestChargeHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap().get(reportPayloadRequestCharge);
if (requestChargeHistogram != null) {
ClientTelemetry.recordValue(requestChargeHistogram, requestCharge);
} else {
requestChargeHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_CHARGE_MAX, ClientTelemetry.REQUEST_CHARGE_PRECISION);
requestChargeHistogram.setAutoResize(true);
ClientTelemetry.recordValue(requestChargeHistogram, requestCharge);
telemetry.getClientTelemetryInfo().getOperationInfoMap().put(reportPayloadRequestCharge,
requestChargeHistogram);
}
}
private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient,
CosmosDiagnostics cosmosDiagnostics,
int statusCode,
Integer objectSize,
String containerId,
String databaseId,
OperationType operationType,
ResourceType resourceType,
ConsistencyLevel consistencyLevel,
String metricsName,
String unitName) {
ReportPayload reportPayload = new ReportPayload(metricsName, unitName);
reportPayload.setRegionsContacted(BridgeInternal.getRegionsContacted(cosmosDiagnostics).toString());
reportPayload.setConsistency(consistencyLevel == null ?
BridgeInternal.getContextClient(cosmosAsyncClient).getConsistencyLevel() :
consistencyLevel);
if (objectSize != null) {
reportPayload.setGreaterThan1Kb(objectSize > ClientTelemetry.ONE_KB_TO_BYTES);
}
reportPayload.setDatabaseName(databaseId);
reportPayload.setContainerName(containerId);
reportPayload.setOperation(operationType);
reportPayload.setResource(resourceType);
reportPayload.setStatusCode(statusCode);
return reportPayload;
}
}