TracerProvider.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.core.util.Context;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ReportPayload;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.ContextView;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;

public class TracerProvider {
    private Tracer tracer;
    private static final Logger LOGGER = LoggerFactory.getLogger(TracerProvider.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private final static String JSON_STRING = "JSON";
    public final static String DB_TYPE_VALUE = "Cosmos";
    public final static String DB_TYPE = "db.type";
    public final static String DB_INSTANCE = "db.instance";
    public final static String DB_URL = "db.url";
    public static final String DB_STATEMENT = "db.statement";
    public static final String COSMOS_CALL_DEPTH = "cosmosCallDepth";
    public static final String COSMOS_CALL_DEPTH_VAL = "nested";
    public static final int ERROR_CODE = 0;
    public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB";
    public final Duration CRUD_THRESHOLD_FOR_DIAGNOSTICS = Duration.ofMillis(100);
    public final Duration QUERY_THRESHOLD_FOR_DIAGNOSTICS = Duration.ofMillis(500);

    private static final String REACTOR_TRACING_CONTEXT_KEY = "tracing-context";
    private static final Object DUMMY_VALUE = new Object();
    private final Mono<Object> propagatingMono;
    private final Flux<Object> propagatingFlux;
    public TracerProvider(Tracer tracer) {
        this.tracer = tracer;
        this.propagatingMono = new PropagatingMono();
        this.propagatingFlux = new PropagatingFlux();
    }

    public boolean isEnabled() {
        return tracer != null;
    }

    /**
     * Gets {@link Context} from Reactor {@link ContextView}.
     *
     * @param reactorContext Reactor context instance.
     * @return {@link Context} from reactor context or null if not present.
     */
    public static Context getContextFromReactorOrNull(ContextView reactorContext) {
        Object context = reactorContext.getOrDefault(REACTOR_TRACING_CONTEXT_KEY, null);

        if (context != null && context instanceof Context) {
            return (Context) context;
        }

        return null;
    }

    /**
     * Stores {@link Context} in Reactor {@link reactor.util.context.Context}.
     *
     * @param traceContext {@link Context} context with trace context to store.
     * @return {@link reactor.util.context.Context} Reactor context with trace context.
     */
    public static reactor.util.context.Context setContextInReactor(Context traceContext) {
        return reactor.util.context.Context.of(REACTOR_TRACING_CONTEXT_KEY, traceContext);
    }

    /**
     * For each tracer plugged into the SDK a new tracing span is created.
     * <p>
     * The {@code context} will be checked for containing information about a parent span. If a parent span is found the
     * new span will be added as a child, otherwise the span will be created and added to the context and any downstream
     * start calls will use the created span as the parent.
     *
     * @param context Additional metadata that is passed through the call stack.
     * @return An updated context object.
     */
    public Context startSpan(String methodName, String databaseId, String endpoint, Context context) {
        Context local = Objects.requireNonNull(context, "'context' cannot be null.");

        StartSpanOptions spanOptions = new StartSpanOptions(SpanKind.CLIENT)
            .setAttribute(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME)
            .setAttribute(DB_TYPE, DB_TYPE_VALUE)
            .setAttribute(TracerProvider.DB_URL, endpoint)
            .setAttribute(TracerProvider.DB_STATEMENT, methodName);
        if (databaseId != null) {
            spanOptions.setAttribute(TracerProvider.DB_INSTANCE, databaseId);
        }

        // start the span and return the started span
        return tracer.start(methodName, spanOptions, local);
    }

    /**
     * Adds an event to the current span with the provided {@code timestamp} and {@code attributes}.
     * <p>This API does not provide any normalization if provided timestamps are out of range of the current
     * span timeline</p>
     * <p>Supported attribute values include String, double, boolean, long, String [], double [], long [].
     * Any other Object value type and null values will be silently ignored.</p>
     *
     * @param name the name of the event.
     * @param attributes the additional attributes to be set for the event.
     * @param timestamp The instant, in UTC, at which the event will be associated to the span.
     * @param context the call metadata containing information of the span to which the event should be associated with.
     * @throws NullPointerException if {@code eventName} is {@code null}.
     */
    public void addEvent(String name, Map<String, Object> attributes, OffsetDateTime timestamp, Context context) {
        tracer.addEvent(name, attributes, timestamp, context);
    }

    /**
     * Given a context containing the current tracing span the span is marked completed with status info from
     * {@link Signal}.  For each tracer plugged into the SDK the current tracing span is marked as completed.
     *
     * @param signal  The signal indicates the status and contains the metadata we need to end the tracing span.
     */
    public <T> void endSpan(Signal<T> signal, int statusCode) {
        Objects.requireNonNull(signal, "'signal' cannot be null.");

        Context context = getContextFromReactorOrNull(signal.getContextView());
        if (context == null) {
            return;
        }

        switch (signal.getType()) {
            case ON_COMPLETE:
            case ON_NEXT:
                end(statusCode, null, context);
                break;
            case ON_ERROR:
                Throwable throwable = null;
                if (signal.hasError()) {
                    // The last status available is on error, this contains the thrown error.
                    throwable = signal.getThrowable();

                    if (throwable instanceof CosmosException) {
                        CosmosException exception = (CosmosException) throwable;
                        statusCode = exception.getStatusCode();
                    }
                }
                end(statusCode, throwable, context);
                break;
            default:
                // ON_SUBSCRIBE isn't the right state to end span
                break;
        }
    }

    public <T extends CosmosResponse<?>> Mono<T> traceEnabledCosmosResponsePublisher(Mono<T> resultPublisher,
                                                                                     Context context,
                                                                                     String spanName,
                                                                                     String databaseId,
                                                                                     String endpoint) {
        return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint,
            (T response) -> response.getStatusCode(), (T response) -> response.getDiagnostics(), null);
    }

    public Mono<CosmosBatchResponse> traceEnabledBatchResponsePublisher(Mono<CosmosBatchResponse> resultPublisher,
                                                                        Context context,
                                                                        String spanName,
                                                                        String containerId,
                                                                        String databaseId,
                                                                        CosmosAsyncClient client,
                                                                        ConsistencyLevel consistencyLevel,
                                                                        OperationType operationType,
                                                                        ResourceType resourceType) {

        return publisherWithClientTelemetry(resultPublisher, context, spanName, containerId, databaseId,
            BridgeInternal.getServiceEndpoint(client),
            client,
            consistencyLevel,
            operationType,
            resourceType,
            CosmosBatchResponse::getStatusCode,
            CosmosBatchResponse::getDiagnostics,
            null);
    }

    public <T> Mono<CosmosItemResponse<T>> traceEnabledCosmosItemResponsePublisher(Mono<CosmosItemResponse<T>> resultPublisher,
                                                                                   Context context,
                                                                                   String spanName,
                                                                                   String containerId,
                                                                                   String databaseId,
                                                                                   CosmosAsyncClient client,
                                                                                   ConsistencyLevel consistencyLevel,
                                                                                   OperationType operationType,
                                                                                   ResourceType resourceType,
                                                                                   Duration thresholdForDiagnosticsOnTracer) {

        return publisherWithClientTelemetry(resultPublisher, context, spanName, containerId, databaseId,
            BridgeInternal.getServiceEndpoint(client),
            client,
            consistencyLevel,
            operationType,
            resourceType,
            CosmosItemResponse::getStatusCode,
            CosmosItemResponse::getDiagnostics,
            thresholdForDiagnosticsOnTracer);
    }

    /**
     * Runs given {@code Flux<T>} publisher in the scope of trace context passed in using
     * {@link TracerProvider#setContextInReactor(Context, reactor.util.context.Context)} in {@code contextWrite}
     * Populates active trace context on Reactor's hot path. Reactor's instrumentation for OpenTelemetry
     * (or other hypothetical solution) will take care of the cold path.
     *
     * @param publisher publisher to run.
     * @return wrapped publisher.
     */
    public <T> Flux<T> runUnderSpanInContext(Flux<T> publisher) {
        return propagatingFlux
            .flatMap(ignored -> publisher);
    }

    private <T> Mono<T> traceEnabledPublisher(Mono<T> resultPublisher,
                                              Context context,
                                              String spanName,
                                              String databaseId,
                                              String endpoint,
                                              Function<T, Integer> statusCodeFunc,
                                              Function<T, CosmosDiagnostics> diagnosticFunc,
                                              Duration thresholdForDiagnosticsOnTracer) {

        if (!isEnabled()) {
            return resultPublisher;
        }

        Optional<Object> callDepth = context.getData(COSMOS_CALL_DEPTH);
        final boolean isNestedCall = callDepth.isPresent();
        if (isNestedCall) {
            return resultPublisher;
        }

        // propagatingMono ensures active span is propagated to the `resultPublisher`
        // subscription and hot path. OpenTelemetry reactor's instrumentation will
        // propagate it on the cold path.
        return propagatingMono
            .flatMap(ignored -> resultPublisher)
            .doOnEach(signal -> {
                switch (signal.getType()) {
                    case ON_NEXT:
                        T response = signal.get();
                        Context traceContext = getContextFromReactorOrNull(signal.getContextView());
                        CosmosDiagnostics cosmosDiagnostics = diagnosticFunc.apply(response);
                        try {
                            Duration threshold = thresholdForDiagnosticsOnTracer;
                            if (threshold == null) {
                                threshold = CRUD_THRESHOLD_FOR_DIAGNOSTICS;
                            }

                            if (cosmosDiagnostics != null
                                && cosmosDiagnostics.getDuration() != null
                                && cosmosDiagnostics.getDuration().compareTo(threshold) > 0) {
                                addDiagnosticsOnTracerEvent(cosmosDiagnostics, traceContext);
                            }
                        } catch (JsonProcessingException ex) {
                            LOGGER.warn("Error while serializing diagnostics for tracer", ex.getMessage());
                        }

                        this.endSpan(signal, statusCodeFunc.apply(response));
                        break;
                    case ON_ERROR:
                        // not adding diagnostics on trace event for exception as this information is already there as
                        // part of exception message
                        this.endSpan(signal, ERROR_CODE);
                        break;
                    default:
                        break;
                }})
            .contextWrite(setContextInReactor(this.startSpan(spanName, databaseId, endpoint,
                context)));
    }

    private <T> Mono<T> publisherWithClientTelemetry(Mono<T> resultPublisher,
                                                     Context context,
                                                     String spanName,
                                                     String containerId,
                                                     String databaseId,
                                                     String endpoint,
                                                     CosmosAsyncClient client,
                                                     ConsistencyLevel consistencyLevel,
                                                     OperationType operationType,
                                                     ResourceType resourceType,
                                                     Function<T, Integer> statusCodeFunc,
                                                     Function<T, CosmosDiagnostics> diagnosticFunc,
                                                     Duration thresholdForDiagnosticsOnTracer) {
        Mono<T> tracerMono = traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint, statusCodeFunc, diagnosticFunc, thresholdForDiagnosticsOnTracer);
        return tracerMono
            .doOnSuccess(response -> {
                if (Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client)) && response instanceof CosmosItemResponse) {
                    @SuppressWarnings("unchecked")
                    CosmosItemResponse<T> itemResponse = (CosmosItemResponse<T>) response;
                    fillClientTelemetry(client, itemResponse.getDiagnostics(), itemResponse.getStatusCode(),
                        ModelBridgeInternal.getPayloadLength(itemResponse), containerId,
                        databaseId, operationType, resourceType, consistencyLevel,
                        (float) itemResponse.getRequestCharge());
                } else if (Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client)) && response instanceof CosmosBatchResponse) {
                    @SuppressWarnings("unchecked")
                    CosmosBatchResponse cosmosBatchResponse = (CosmosBatchResponse) response;
                    fillClientTelemetry(client, cosmosBatchResponse.getDiagnostics(), cosmosBatchResponse.getStatusCode(),
                        ModelBridgeInternal.getPayloadLength(cosmosBatchResponse), containerId,
                        databaseId, operationType, resourceType, consistencyLevel,
                        (float) cosmosBatchResponse.getRequestCharge());
                }
            }).doOnError(throwable -> {
                if (Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client)) && throwable instanceof CosmosException) {
                    CosmosException cosmosException = (CosmosException) throwable;
                    fillClientTelemetry(client, cosmosException.getDiagnostics(), cosmosException.getStatusCode(),
                        null, containerId,
                        databaseId, operationType, resourceType, consistencyLevel,
                        (float) cosmosException.getRequestCharge());
                }
            });
    }

    private void end(int statusCode, Throwable throwable, Context context) {
        if (throwable != null) {
            if (throwable instanceof CosmosException) {
                CosmosException cosmosException = (CosmosException) throwable;
                if (statusCode == HttpConstants.StatusCodes.NOTFOUND && cosmosException.getSubStatusCode() == 0) {
                    tracer.end(statusCode, null, context);
                    return;
                }
            }
        }
        tracer.end(statusCode, throwable, context);
    }

    private void fillClientTelemetry(CosmosAsyncClient cosmosAsyncClient,
                                    CosmosDiagnostics cosmosDiagnostics,
                                    int statusCode,
                                    Integer objectSize,
                                    String containerId,
                                    String databaseId,
                                    OperationType operationType,
                                    ResourceType resourceType,
                                    ConsistencyLevel consistencyLevel,
                                    float requestCharge) {
        ClientTelemetry telemetry = BridgeInternal.getContextClient(cosmosAsyncClient).getClientTelemetry();
        ReportPayload reportPayloadLatency = createReportPayload(cosmosAsyncClient, cosmosDiagnostics,
            statusCode, objectSize, containerId, databaseId
            , operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_LATENCY_NAME,
            ClientTelemetry.REQUEST_LATENCY_UNIT);
        ConcurrentDoubleHistogram latencyHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap().get(reportPayloadLatency);
        if (latencyHistogram != null) {
            ClientTelemetry.recordValue(latencyHistogram, cosmosDiagnostics.getDuration().toMillis());
        } else {
            if (statusCode >= HttpConstants.StatusCodes.MINIMUM_SUCCESS_STATUSCODE && statusCode <= HttpConstants.StatusCodes.MAXIMUM_SUCCESS_STATUSCODE) {
                latencyHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_LATENCY_MAX_MILLI_SEC, ClientTelemetry.REQUEST_LATENCY_SUCCESS_PRECISION);
            } else {
                latencyHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_LATENCY_MAX_MILLI_SEC, ClientTelemetry.REQUEST_LATENCY_FAILURE_PRECISION);
            }

            latencyHistogram.setAutoResize(true);
            ClientTelemetry.recordValue(latencyHistogram, cosmosDiagnostics.getDuration().toMillis());
            telemetry.getClientTelemetryInfo().getOperationInfoMap().put(reportPayloadLatency, latencyHistogram);
        }

        ReportPayload reportPayloadRequestCharge = createReportPayload(cosmosAsyncClient, cosmosDiagnostics,
            statusCode, objectSize, containerId, databaseId
            , operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_CHARGE_NAME, ClientTelemetry.REQUEST_CHARGE_UNIT);
        ConcurrentDoubleHistogram requestChargeHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap().get(reportPayloadRequestCharge);
        if (requestChargeHistogram != null) {
            ClientTelemetry.recordValue(requestChargeHistogram, requestCharge);
        } else {
            requestChargeHistogram = new ConcurrentDoubleHistogram(ClientTelemetry.REQUEST_CHARGE_MAX, ClientTelemetry.REQUEST_CHARGE_PRECISION);
            requestChargeHistogram.setAutoResize(true);
            ClientTelemetry.recordValue(requestChargeHistogram, requestCharge);
            telemetry.getClientTelemetryInfo().getOperationInfoMap().put(reportPayloadRequestCharge,
                requestChargeHistogram);
        }
    }

    private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient,
                                              CosmosDiagnostics cosmosDiagnostics,
                                              int statusCode,
                                              Integer objectSize,
                                              String containerId,
                                              String databaseId,
                                              OperationType operationType,
                                              ResourceType resourceType,
                                              ConsistencyLevel consistencyLevel,
                                              String metricsName,
                                              String unitName) {
        ReportPayload reportPayload = new ReportPayload(metricsName, unitName);
        reportPayload.setRegionsContacted(BridgeInternal.getRegionsContacted(cosmosDiagnostics).toString());
        reportPayload.setConsistency(consistencyLevel == null ?
            BridgeInternal.getContextClient(cosmosAsyncClient).getConsistencyLevel() :
            consistencyLevel);
        if (objectSize != null) {
            reportPayload.setGreaterThan1Kb(objectSize > ClientTelemetry.ONE_KB_TO_BYTES);
        }

        reportPayload.setDatabaseName(databaseId);
        reportPayload.setContainerName(containerId);
        reportPayload.setOperation(operationType);
        reportPayload.setResource(resourceType);
        reportPayload.setStatusCode(statusCode);
        return reportPayload;
    }

    private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Context context) throws JsonProcessingException {
        if (cosmosDiagnostics == null || context == null) {
            return;
        }

        ClientSideRequestStatistics clientSideRequestStatistics =
            BridgeInternal.getClientSideRequestStatics(cosmosDiagnostics);

        Map<String, Object> attributes = null;
        //adding storeResponse
        int diagnosticsCounter = 1;
        for (ClientSideRequestStatistics.StoreResponseStatistics storeResponseStatistics :
            clientSideRequestStatistics.getResponseStatisticsList()) {
            attributes = new HashMap<>();
            attributes.put(JSON_STRING, mapper.writeValueAsString(storeResponseStatistics));
            Iterator<RequestTimeline.Event> eventIterator = null;
            try {
                if (storeResponseStatistics.getStoreResult() != null) {
                    eventIterator =
                        DirectBridgeInternal.getRequestTimeline(storeResponseStatistics.getStoreResult().toResponse()).iterator();
                }
            } catch (CosmosException ex) {
                eventIterator = BridgeInternal.getRequestTimeline(ex).iterator();
            }

            OffsetDateTime requestStartTime = OffsetDateTime.ofInstant(storeResponseStatistics.getRequestResponseTimeUTC()
                , ZoneOffset.UTC);
            if (eventIterator != null) {
                while (eventIterator.hasNext()) {
                    RequestTimeline.Event event = eventIterator.next();
                    if (event.getName().equals("created")) {
                        requestStartTime = OffsetDateTime.ofInstant(event.getStartTime(), ZoneOffset.UTC);
                        break;
                    }
                }
            }

            this.addEvent("StoreResponse" + diagnosticsCounter++, attributes, requestStartTime, context);
        }

        //adding supplemental storeResponse
        diagnosticsCounter = 1;
        for (ClientSideRequestStatistics.StoreResponseStatistics statistics :
            ClientSideRequestStatistics.getCappedSupplementalResponseStatisticsList(clientSideRequestStatistics.getSupplementalResponseStatisticsList())) {
            attributes = new HashMap<>();
            attributes.put(JSON_STRING, mapper.writeValueAsString(statistics));
            OffsetDateTime requestStartTime = OffsetDateTime.ofInstant(statistics.getRequestResponseTimeUTC(),
                ZoneOffset.UTC);
            if (statistics.getStoreResult() != null) {
                Iterator<RequestTimeline.Event> eventIterator =
                    DirectBridgeInternal.getRequestTimeline(statistics.getStoreResult().toResponse()).iterator();
                while (eventIterator.hasNext()) {
                    RequestTimeline.Event event = eventIterator.next();
                    if (event.getName().equals("created")) {
                        requestStartTime = OffsetDateTime.ofInstant(event.getStartTime(), ZoneOffset.UTC);
                        break;
                    }
                }
            }
            this.addEvent("Supplemental StoreResponse" + diagnosticsCounter++, attributes, requestStartTime, context);
        }

        //adding gateway statistics
        if (clientSideRequestStatistics.getGatewayStatistics() != null) {
            attributes = new HashMap<>();
            attributes.put(JSON_STRING,
                mapper.writeValueAsString(clientSideRequestStatistics.getGatewayStatistics()));
            OffsetDateTime requestStartTime =
                OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC);
            if (clientSideRequestStatistics.getGatewayStatistics().getRequestTimeline() != null) {
                Iterator<RequestTimeline.Event> eventIterator =
                    clientSideRequestStatistics.getGatewayStatistics().getRequestTimeline().iterator();
                while (eventIterator.hasNext()) {
                    RequestTimeline.Event event = eventIterator.next();
                    if (event.getName().equals("created")) {
                        requestStartTime = OffsetDateTime.ofInstant(event.getStartTime(), ZoneOffset.UTC);
                        break;
                    }
                }
            }
            this.addEvent("GatewayStatistics", attributes, requestStartTime, context);
        }

        //adding retry context
        if (clientSideRequestStatistics.getRetryContext().getRetryStartTime() != null) {
            attributes = new HashMap<>();
            attributes.put(JSON_STRING,
                mapper.writeValueAsString(clientSideRequestStatistics.getRetryContext()));
            this.addEvent("Retry Context", attributes,
                OffsetDateTime.ofInstant(clientSideRequestStatistics.getRetryContext().getRetryStartTime(),
                    ZoneOffset.UTC), context);
        }

        //adding addressResolutionStatistics
        diagnosticsCounter = 1;
        for (ClientSideRequestStatistics.AddressResolutionStatistics addressResolutionStatistics :
            clientSideRequestStatistics.getAddressResolutionStatistics().values()) {
            attributes = new HashMap<>();
            attributes.put(JSON_STRING, mapper.writeValueAsString(addressResolutionStatistics));
            this.addEvent("AddressResolutionStatistics" + diagnosticsCounter++, attributes,
                OffsetDateTime.ofInstant(addressResolutionStatistics.getStartTimeUTC(), ZoneOffset.UTC), context);
        }

        //adding serializationDiagnosticsContext
        if (clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList != null) {
            for (SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics :
                clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList) {
                attributes = new HashMap<>();
                attributes.put(JSON_STRING, mapper.writeValueAsString(serializationDiagnostics));
                this.addEvent("SerializationDiagnostics " + serializationDiagnostics.serializationType, attributes,
                    OffsetDateTime.ofInstant(serializationDiagnostics.startTimeUTC, ZoneOffset.UTC), context);
            }
        }

        //adding systemInformation
        attributes = new HashMap<>();
        attributes.put(JSON_STRING,
            mapper.writeValueAsString(clientSideRequestStatistics.getContactedRegionNames()));
        this.addEvent("RegionContacted", attributes,
            OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);


        //adding systemInformation
        attributes = new HashMap<>();
        attributes.put(JSON_STRING,
            mapper.writeValueAsString(ClientSideRequestStatistics.fetchSystemInformation()));
        this.addEvent("SystemInformation", attributes,
            OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);

        //adding clientCfgs
        attributes = new HashMap<>();
        attributes.put(JSON_STRING,
            mapper.writeValueAsString(clientSideRequestStatistics.getDiagnosticsClientContext()));
        this.addEvent("ClientCfgs", attributes,
            OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);
    }

    private static void subscribe(Tracer tracer, CoreSubscriber<? super Object> actual) {
        Context context = getContextFromReactorOrNull(actual.currentContext());
        if (context != null) {
            AutoCloseable scope = tracer.makeSpanCurrent(context);
            try {
                actual.onSubscribe(Operators.scalarSubscription(actual, DUMMY_VALUE));
            } finally {
                try {
                    scope.close();
                } catch (Exception e) {
                    // can't happen
                }
            }
        } else {
            actual.onSubscribe(Operators.scalarSubscription(actual, DUMMY_VALUE));
        }
    }

    /**
     * Helper class allowing running Mono subscription (and anything on the hot path)
     * in scope of trace context. This enables OpenTelemetry auto-collection
     * to pick it up and correlate lower levels of instrumentation and logs
     * to logical Cosmos spans.
     *
     * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
     */
    private final class PropagatingMono extends Mono<Object> {
        @Override
        public void subscribe(CoreSubscriber<? super Object> actual) {
            TracerProvider.subscribe(tracer, actual);
        }
    }

    /**
     * Helper class allowing running Flux subscription (and anything on the hot path)
     * in scope of trace context. This enables OpenTelemetry auto-collection
     * to pick it up and correlate lower levels of instrumentation and logs
     * to logical Cosmos spans.
     *
     * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
     */
    private final class PropagatingFlux extends Flux<Object> {
        @Override
        public void subscribe(CoreSubscriber<? super Object> actual) {
            TracerProvider.subscribe(tracer, actual);
        }
    }
}