MessageUtils.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.models.AmqpAddress;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageBody;
import com.azure.core.amqp.models.AmqpMessageHeader;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.amqp.models.AmqpMessageProperties;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Converts {@link AmqpAnnotatedMessage messages} to and from proton-j messages.
 */
final class MessageUtils {
    // Logger used to report unsupported body types and to warn about messages that cannot be fully converted.
    private static final ClientLogger LOGGER = new ClientLogger(MessageUtils.class);
    // Payload substituted when a proton-j message has no body or a body that is not a Data section.
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /**
     * Converts an {@link AmqpAnnotatedMessage} to a proton-j message.
     *
     * <p>The body, AMQP properties, header, footer, message annotations, delivery annotations and application
     * properties are copied. Optional values that are {@code null} (and maps that are empty) are skipped so the
     * corresponding proton-j fields are left untouched.</p>
     *
     * @param message The message to convert.
     *
     * @return The corresponding proton-j message.
     *
     * @throws NullPointerException if {@code message} is null.
     * @throws UnsupportedOperationException if the body type of {@code message} is anything other than
     *     {@code DATA}.
     */
    static Message toProtonJMessage(AmqpAnnotatedMessage message) {
        Objects.requireNonNull(message, "'message' to serialize cannot be null.");

        final Message protonJMessage = Proton.message();

        //TODO (conniey): support AMQP sequence and AMQP value.
        final AmqpMessageBody body = message.getBody();
        switch (body.getBodyType()) {
            case DATA:
                protonJMessage.setBody(new Data(new Binary(body.getFirstData())));
                break;
            case VALUE:
            case SEQUENCE:
            default:
                throw LOGGER.logExceptionAsError(new UnsupportedOperationException(
                    "bodyType [" + body.getBodyType() + "] is not supported yet."));
        }

        // Setting message properties.
        final AmqpMessageProperties properties = message.getProperties();
        if (properties.getMessageId() != null) {
            protonJMessage.setMessageId(properties.getMessageId().toString());
        }

        if (properties.getContentType() != null) {
            protonJMessage.setContentType(properties.getContentType());
        }

        if (properties.getCorrelationId() != null) {
            protonJMessage.setCorrelationId(properties.getCorrelationId().toString());
        }

        if (properties.getSubject() != null) {
            protonJMessage.setSubject(properties.getSubject());
        }

        final AmqpAddress replyTo = properties.getReplyTo();
        if (replyTo != null) {
            protonJMessage.setReplyTo(replyTo.toString());
        }

        if (properties.getReplyToGroupId() != null) {
            protonJMessage.setReplyToGroupId(properties.getReplyToGroupId());
        }

        if (properties.getGroupId() != null) {
            protonJMessage.setGroupId(properties.getGroupId());
        }

        if (properties.getContentEncoding() != null) {
            protonJMessage.setContentEncoding(properties.getContentEncoding());
        }

        if (properties.getGroupSequence() != null) {
            protonJMessage.setGroupSequence(properties.getGroupSequence());
        }

        // The remaining values are written directly on the proton-j Properties section, which is created on demand.
        if (properties.getTo() != null) {
            getOrCreateProperties(protonJMessage).setTo(properties.getTo().toString());
        }

        // The default is byte[0] when getting a user id that has not been set.
        if (properties.getUserId() != null && properties.getUserId().length > 0) {
            getOrCreateProperties(protonJMessage).setUserId(new Binary(properties.getUserId()));
        }

        if (properties.getAbsoluteExpiryTime() != null) {
            getOrCreateProperties(protonJMessage).setAbsoluteExpiryTime(
                Date.from(properties.getAbsoluteExpiryTime().toInstant()));
        }

        if (properties.getCreationTime() != null) {
            getOrCreateProperties(protonJMessage).setCreationTime(
                Date.from(properties.getCreationTime().toInstant()));
        }

        // Set header. Each value is optional and only copied when present.
        final AmqpMessageHeader header = message.getHeader();
        if (header.getTimeToLive() != null) {
            protonJMessage.setTtl(header.getTimeToLive().toMillis());
        }
        if (header.getDeliveryCount() != null) {
            protonJMessage.setDeliveryCount(header.getDeliveryCount());
        }
        if (header.getPriority() != null) {
            protonJMessage.setPriority(header.getPriority());
        }
        if (header.isDurable() != null) {
            protonJMessage.setDurable(header.isDurable());
        }
        if (header.isFirstAcquirer() != null) {
            protonJMessage.setFirstAcquirer(header.isFirstAcquirer());
        }

        // Set footer
        // NOTE(review): the footer map is handed over with String keys, whereas the annotation maps below are
        // converted to Symbol keys first - confirm this asymmetry is intended.
        if (!message.getFooter().isEmpty()) {
            protonJMessage.setFooter(new Footer(message.getFooter()));
        }

        // Set message annotations. convert(...) returns null for a null source map, so guard before dereferencing.
        final Map<Symbol, Object> messageAnnotations = convert(message.getMessageAnnotations());
        if (messageAnnotations != null && !messageAnnotations.isEmpty()) {
            protonJMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
        }

        // Set Delivery Annotations.
        final Map<Symbol, Object> deliveryAnnotations = convert(message.getDeliveryAnnotations());
        if (deliveryAnnotations != null && !deliveryAnnotations.isEmpty()) {
            protonJMessage.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotations));
        }

        // Set application properties
        if (!message.getApplicationProperties().isEmpty()) {
            protonJMessage.setApplicationProperties(new ApplicationProperties(message.getApplicationProperties()));
        }

        return protonJMessage;
    }

    /**
     * Returns the properties section of {@code protonJMessage}, creating and attaching an empty one when the
     * message does not have one yet.
     *
     * @param protonJMessage The proton-j message whose properties section is needed.
     *
     * @return The existing or newly attached properties section; never null.
     */
    private static Properties getOrCreateProperties(Message protonJMessage) {
        Properties section = protonJMessage.getProperties();
        if (section == null) {
            section = new Properties();
            protonJMessage.setProperties(section);
        }

        return section;
    }

    /**
     * Converts a proton-j message to {@link AmqpAnnotatedMessage}.
     *
     * <p>Only {@link Data} bodies are read. A message without a body, or with a body of another section type, is
     * converted to a message with an empty payload and a warning is logged. Time to live, delivery count and
     * priority are always copied from the proton-j message; the remaining sections are copied only when they are
     * present.</p>
     *
     * @param message The message to convert.
     *
     * @return The corresponding {@link AmqpAnnotatedMessage message}.
     *
     * @throws NullPointerException if {@code message} is null.
     */
    static AmqpAnnotatedMessage toAmqpAnnotatedMessage(Message message) {
        Objects.requireNonNull(message, "'message' cannot be null");

        final byte[] bytes;
        final Section body = message.getBody();
        if (body != null) {
            //TODO (conniey): Support other AMQP types like AmqpValue and AmqpSequence.
            if (body instanceof Data) {
                // A Binary may describe only a slice of its backing array, so extract exactly that range.
                bytes = toByteArray(((Data) body).getValue());
            } else {
                LOGGER.warning("Message not of type Data. Actual: {}",
                    body.getType());
                bytes = EMPTY_BYTE_ARRAY;
            }
        } else {
            LOGGER.warning("Message does not have a body.");
            bytes = EMPTY_BYTE_ARRAY;
        }

        final AmqpAnnotatedMessage response = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(bytes));

        // Application properties
        final ApplicationProperties applicationProperties = message.getApplicationProperties();
        if (applicationProperties != null) {
            final Map<String, Object> propertiesValue = applicationProperties.getValue();
            // The section can exist without a value; putAll(null) would throw.
            if (propertiesValue != null) {
                response.getApplicationProperties().putAll(propertiesValue);
            }
        }

        // Header
        final AmqpMessageHeader responseHeader = response.getHeader();
        responseHeader.setTimeToLive(Duration.ofMillis(message.getTtl()));
        responseHeader.setDeliveryCount(message.getDeliveryCount());
        responseHeader.setPriority(message.getPriority());

        if (message.getHeader() != null) {
            responseHeader.setDurable(message.getHeader().getDurable());
            responseHeader.setFirstAcquirer(message.getHeader().getFirstAcquirer());
        }

        // Footer
        final Footer footer = message.getFooter();
        if (footer != null && footer.getValue() != null) {
            @SuppressWarnings("unchecked") final Map<Symbol, Object> footerValue = footer.getValue();

            setValues(footerValue, response.getFooter());
        }

        // DeliveryAnnotations
        final DeliveryAnnotations deliveryAnnotations = message.getDeliveryAnnotations();
        if (deliveryAnnotations != null) {
            setValues(deliveryAnnotations.getValue(), response.getDeliveryAnnotations());
        }

        // Message Annotations
        final MessageAnnotations messageAnnotations = message.getMessageAnnotations();
        if (messageAnnotations != null) {
            setValues(messageAnnotations.getValue(), response.getMessageAnnotations());
        }

        // AMQP Properties
        final AmqpMessageProperties responseProperties = response.getProperties();
        final Properties protonJProperties = message.getProperties();

        if (protonJProperties != null) {
            if (protonJProperties.getAbsoluteExpiryTime() != null) {
                responseProperties.setAbsoluteExpiryTime(protonJProperties.getAbsoluteExpiryTime().toInstant()
                    .atOffset(ZoneOffset.UTC));
            }

            if (protonJProperties.getContentType() != null) {
                responseProperties.setContentType(protonJProperties.getContentType().toString());
            }

            if (protonJProperties.getContentEncoding() != null) {
                responseProperties.setContentEncoding(protonJProperties.getContentEncoding().toString());
            }

            final Object correlationId = message.getCorrelationId();
            if (correlationId != null) {
                responseProperties.setCorrelationId(new AmqpMessageId(correlationId.toString()));
            }

            if (protonJProperties.getCreationTime() != null) {
                responseProperties.setCreationTime(protonJProperties.getCreationTime().toInstant()
                    .atOffset(ZoneOffset.UTC));
            }

            if (protonJProperties.getGroupId() != null) {
                responseProperties.setGroupId(protonJProperties.getGroupId());
            }

            if (protonJProperties.getGroupSequence() != null) {
                responseProperties.setGroupSequence(protonJProperties.getGroupSequence().longValue());
            }

            final Object messageId = message.getMessageId();
            if (messageId != null) {
                responseProperties.setMessageId(new AmqpMessageId(messageId.toString()));
            }

            final String replyTo = protonJProperties.getReplyTo();
            if (replyTo != null) {
                responseProperties.setReplyTo(new AmqpAddress(replyTo));
            }

            if (protonJProperties.getReplyToGroupId() != null) {
                responseProperties.setReplyToGroupId(protonJProperties.getReplyToGroupId());
            }

            if (protonJProperties.getSubject() != null) {
                responseProperties.setSubject(protonJProperties.getSubject());
            }

            final String to = protonJProperties.getTo();
            if (to != null) {
                responseProperties.setTo(new AmqpAddress(to));
            }

            if (protonJProperties.getUserId() != null) {
                responseProperties.setUserId(toByteArray(protonJProperties.getUserId()));
            }
        }

        return response;
    }

    /**
     * Extracts the bytes described by a proton-j {@link Binary}.
     *
     * <p>{@link Binary#getArray()} exposes the whole backing array, while the Binary itself only covers
     * {@link Binary#getLength()} bytes starting at {@link Binary#getArrayOffset()}. When the Binary spans the entire
     * backing array that array is returned as is; otherwise the covered range is copied into a new array.</p>
     *
     * @param binary The binary to read; may be null.
     *
     * @return The bytes covered by {@code binary}, or an empty array when {@code binary} is null.
     */
    private static byte[] toByteArray(Binary binary) {
        if (binary == null) {
            return EMPTY_BYTE_ARRAY;
        }

        final byte[] backing = binary.getArray();
        final int offset = binary.getArrayOffset();
        final int length = binary.getLength();

        if (offset == 0 && length == backing.length) {
            return backing;
        }

        final byte[] slice = new byte[length];
        System.arraycopy(backing, offset, slice, 0, length);
        return slice;
    }

    /**
     * Creates a copy of a string-keyed map whose keys are proton-j {@link Symbol symbols}.
     *
     * <p>Values are copied unchanged, except for {@link Instant} values, which are replaced by a {@link Date}
     * holding the same epoch-millisecond timestamp.</p>
     *
     * @param sourceMap Source map; may be null.
     *
     * @return A new map with the corresponding keys as symbols, or {@code null} when {@code sourceMap} is null.
     */
    static Map<Symbol, Object> convert(Map<String, Object> sourceMap) {
        if (sourceMap == null) {
            return null;
        }

        final HashMap<Symbol, Object> converted = new HashMap<>();
        for (Map.Entry<String, Object> pair : sourceMap.entrySet()) {
            final Object raw = pair.getValue();
            final Object mapped = raw instanceof Instant
                ? new Date(((Instant) raw).toEpochMilli())
                : raw;

            converted.put(Symbol.valueOf(pair.getKey()), mapped);
        }

        return converted;
    }

    /**
     * Copies every entry of a symbol-keyed map into a string-keyed map, using the string form of each symbol as the
     * key. Does nothing when {@code sourceMap} is null.
     *
     * @param sourceMap Map to read entries from; may be null.
     * @param targetMap Map that receives the entries.
     */
    private static void setValues(Map<Symbol, Object> sourceMap, Map<String, Object> targetMap) {
        if (sourceMap != null) {
            sourceMap.forEach((symbol, value) -> targetMap.put(symbol.toString(), value));
        }
    }

    /**
     * Private constructor. Prevents instantiation; this class only offers static conversion helpers.
     */
    private MessageUtils() {
    }
}