AzureMonitorTraceExporter.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.monitor.opentelemetry.exporter.implementation.models.ContextTagKeys;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MessageData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MonitorBase;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MonitorDomain;
import com.azure.monitor.opentelemetry.exporter.implementation.models.RemoteDependencyData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.RequestData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryExceptionData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryExceptionDetails;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.FormattedDuration;
import com.azure.monitor.opentelemetry.exporter.implementation.VersionGenerator;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import reactor.util.context.Context;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * This class is an implementation of OpenTelemetry {@link SpanExporter} that allows different tracing services to
 * export recorded data for sampled spans in their own format.
 */
public final class AzureMonitorTraceExporter implements SpanExporter {
    private static final Pattern COMPONENT_PATTERN = Pattern
        .compile("io\\.opentelemetry\\.javaagent\\.([^0-9]*)(-[0-9.]*)?");

    private static final Set<String> SQL_DB_SYSTEMS;

    private static final Set<String> STANDARD_ATTRIBUTE_PREFIXES;

    private static final AttributeKey<String> AZURE_NAMESPACE =
        AttributeKey.stringKey("az.namespace");
    private static final AttributeKey<String> AZURE_SDK_PEER_ADDRESS =
        AttributeKey.stringKey("peer.address");
    private static final AttributeKey<String> AZURE_SDK_MESSAGE_BUS_DESTINATION =
        AttributeKey.stringKey("message_bus.destination");
    private static final AttributeKey<Long> AZURE_SDK_ENQUEUED_TIME =
        AttributeKey.longKey("x-opt-enqueued-time");

    private static final ClientLogger LOGGER = new ClientLogger(AzureMonitorTraceExporter.class);

    static {
        Set<String> dbSystems = new HashSet<>();
        dbSystems.add("db2");
        dbSystems.add("derby");
        dbSystems.add("mariadb");
        dbSystems.add("mssql");
        dbSystems.add("mysql");
        dbSystems.add("oracle");
        dbSystems.add("postgresql");
        dbSystems.add("sqlite");
        dbSystems.add("other_sql");
        dbSystems.add("hsqldb");
        dbSystems.add("h2");

        SQL_DB_SYSTEMS = Collections.unmodifiableSet(dbSystems);

        Set<String> standardAttributesPrefix = new HashSet<>();
        standardAttributesPrefix.add("http");
        standardAttributesPrefix.add("db");
        standardAttributesPrefix.add("message");
        standardAttributesPrefix.add("messaging");
        standardAttributesPrefix.add("rpc");
        standardAttributesPrefix.add("enduser");
        standardAttributesPrefix.add("net");
        standardAttributesPrefix.add("peer");
        standardAttributesPrefix.add("exception");
        standardAttributesPrefix.add("thread");
        standardAttributesPrefix.add("faas");

        STANDARD_ATTRIBUTE_PREFIXES = Collections.unmodifiableSet(standardAttributesPrefix);
    }

    private final MonitorExporterAsyncClient client;
    private final String instrumentationKey;

    /**
     * Creates an instance of exporter that is configured with given exporter client that sends telemetry events to
     * Application Insights resource identified by the instrumentation key.
     * @param client The client used to send data to Azure Monitor.
     * @param instrumentationKey The instrumentation key of Application Insights resource.
     */
    AzureMonitorTraceExporter(MonitorExporterAsyncClient client, String instrumentationKey) {
        this.client = client;
        this.instrumentationKey = instrumentationKey;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode export(Collection<SpanData> spans) {
        CompletableResultCode completableResultCode = new CompletableResultCode();
        try {
            List<TelemetryItem> telemetryItems = new ArrayList<>();
            for (SpanData span : spans) {
                LOGGER.verbose("exporting span: {}", span);
                export(span, telemetryItems);
            }
            client.export(telemetryItems)
                .subscriberContext(Context.of(Tracer.DISABLE_TRACING_KEY, true))
                .subscribe(ignored -> { }, error -> completableResultCode.fail(), completableResultCode::succeed);
            return completableResultCode;
        } catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
            return completableResultCode.fail();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode flush() {
        return CompletableResultCode.ofSuccess();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode shutdown() {
        return CompletableResultCode.ofSuccess();
    }

    private void export(SpanData span, List<TelemetryItem> telemetryItems) {
        SpanKind kind = span.getKind();
        String instrumentationName = span.getInstrumentationLibraryInfo().getName();
        Matcher matcher = COMPONENT_PATTERN.matcher(instrumentationName);
        String stdComponent = matcher.matches() ? matcher.group(1) : null;
        if (kind == SpanKind.INTERNAL) {
            if ("spring-scheduling".equals(stdComponent) && !span.getParentSpanContext().isValid()) {
                // if (!span.getParentSpanContext().isValid()) {
                // TODO (trask) need semantic convention for determining whether to map INTERNAL to request or
                //  dependency (or need clarification to use SERVER for this)
                exportRequest(span, telemetryItems);
            } else {
                exportRemoteDependency(span, true, telemetryItems);
            }
        } else if (kind == SpanKind.CLIENT || kind == SpanKind.PRODUCER) {
            exportRemoteDependency(span, false, telemetryItems);
        } else if (kind == SpanKind.CONSUMER && !span.getParentSpanContext().isRemote()) {
            exportRemoteDependency(span, false, telemetryItems);
        } else if (kind == SpanKind.SERVER || kind == SpanKind.CONSUMER) {
            exportRequest(span, telemetryItems);
        } else {
            throw LOGGER.logExceptionAsError(new UnsupportedOperationException(kind.name()));
        }
    }

    private static List<TelemetryExceptionDetails> minimalParse(String errorStack) {
        TelemetryExceptionDetails details = new TelemetryExceptionDetails();
        String line = errorStack.split(System.lineSeparator())[0];
        int index = line.indexOf(": ");

        if (index != -1) {
            details.setTypeName(line.substring(0, index));
            details.setMessage(line.substring(index + 2));
        } else {
            details.setTypeName(line);
        }
        // TODO (trask): map OpenTelemetry exception to Application Insights exception better
        details.setStack(errorStack);
        return Collections.singletonList(details);
    }

    private void exportRemoteDependency(SpanData span, boolean inProc,
                                        List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetry = new TelemetryItem();
        RemoteDependencyData data = new RemoteDependencyData();
        initTelemetry(telemetry, data, "RemoteDependency", "RemoteDependencyData");
        // TODO (trask): can properties be moved up to MonitorDomain and then this can be lazy init in setExtraAttributes
        data.setProperties(new HashMap<>());
        // sampling is not supported in this exporter yet
        float samplingPercentage = 100;

        // set standard properties
        setOperationTags(telemetry, span);
        setTime(telemetry, span.getStartEpochNanos());
        setExtraAttributes(telemetry, data.getProperties(), span.getAttributes());
        addLinks(data.getProperties(), span.getLinks());

        // set dependency-specific properties
        data.setId(span.getSpanId());
        data.setName(span.getName());
        data.setDuration(FormattedDuration.getFormattedDuration(span.getEndEpochNanos() - span.getStartEpochNanos()));
        data.setSuccess(span.getStatus().getStatusCode() != StatusCode.ERROR);

        if (inProc) {
            data.setType("InProc");
        } else {
            applySemanticConventions(span, data);
        }

        telemetryItems.add(telemetry);
        exportEvents(span, telemetryItems);
    }

    private static void applySemanticConventions(SpanData span, RemoteDependencyData remoteDependencyData) {
        Attributes attributes = span.getAttributes();
        String httpMethod = attributes.get(SemanticAttributes.HTTP_METHOD);
        if (httpMethod != null) {
            applyHttpClientSpan(attributes, remoteDependencyData);
            return;
        }
        String rpcSystem = attributes.get(SemanticAttributes.RPC_SYSTEM);
        if (rpcSystem != null) {
            applyRpcClientSpan(attributes, remoteDependencyData, rpcSystem);
            return;
        }
        String dbSystem = attributes.get(SemanticAttributes.DB_SYSTEM);
        if (dbSystem != null) {
            applyDatabaseClientSpan(attributes, remoteDependencyData, dbSystem);
            return;
        }
        String azureNamespace = attributes.get(AZURE_NAMESPACE);
        if (azureNamespace != null && "Microsoft.EventHub".equals(azureNamespace)) {
            applyEventHubsSpan(attributes, remoteDependencyData);
            return;
        }
        if (azureNamespace != null && "Microsoft.ServiceBus".equals(azureNamespace)) {
            applyServiceBusSpan(attributes, remoteDependencyData);
            return;
        }
        String messagingSystem = attributes.get(SemanticAttributes.MESSAGING_SYSTEM);
        if (messagingSystem != null) {
            applyMessagingClientSpan(attributes, remoteDependencyData, messagingSystem, span.getKind());
            return;
        }

        // passing max value because we don't know what the default port would be in this case,
        // so we always want the port included
        String target = getTargetFromPeerAttributes(attributes, Integer.MAX_VALUE);
        if (target != null) {
            remoteDependencyData.setTarget(target);
            return;
        }

        // with no target, the App Map falls back to creating a node based on the telemetry name,
        // which is very confusing, e.g. when multiple unrelated nodes all point to a single node
        // because they had dependencies with the same telemetry name
        //
        // so we mark these as InProc, even though they aren't INTERNAL spans,
        // in order to prevent App Map from considering them
        remoteDependencyData.setType("InProc");
    }

    private static void setOperationTags(TelemetryItem telemetry, SpanData span) {
        setOperationTags(telemetry, span.getTraceId(), span.getParentSpanContext().getSpanId());
    }

    private static void setOperationTags(TelemetryItem telemetry, String traceId, String parentSpanId) {
        telemetry.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), traceId);
        if (SpanId.isValid(parentSpanId)) {
            telemetry.getTags().put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), parentSpanId);
        }
    }

    private static void applyHttpClientSpan(Attributes attributes, RemoteDependencyData telemetry) {

        String target = getTargetForHttpClientSpan(attributes);

        telemetry.setType("Http");
        telemetry.setTarget(target);

        Long httpStatusCode = attributes.get(SemanticAttributes.HTTP_STATUS_CODE);
        if (httpStatusCode != null) {
            telemetry.setResultCode(Long.toString(httpStatusCode));
        }

        String url = attributes.get(SemanticAttributes.HTTP_URL);
        telemetry.setData(url);
    }

    private static String getTargetForHttpClientSpan(Attributes attributes) {
        // from the spec, at least one of the following sets of attributes is required:
        // * http.url
        // * http.scheme, http.host, http.target
        // * http.scheme, net.peer.name, net.peer.port, http.target
        // * http.scheme, net.peer.ip, net.peer.port, http.target
        String target = getTargetFromPeerService(attributes);
        if (target != null) {
            return target;
        }
        // note http.host includes the port (at least when non-default)
        target = attributes.get(SemanticAttributes.HTTP_HOST);
        if (target != null) {
            String scheme = attributes.get(SemanticAttributes.HTTP_SCHEME);
            if ("http".equals(scheme)) {
                if (target.endsWith(":80")) {
                    target = target.substring(0, target.length() - 3);
                }
            } else if ("https".equals(scheme)) {
                if (target.endsWith(":443")) {
                    target = target.substring(0, target.length() - 4);
                }
            }
            return target;
        }
        String url = attributes.get(SemanticAttributes.HTTP_URL);
        if (url != null) {
            URI uri;
            try {
                uri = new URI(url);
            } catch (URISyntaxException e) {
                LOGGER.verbose(e.getMessage(), e);
                uri = null;
            }
            if (uri != null) {
                target = uri.getHost();
                if (uri.getPort() != 80 && uri.getPort() != 443 && uri.getPort() != -1) {
                    target += ":" + uri.getPort();
                }
                return target;
            }
        }
        String scheme = attributes.get(SemanticAttributes.HTTP_SCHEME);
        int defaultPort;
        if ("http".equals(scheme)) {
            defaultPort = 80;
        } else if ("https".equals(scheme)) {
            defaultPort = 443;
        } else {
            defaultPort = 0;
        }
        target = getTargetFromNetAttributes(attributes, defaultPort);
        if (target != null) {
            return target;
        }
        // this should not happen, just a failsafe
        return "Http";
    }

    private static String getTargetFromPeerAttributes(Attributes attributes, int defaultPort) {
        String target = getTargetFromPeerService(attributes);
        if (target != null) {
            return target;
        }
        return getTargetFromNetAttributes(attributes, defaultPort);
    }

    private static String getTargetFromPeerService(Attributes attributes) {
        // do not append port to peer.service
        return attributes.get(SemanticAttributes.PEER_SERVICE);
    }

    private static String getTargetFromNetAttributes(Attributes attributes, int defaultPort) {
        String target = getHostFromNetAttributes(attributes);
        if (target == null) {
            return null;
        }
        // append net.peer.port to target
        Long port = attributes.get(SemanticAttributes.NET_PEER_PORT);
        if (port != null && port != defaultPort) {
            return target + ":" + port;
        }
        return target;
    }

    private static String getHostFromNetAttributes(Attributes attributes) {
        String host = attributes.get(SemanticAttributes.NET_PEER_NAME);
        if (host != null) {
            return host;
        }
        return attributes.get(SemanticAttributes.NET_PEER_IP);
    }

    private static void applyRpcClientSpan(Attributes attributes, RemoteDependencyData telemetry,
                                           String rpcSystem) {
        telemetry.setType(rpcSystem);
        String target = getTargetFromPeerAttributes(attributes, 0);
        // not appending /rpc.service for now since that seems too fine-grained
        if (target == null) {
            target = rpcSystem;
        }
        telemetry.setTarget(target);
    }

    private static void applyDatabaseClientSpan(Attributes attributes, RemoteDependencyData telemetry,
                                                String dbSystem) {
        String dbStatement = attributes.get(SemanticAttributes.DB_STATEMENT);
        String type;
        if (SQL_DB_SYSTEMS.contains(dbSystem)) {
            type = "SQL";
            // keeping existing behavior that was release in 3.0.0 for now
            // not going with new jdbc instrumentation span name of
            // "<db.operation> <db.name>.<db.sql.table>" for now just in case this behavior is reversed
            // due to spec:
            // "It is not recommended to attempt any client-side parsing of `db.statement` just to get
            // these properties, they should only be used if the library being instrumented already
            // provides them."
            // also need to discuss with other AI language exporters
            //
            // if we go to shorter span name now, and it gets reverted, no way for customers to get the
            // shorter name back
            // whereas if we go to shorter span name in the future, and they still prefer more
            // cardinality, they can get that back using telemetry processor to copy db.statement into
            // span name
            telemetry.setName(dbStatement);
        } else {
            type = dbSystem;
        }
        telemetry.setType(type);
        telemetry.setData(dbStatement);
        String target =
            nullAwareConcat(
                getTargetFromPeerAttributes(attributes, getDefaultPortForDbSystem(dbSystem)),
                attributes.get(SemanticAttributes.DB_NAME),
                "/");
        if (target == null) {
            target = dbSystem;
        }
        telemetry.setTarget(target);
    }

    private static void applyMessagingClientSpan(Attributes attributes, RemoteDependencyData telemetry,
                                                 String messagingSystem, SpanKind spanKind) {
        if (spanKind == SpanKind.PRODUCER) {
            telemetry.setType("Queue Message | " + messagingSystem);
        } else {
            // e.g. CONSUMER kind (without remote parent) and CLIENT kind
            telemetry.setType(messagingSystem);
        }
        String destination = attributes.get(SemanticAttributes.MESSAGING_DESTINATION);
        if (destination != null) {
            telemetry.setTarget(destination);
        } else {
            telemetry.setTarget(messagingSystem);
        }
    }

    // special case needed until Azure SDK moves to OTel semantic conventions
    private static void applyEventHubsSpan(Attributes attributes, RemoteDependencyData telemetry) {
        telemetry.setType("Microsoft.EventHub");
        telemetry.setTarget(getAzureSdkTargetSource(attributes));
    }

    // special case needed until Azure SDK moves to OTel semantic conventions
    private static void applyServiceBusSpan(Attributes attributes, RemoteDependencyData telemetry) {
        telemetry.setType("AZURE SERVICE BUS");
        telemetry.setTarget(getAzureSdkTargetSource(attributes));
    }

    private static String getAzureSdkTargetSource(Attributes attributes) {
        String peerAddress = attributes.get(AZURE_SDK_PEER_ADDRESS);
        String destination = attributes.get(AZURE_SDK_MESSAGE_BUS_DESTINATION);
        return peerAddress + "/" + destination;
    }

    private static int getDefaultPortForDbSystem(String dbSystem) {
        // jdbc default ports are from
        // io.opentelemetry.javaagent.instrumentation.jdbc.JdbcConnectionUrlParser
        // TODO (trask) make the ports constants (at least in JdbcConnectionUrlParser) so they can be used here
        switch (dbSystem) {
            case SemanticAttributes.DbSystemValues.MONGODB:
                return 27017;
            case SemanticAttributes.DbSystemValues.CASSANDRA:
                return 9042;
            case SemanticAttributes.DbSystemValues.REDIS:
                return 6379;
            case SemanticAttributes.DbSystemValues.MARIADB:
            case SemanticAttributes.DbSystemValues.MYSQL:
                return 3306;
            case SemanticAttributes.DbSystemValues.MSSQL:
                return 1433;
            case SemanticAttributes.DbSystemValues.DB2:
                return 50000;
            case SemanticAttributes.DbSystemValues.ORACLE:
                return 1521;
            case SemanticAttributes.DbSystemValues.H2:
                return 8082;
            case SemanticAttributes.DbSystemValues.DERBY:
                return 1527;
            case SemanticAttributes.DbSystemValues.POSTGRESQL:
                return 5432;
            default:
                return 0;
        }
    }

    private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetry = new TelemetryItem();
        RequestData data = new RequestData();
        initTelemetry(telemetry, data, "Request", "RequestData");
        // TODO (trask): can properties be moved up to MonitorDomain and then this can be lazy init in setExtraAttributes
        data.setProperties(new HashMap<>());

        Attributes attributes = span.getAttributes();
        long startEpochNanos = span.getStartEpochNanos();
        // sampling is not supported in this exporter yet
        float samplingPercentage = 100;

        // set standard properties
        data.setId(span.getSpanId());
        setTime(telemetry, startEpochNanos);
        setExtraAttributes(telemetry, data.getProperties(), attributes);
        addLinks(data.getProperties(), span.getLinks());

        String operationName = getOperationName(span);
        telemetry.getTags().put(ContextTagKeys.AI_OPERATION_NAME.toString(), operationName);
        telemetry.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), span.getTraceId());

        telemetry
            .getTags()
            .put(
                ContextTagKeys.AI_OPERATION_PARENT_ID.toString(),
                span.getParentSpanContext().getSpanId());

        // set request-specific properties
        data.setName(operationName);
        data.setDuration(FormattedDuration.getFormattedDuration(span.getEndEpochNanos() - startEpochNanos));
        data.setSuccess(span.getStatus().getStatusCode() != StatusCode.ERROR);

        String httpUrl = attributes.get(SemanticAttributes.HTTP_URL);
        if (httpUrl != null) {
            data.setUrl(httpUrl);
        }

        Long httpStatusCode = attributes.get(SemanticAttributes.HTTP_STATUS_CODE);
        if (httpStatusCode == null) {
            httpStatusCode = attributes.get(SemanticAttributes.RPC_GRPC_STATUS_CODE);
        }
        if (httpStatusCode != null) {
            data.setResponseCode(Long.toString(httpStatusCode));
        } else {
            data.setResponseCode("0");
        }

        String locationIp = attributes.get(SemanticAttributes.HTTP_CLIENT_IP);
        if (locationIp == null) {
            // only use net.peer.ip if http.client_ip is not available
            locationIp = attributes.get(SemanticAttributes.NET_PEER_IP);
        }
        if (locationIp != null) {
            telemetry.getTags().put(ContextTagKeys.AI_LOCATION_IP.toString(), locationIp);
        }

        data.setSource(getSource(attributes));

        if (isAzureQueue(attributes)) {
            // TODO (trask): for batch consumer, enqueuedTime should be the average of this attribute
            //  across all links
            Long enqueuedTime = attributes.get(AZURE_SDK_ENQUEUED_TIME);
            if (enqueuedTime != null) {
                long timeSinceEnqueued =
                    NANOSECONDS.toMillis(span.getStartEpochNanos()) - SECONDS.toMillis(enqueuedTime);
                if (timeSinceEnqueued < 0) {
                    timeSinceEnqueued = 0;
                }
                if (data.getMeasurements() == null) {
                    data.setMeasurements(new HashMap<>());
                }
                data.getMeasurements().put("timeSinceEnqueued", (double) timeSinceEnqueued);
            }
        }

        telemetryItems.add(telemetry);
        exportEvents(span, telemetryItems);
    }

    private static String getSource(Attributes attributes) {
        if (isAzureQueue(attributes)) {
            return getAzureSdkTargetSource(attributes);
        }
        String messagingSystem = attributes.get(SemanticAttributes.MESSAGING_SYSTEM);
        if (messagingSystem != null) {
            // TODO (trask) AI mapping: should this pass default port for messaging.system?
            String source =
                nullAwareConcat(
                    getTargetFromPeerAttributes(attributes, 0),
                    attributes.get(SemanticAttributes.MESSAGING_DESTINATION),
                    "/");
            if (source != null) {
                return source;
            }
            // fallback
            return messagingSystem;
        }
        return null;
    }

    private static boolean isAzureQueue(Attributes attributes) {
        String azureNamespace = attributes.get(AZURE_NAMESPACE);
        if (azureNamespace == null) {
            return false;
        }
        return "Microsoft.EventHub".equals(azureNamespace)
            || "Microsoft.ServiceBus".equals(azureNamespace);
    }

    private static String getOperationName(SpanData span) {
        String spanName = span.getName();
        String httpMethod = span.getAttributes().get(SemanticAttributes.HTTP_METHOD);
        if (httpMethod != null && spanName.startsWith("/")) {
            return httpMethod + " " + spanName;
        }
        return spanName;
    }

    private static String nullAwareConcat(String str1, String str2, String separator) {
        if (str1 == null) {
            return str2;
        }
        if (str2 == null) {
            return str1;
        }
        return str1 + separator + str2;
    }

    private void exportEvents(SpanData span, List<TelemetryItem> telemetryItems) {
        for (EventData event : span.getEvents()) {

            if (event.getAttributes().get(SemanticAttributes.EXCEPTION_TYPE) != null
                || event.getAttributes().get(SemanticAttributes.EXCEPTION_MESSAGE) != null) {
                // TODO (trask): map OpenTelemetry exception to Application Insights exception better
                String stacktrace = event.getAttributes().get(SemanticAttributes.EXCEPTION_STACKTRACE);
                if (stacktrace != null) {
                    trackException(stacktrace, span, telemetryItems);
                }
                return;
            }

            TelemetryItem telemetry = new TelemetryItem();
            MessageData data = new MessageData();
            initTelemetry(telemetry, data, "Message", "MessageData");
            // TODO (trask): can properties be moved up to MonitorDomain and then this can be lazy init in setExtraAttributes
            data.setProperties(new HashMap<>());

            // set standard properties
            setOperationTags(telemetry, span.getTraceId(), span.getSpanId());
            setTime(telemetry, event.getEpochNanos());
            setExtraAttributes(telemetry, data.getProperties(), event.getAttributes());

            // set message-specific properties
            data.setMessage(event.getName());

            telemetryItems.add(telemetry);
        }
    }

    private void trackException(String errorStack, SpanData span, List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetry = new TelemetryItem();
        TelemetryExceptionData data = new TelemetryExceptionData();
        initTelemetry(telemetry, data, "Exception", "ExceptionData");
        // TODO (trask): can properties be moved up to MonitorDomain and then this can be lazy init in setExtraAttributes
        data.setProperties(new HashMap<>());

        // set standard properties
        setOperationTags(telemetry, span.getTraceId(), span.getSpanId());
        setTime(telemetry, span.getEndEpochNanos());

        // set exception-specific properties
        data.setExceptions(minimalParse(errorStack));

        telemetryItems.add(telemetry);
    }

    private void initTelemetry(TelemetryItem telemetry, MonitorDomain data, String telemetryName,
                               String baseType) {
        telemetry.setVersion(1);
        telemetry.setName(telemetryName);
        telemetry.setInstrumentationKey(instrumentationKey);
        telemetry.setTags(new HashMap<>());
        // Set AI Internal SDK Version
        telemetry.getTags().put(ContextTagKeys.AI_INTERNAL_SDK_VERSION.toString(), VersionGenerator.getSdkVersion());
        data.setVersion(2);

        MonitorBase monitorBase = new MonitorBase();
        telemetry.setData(monitorBase);
        monitorBase.setBaseType(baseType);
        monitorBase.setBaseData(data);
    }

    private static void setTime(TelemetryItem telemetry, long epochNanos) {
        telemetry.setTime(getFormattedTime(epochNanos));
    }

    private static OffsetDateTime getFormattedTime(long epochNanos) {
        return Instant.ofEpochMilli(NANOSECONDS.toMillis(epochNanos)).atOffset(ZoneOffset.UTC);
    }

    private static void addLinks(Map<String, String> properties, List<LinkData> links) {
        if (links.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        boolean first = true;
        for (LinkData link : links) {
            if (!first) {
                sb.append(",");
            }
            sb.append("{\"operation_Id\":\"");
            sb.append(link.getSpanContext().getTraceId());
            sb.append("\",\"id\":\"");
            sb.append(link.getSpanContext().getSpanId());
            sb.append("\"}");
            first = false;
        }
        sb.append("]");
        properties.put("_MS.links", sb.toString());
    }

    private static void setExtraAttributes(TelemetryItem telemetry, Map<String, String> properties,
                                           Attributes attributes) {
        attributes.forEach((key, value) -> {
            String stringKey = key.getKey();
            if (stringKey.startsWith("applicationinsights.internal.")) {
                return;
            }
            // TODO (trask) use az.namespace for something?
            if (stringKey.equals(AZURE_SDK_MESSAGE_BUS_DESTINATION.getKey())
                || "az.namespace".equals(stringKey)) {
                return;
            }
            // special case mappings
            if (key.getKey().equals("enduser.id") && value instanceof String) {
                telemetry.getTags().put(ContextTagKeys.AI_USER_ID.toString(), (String) value);
                return;
            }
            if (key.getKey().equals("http.user_agent") && value instanceof String) {
                telemetry.getTags().put("ai.user.userAgent", (String) value);
                return;
            }
            int index = stringKey.indexOf(".");
            String prefix = index == -1 ? stringKey : stringKey.substring(0, index);
            if (STANDARD_ATTRIBUTE_PREFIXES.contains(prefix)) {
                return;
            }
            String val = getStringValue(key, value);
            if (value != null) {
                properties.put(key.getKey(), val);
            }
        });
    }

    private static String getStringValue(AttributeKey<?> attributeKey, Object value) {
        switch (attributeKey.getType()) {
            case STRING:
            case BOOLEAN:
            case LONG:
            case DOUBLE:
                return String.valueOf(value);
            case STRING_ARRAY:
            case BOOLEAN_ARRAY:
            case LONG_ARRAY:
            case DOUBLE_ARRAY:
                return join((List<?>) value);
            default:
                LOGGER.warning("unexpected attribute type: {}", attributeKey.getType());
                return null;
        }
    }

    private static <T> String join(List<T> values) {
        StringBuilder sb = new StringBuilder();
        for (Object val : values) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(val);
        }
        return sb.toString();
    }
}