AzureMonitorExporter.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.opentelemetry.exporter.azuremonitor;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.ContextTagKeys;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.MonitorBase;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.RemoteDependencyData;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.RequestData;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.TelemetryEventData;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.TelemetryExceptionData;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.TelemetryExceptionDetails;
import com.microsoft.opentelemetry.exporter.azuremonitor.implementation.models.TelemetryItem;
import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.common.ReadableAttributes;
import io.opentelemetry.common.ReadableKeyValuePairs;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanId;
import io.opentelemetry.trace.attributes.SemanticAttributes;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * This class is an implementation of OpenTelemetry {@link SpanExporter} that allows different tracing services to
 * export recorded data for sampled spans in their own format.
 */
public final class AzureMonitorExporter implements SpanExporter {
    private static final Pattern COMPONENT_PATTERN = Pattern
        .compile("io\\.opentelemetry\\.auto\\.([^0-9]*)(-[0-9.]*)?");

    private static final Set<String> SQL_DB_SYSTEMS;

    static {
        Set<String> dbSystems = new HashSet<>();
        dbSystems.add("db2");
        dbSystems.add("derby");
        dbSystems.add("mariadb");
        dbSystems.add("mssql");
        dbSystems.add("mysql");
        dbSystems.add("oracle");
        dbSystems.add("postgresql");
        dbSystems.add("sqlite");
        dbSystems.add("other_sql");
        dbSystems.add("hsqldb");
        dbSystems.add("h2");

        SQL_DB_SYSTEMS = Collections.unmodifiableSet(dbSystems);
    }

    private final MonitorExporterClient client;
    private final ClientLogger logger = new ClientLogger(AzureMonitorExporter.class);
    private final String instrumentationKey;
    private final String telemetryItemNamePrefix;

    /**
     * Creates an instance of exporter that is configured with given exporter client that sends telemetry events to
     * Application Insights resource identified by the instrumentation key.
     *
     * @param client The client used to send data to Azure Monitor.
     * @param instrumentationKey The instrumentation key of Application Insights resource.
     */
    AzureMonitorExporter(MonitorExporterClient client, String instrumentationKey) {
        this.client = client;
        this.instrumentationKey = instrumentationKey;
        String formattedInstrumentationKey = instrumentationKey.replaceAll("-", "");
        this.telemetryItemNamePrefix = "Microsoft.ApplicationInsights." + formattedInstrumentationKey + ".";
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode export(Collection<SpanData> spans) {

        try {
            List<TelemetryItem> telemetryItems = new ArrayList<>();
            for (SpanData span : spans) {
                logger.verbose("exporting span: {}", span);
                export(span, telemetryItems);
            }
            client.export(telemetryItems);
            return CompletableResultCode.ofSuccess();
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
            return CompletableResultCode.ofFailure();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode flush() {
        return CompletableResultCode.ofSuccess();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public CompletableResultCode shutdown() {
        return CompletableResultCode.ofSuccess();
    }

    private void export(SpanData span, List<TelemetryItem> telemetryItems) {
        Span.Kind kind = span.getKind();
        String instrumentationName = span.getInstrumentationLibraryInfo().getName();
        Matcher matcher = COMPONENT_PATTERN.matcher(instrumentationName);
        String stdComponent = matcher.matches() ? matcher.group(1) : null;
        if ("jms".equals(stdComponent) && !span.getParentSpanId().isValid() && kind == Span.Kind.CLIENT) {
            // no need to capture these, at least is consistent with prior behavior
            // these tend to be frameworks pulling messages which are then pushed to consumers
            // where we capture them
            return;
        }
        if (kind == Span.Kind.INTERNAL) {
            if (!span.getParentSpanId().isValid()) {
                // TODO (srnagar): revisit this decision
                // maybe user-generated telemetry?
                // otherwise this top-level span won't show up in Performance blade
                exportRequest(stdComponent, span, telemetryItems);
            } else if (span.getName().equals("EventHubs.message")) {
                // TODO (srnagar): eventhubs should use PRODUCER instead of INTERNAL
                exportRemoteDependency(stdComponent, span, false, telemetryItems);
            } else {
                exportRemoteDependency(stdComponent, span, true, telemetryItems);
            }
        } else if (kind == Span.Kind.CLIENT || kind == Span.Kind.PRODUCER) {
            exportRemoteDependency(stdComponent, span, false, telemetryItems);
        } else if (kind == Span.Kind.SERVER || kind == Span.Kind.CONSUMER) {
            exportRequest(stdComponent, span, telemetryItems);
        } else {
            throw logger.logExceptionAsError(new UnsupportedOperationException(kind.name()));
        }
    }


    private static List<TelemetryExceptionDetails> minimalParse(String errorStack) {
        TelemetryExceptionDetails details = new TelemetryExceptionDetails();
        String line = errorStack.split("\n")[0];
        int index = line.indexOf(": ");

        if (index != -1) {
            details.setTypeName(line.substring(0, index));
            details.setMessage(line.substring(index + 2));
        } else {
            details.setTypeName(line);
        }
        details.setStack(errorStack);
        return Arrays.asList(details);
    }

    private void exportRemoteDependency(String stdComponent, SpanData span, boolean inProc,
                                        List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetryItem = new TelemetryItem();
        RemoteDependencyData remoteDependencyData = new RemoteDependencyData();
        MonitorBase monitorBase = new MonitorBase();

        telemetryItem.setTags(new HashMap<>());
        telemetryItem.setName(telemetryItemNamePrefix + "RemoteDependency");
        telemetryItem.setVersion(1);
        telemetryItem.setInstrumentationKey(instrumentationKey);
        telemetryItem.setData(monitorBase);

        remoteDependencyData.setProperties(new HashMap<>());
        remoteDependencyData.setVersion(2);
        monitorBase.setBaseType("RemoteDependencyData");
        monitorBase.setBaseData(remoteDependencyData);

        addLinks(remoteDependencyData.getProperties(), span.getLinks());
        remoteDependencyData.setName(span.getName());

        span.getInstrumentationLibraryInfo().getName();

        Map<String, AttributeValue> attributes = getAttributesCopy(span.getAttributes());

        if (inProc) {
            remoteDependencyData.setType("InProc");
        } else {
            if (attributes.containsKey("http.method")) {
                applyHttpRequestSpan(attributes, remoteDependencyData);
            } else if (attributes.containsKey(SemanticAttributes.DB_SYSTEM.key())) {
                applyDatabaseQuerySpan(attributes, remoteDependencyData, stdComponent);
            } else if (span.getName().equals("EventHubs.send")) {
                // TODO (srnagar): eventhubs should use CLIENT instead of PRODUCER
                // TODO (srnagar): eventhubs should add links to messages?
                remoteDependencyData.setType("Microsoft.EventHub");
                String peerAddress = removeAttributeString(attributes, "peer.address");
                String destination = removeAttributeString(attributes, "message_bus.destination");
                remoteDependencyData.setTarget(peerAddress + "/" + destination);
            } else if (span.getName().equals("EventHubs.message")) {
                // TODO (srnagar): eventhubs should populate peer.address and message_bus.destination
                String peerAddress = removeAttributeString(attributes, "peer.address");
                String destination = removeAttributeString(attributes, "message_bus.destination");
                if (peerAddress != null) {
                    remoteDependencyData.setTarget(peerAddress + "/" + destination);
                }
                remoteDependencyData.setType("Microsoft.EventHub");
            } else if ("kafka-clients".equals(stdComponent)) {
                remoteDependencyData.setType("Kafka");
                remoteDependencyData.setTarget(span.getName()); // destination queue name
            } else if ("jms".equals(stdComponent)) {
                remoteDependencyData.setType("JMS");
                remoteDependencyData.setTarget(span.getName()); // destination queue name
            }
        }

        remoteDependencyData.setId(span.getSpanId().toLowerBase16());
        telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), span.getTraceId().toLowerBase16());

        SpanId parentSpanId = span.getParentSpanId();
        if (parentSpanId.isValid()) {
            telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), parentSpanId.toLowerBase16());
        }

        telemetryItem.setTime(getFormattedTime(span.getStartEpochNanos()));
        remoteDependencyData
            .setDuration(getFormattedDuration(Duration.ofNanos(span.getEndEpochNanos() - span.getStartEpochNanos())));

        remoteDependencyData.setSuccess(span.getStatus().isOk());
        String description = span.getStatus().getDescription();
        if (description != null) {
            remoteDependencyData.getProperties().put("statusDescription", description);
        }

        Double samplingPercentage = removeAiSamplingPercentage(attributes);

        // for now, only add extra attributes for custom telemetry
        if (stdComponent == null) {
            addExtraAttributes(remoteDependencyData.getProperties(), attributes);
        }
        telemetryItem.setSampleRate(samplingPercentage.floatValue());
        telemetryItems.add(telemetryItem);
        exportEvents(span, samplingPercentage, telemetryItems);
    }

    private void applyDatabaseQuerySpan(Map<String, AttributeValue> attributes, RemoteDependencyData rd,
                                        String component) {
        String type = removeAttributeString(attributes, SemanticAttributes.DB_SYSTEM.key());

        if (SQL_DB_SYSTEMS.contains(type)) {
            type = "SQL";
        }
        rd.setType(type);
        rd.setData(removeAttributeString(attributes, SemanticAttributes.DB_STATEMENT.key()));

        String dbUrl = removeAttributeString(attributes, SemanticAttributes.DB_CONNECTION_STRING.key());
        if (dbUrl == null) {
            // this is needed until all database instrumentation captures the required db.url
            rd.setTarget(type);
        } else {
            String dbInstance = removeAttributeString(attributes, SemanticAttributes.DB_NAME.key());
            if (dbInstance != null) {
                dbUrl += " | " + dbInstance;
            }
            if ("jdbc".equals(component)) {
                // TODO (srnagar): this is special case to match 2.x behavior
                //      because U/X strips off the beginning in E2E tx view
                rd.setTarget("jdbc:" + dbUrl);
            } else {
                rd.setTarget(dbUrl);
            }
        }
        // TODO (srnagar): put db.instance somewhere
    }

    private void applyHttpRequestSpan(Map<String, AttributeValue> attributes,
                                      RemoteDependencyData remoteDependencyData) {

        remoteDependencyData.setType("Http (tracked component)");

        String method = removeAttributeString(attributes, SemanticAttributes.HTTP_METHOD.key());
        String url = removeAttributeString(attributes, SemanticAttributes.HTTP_URL.key());

        AttributeValue httpStatusCode = attributes.remove(SemanticAttributes.HTTP_STATUS_CODE.key());
        if (httpStatusCode != null && httpStatusCode.getType() == AttributeValue.Type.LONG) {
            long statusCode = httpStatusCode.getLongValue();
            remoteDependencyData.setResultCode(Long.toString(statusCode));
        }

        if (url != null) {
            try {
                URI uriObject = new URI(url);
                String target = createTarget(uriObject);
                remoteDependencyData.setTarget(target);
                // TODO (srnagar): is this right, overwriting name to include the full path?
                String path = uriObject.getPath();
                if (CoreUtils.isNullOrEmpty(path)) {
                    remoteDependencyData.setName(method + " /");
                } else {
                    remoteDependencyData.setName(method + " " + path);
                }
            } catch (URISyntaxException e) {
                logger.error(e.getMessage());
            }
        }
    }

    private void exportRequest(String stdComponent, SpanData span, List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetryItem = new TelemetryItem();
        RequestData requestData = new RequestData();
        MonitorBase monitorBase = new MonitorBase();

        telemetryItem.setTags(new HashMap<>());
        telemetryItem.setName(telemetryItemNamePrefix + "Request");
        telemetryItem.setVersion(1);
        telemetryItem.setInstrumentationKey(instrumentationKey);
        telemetryItem.setData(monitorBase);

        requestData.setProperties(new HashMap<>());
        requestData.setVersion(2);
        monitorBase.setBaseType("RequestData");
        monitorBase.setBaseData(requestData);

        Map<String, AttributeValue> attributes = getAttributesCopy(span.getAttributes());

        if ("kafka-clients".equals(stdComponent)) {
            requestData.setSource(span.getName()); // destination queue name
        } else if ("jms".equals(stdComponent)) {
            requestData.setSource(span.getName()); // destination queue name
        }
        addLinks(requestData.getProperties(), span.getLinks());
        AttributeValue httpStatusCode = attributes.remove(SemanticAttributes.HTTP_STATUS_CODE.key());

        if (isNonNullLong(httpStatusCode)) {
            requestData.setResponseCode(Long.toString(httpStatusCode.getLongValue()));
        }

        String httpUrl = removeAttributeString(attributes, SemanticAttributes.HTTP_URL.key());
        if (httpUrl != null) {
            requestData.setUrl(httpUrl);
        }

        String httpMethod = removeAttributeString(attributes, SemanticAttributes.HTTP_METHOD.key());
        String name = span.getName();
        if (httpMethod != null && name.startsWith("/")) {
            name = httpMethod + " " + name;
        }
        requestData.setName(name);
        telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_NAME.toString(), name);

        if (span.getName().equals("EventHubs.process")) {
            // TODO (srnagar): eventhubs should use CONSUMER instead of SERVER
            // (https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92#opentelemetry-example-1)
            String peerAddress = removeAttributeString(attributes, "peer.address");
            String destination = removeAttributeString(attributes, "message_bus.destination");
            requestData.setSource(peerAddress + "/" + destination);
        }
        requestData.setId(span.getSpanId().toLowerBase16());
        telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), span.getTraceId().toLowerBase16());

        String aiLegacyParentId = span.getTraceState().get("ai-legacy-parent-id");
        if (aiLegacyParentId != null) {
            // see behavior specified at https://github.com/microsoft/ApplicationInsights-Java/issues/1174
            telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), aiLegacyParentId);

            String aiLegacyOperationId = span.getTraceState().get("ai-legacy-operation-id");
            if (aiLegacyOperationId != null) {
                telemetryItem.getTags().putIfAbsent("ai_legacyRootID", aiLegacyOperationId);
            }
        } else {
            SpanId parentSpanId = span.getParentSpanId();
            if (parentSpanId.isValid()) {
                telemetryItem.getTags()
                    .put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), parentSpanId.toLowerBase16());
            }
        }

        long startEpochNanos = span.getStartEpochNanos();
        telemetryItem.setTime(getFormattedTime(startEpochNanos));

        Duration duration = Duration.ofNanos(span.getEndEpochNanos() - startEpochNanos);
        requestData.setDuration(getFormattedDuration(duration));

        requestData.setSuccess(span.getStatus().isOk());
        String description = span.getStatus().getDescription();
        if (description != null) {
            requestData.getProperties().put("statusDescription", description);
        }

        Double samplingPercentage = removeAiSamplingPercentage(attributes);

        // for now, only add extra attributes for custom telemetry
        if (stdComponent == null) {
            addExtraAttributes(requestData.getProperties(), attributes);
        }

        telemetryItem.setSampleRate(samplingPercentage.floatValue());
        telemetryItems.add(telemetryItem);
        exportEvents(span, samplingPercentage, telemetryItems);
    }

    private void exportEvents(SpanData span, Double samplingPercentage, List<TelemetryItem> telemetryItems) {
        boolean foundException = false;
        for (SpanData.Event event : span.getEvents()) {

            TelemetryItem telemetryItem = new TelemetryItem();
            TelemetryEventData eventData = new TelemetryEventData();
            MonitorBase monitorBase = new MonitorBase();

            telemetryItem.setTags(new HashMap<>());
            telemetryItem.setName(telemetryItemNamePrefix + "Event");
            telemetryItem.setVersion(1);
            telemetryItem.setInstrumentationKey(instrumentationKey);
            telemetryItem.setData(monitorBase);

            eventData.setProperties(new HashMap<>());
            eventData.setVersion(2);
            monitorBase.setBaseType("EventData");
            monitorBase.setBaseData(eventData);

            eventData.setName(event.getName());

            String operationId = span.getTraceId().toLowerBase16();
            telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), operationId);
            telemetryItem.getTags()
                .put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), span.getParentSpanId().toLowerBase16());
            telemetryItem.setTime(getFormattedTime(event.getEpochNanos()));
            addExtraAttributes(eventData.getProperties(), event.getAttributes());

            if (event.getAttributes().get(SemanticAttributes.EXCEPTION_TYPE.key()) != null
                || event.getAttributes().get(SemanticAttributes.EXCEPTION_MESSAGE.key()) != null) {
                // TODO (srnagar): Remove this boolean after we can confirm that the exception duplicate
                //  is a bug from the opentelmetry-java-instrumentation
                if (!foundException) {
                    // TODO (srnagar): map OpenTelemetry exception to Application Insights exception better
                    AttributeValue stacktrace = event.getAttributes()
                        .get(SemanticAttributes.EXCEPTION_STACKTRACE.key());
                    if (stacktrace != null) {
                        trackException(stacktrace.getStringValue(), span, operationId,
                            span.getSpanId().toLowerBase16(), samplingPercentage, telemetryItems);
                    }
                }
                foundException = true;
            } else {
                telemetryItem.setSampleRate(samplingPercentage.floatValue());
                telemetryItems.add(telemetryItem);
            }
        }
    }

    private void trackException(String errorStack, SpanData span, String operationId,
                                String id, Double samplingPercentage, List<TelemetryItem> telemetryItems) {
        TelemetryItem telemetryItem = new TelemetryItem();
        TelemetryExceptionData exceptionData = new TelemetryExceptionData();
        MonitorBase monitorBase = new MonitorBase();

        telemetryItem.setTags(new HashMap<>());
        telemetryItem.setName(telemetryItemNamePrefix + "Exception");
        telemetryItem.setVersion(1);
        telemetryItem.setInstrumentationKey(instrumentationKey);
        telemetryItem.setData(monitorBase);

        exceptionData.setProperties(new HashMap<>());
        exceptionData.setVersion(2);
        monitorBase.setBaseType("ExceptionData");
        monitorBase.setBaseData(exceptionData);

        telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_ID.toString(), operationId);
        telemetryItem.getTags().put(ContextTagKeys.AI_OPERATION_PARENT_ID.toString(), id);
        telemetryItem.setTime(getFormattedTime(span.getEndEpochNanos()));
        telemetryItem.setSampleRate(samplingPercentage.floatValue());
        exceptionData.setExceptions(minimalParse(errorStack));
        telemetryItems.add(telemetryItem);
    }

    private static String getFormattedDuration(Duration duration) {
        return duration.toDays() + "." + duration.toHours() + ":" + duration.toMinutes() + ":" + duration.getSeconds()
            + "." + duration.toMillis();
    }

    private static String getFormattedTime(long epochNanos) {
        return Instant.ofEpochMilli(NANOSECONDS.toMillis(epochNanos))
            .atOffset(ZoneOffset.UTC)
            .format(DateTimeFormatter.ISO_DATE_TIME);
    }

    private boolean isNonNullLong(AttributeValue attributeValue) {
        return attributeValue != null && attributeValue.getType() == AttributeValue.Type.LONG;
    }

    private Map<String, AttributeValue> getAttributesCopy(ReadableAttributes attributes) {
        final Map<String, AttributeValue> copy = new HashMap<>();
        attributes.forEach(copy::put);
        return copy;
    }

    private static void addLinks(Map<String, String> properties, List<SpanData.Link> links) {
        if (links.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        boolean first = true;
        for (SpanData.Link link : links) {
            if (!first) {
                sb.append(",");
            }
            sb.append("{\"operation_Id\":\"");
            sb.append(link.getContext().getTraceId().toLowerBase16());
            sb.append("\",\"id\":\"");
            sb.append(link.getContext().getSpanId().toLowerBase16());
            sb.append("\"}");
            first = false;
        }
        sb.append("]");
        properties.put("_MS.links", sb.toString());
    }

    private static String removeAttributeString(Map<String, AttributeValue> attributes, String attributeName) {
        AttributeValue attributeValue = attributes.remove(attributeName);
        if (attributeValue == null) {
            return null;
        } else if (attributeValue.getType() == AttributeValue.Type.STRING) {
            return attributeValue.getStringValue();
        } else {
            // TODO (srnagar): log debug warning
            return null;
        }
    }

    private static Double removeAttributeDouble(Map<String, AttributeValue> attributes, String attributeName) {
        AttributeValue attributeValue = attributes.remove(attributeName);
        if (attributeValue == null) {
            return null;
        } else if (attributeValue.getType() == AttributeValue.Type.DOUBLE) {
            return attributeValue.getDoubleValue();
        } else {
            // TODO (srnagar): log debug warning
            return null;
        }
    }

    private static String createTarget(URI uriObject) {
        String target = uriObject.getHost();
        if (uriObject.getPort() != 80 && uriObject.getPort() != 443 && uriObject.getPort() != -1) {
            target += ":" + uriObject.getPort();
        }
        return target;
    }

    private static String getStringValue(AttributeValue value) {
        switch (value.getType()) {
            case STRING:
                return value.getStringValue();
            case BOOLEAN:
                return Boolean.toString(value.getBooleanValue());
            case LONG:
                return Long.toString(value.getLongValue());
            case DOUBLE:
                return Double.toString(value.getDoubleValue());
            case STRING_ARRAY:
                return join(value.getStringArrayValue());
            case BOOLEAN_ARRAY:
                return join(value.getBooleanArrayValue());
            case LONG_ARRAY:
                return join(value.getLongArrayValue());
            case DOUBLE_ARRAY:
                return join(value.getDoubleArrayValue());
            default:
                return null;
        }
    }

    private static <T> String join(List<T> values) {
        StringBuilder sb = new StringBuilder();
        if (CoreUtils.isNullOrEmpty(values)) {
            return sb.toString();
        }

        for (int i = 0; i < values.size() - 1; i++) {
            sb.append(values.get(i));
            sb.append(", ");
        }
        sb.append(values.get(values.size() - 1));
        return sb.toString();
    }

    private static Double removeAiSamplingPercentage(Map<String, AttributeValue> attributes) {
        return removeAttributeDouble(attributes, "ai.sampling.percentage");
    }

    private static void addExtraAttributes(Map<String, String> properties, Map<String, AttributeValue> attributes) {
        for (Map.Entry<String, AttributeValue> entry : attributes.entrySet()) {
            String value = getStringValue(entry.getValue());
            if (value != null) {
                properties.put(entry.getKey(), value);
            }
        }
    }

    private static void addExtraAttributes(final Map<String, String> properties, Attributes attributes) {
        attributes.forEach(new ReadableKeyValuePairs.KeyValueConsumer<AttributeValue>() {
            @Override
            public void consume(String key, AttributeValue value) {
                String val = getStringValue(value);
                if (val != null) {
                    properties.put(key, val);
                }
            }
        });
    }
}