// ServiceBusMessageConverter.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.servicebus.converter;

import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageBody;
import com.azure.spring.integration.core.AzureHeaders;
import com.azure.spring.integration.core.converter.AbstractAzureMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.InvalidMimeTypeException;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * A converter to turn a {@link org.springframework.messaging.Message} to {@link IMessage}
 * and vice versa.
 *
 * <p>Outbound, selected Spring headers (content type, id, reply channel, scheduled enqueue delay)
 * are mapped onto the corresponding Service Bus message fields and every header is additionally
 * copied, as a string, into the message properties. Inbound, the message id, content type,
 * reply-to and all message properties are exposed as Spring headers.
 *
 * @author Warren Zhu
 */
public class ServiceBusMessageConverter extends AbstractAzureMessageConverter<IMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusMessageConverter.class);

    /**
     * Extracts the raw payload bytes from a Service Bus message.
     *
     * @param azureMessage the received Service Bus message
     * @return the payload bytes; an empty array when the message has no body, the body type is
     *         not handled, or a binary body carries no data
     */
    @Override
    protected byte[] getPayload(IMessage azureMessage) {
        MessageBody messageBody = azureMessage.getMessageBody();
        if (messageBody == null) {
            return new byte[0];
        }

        switch (messageBody.getBodyType()) {
            case BINARY:
                // Stream.findFirst() throws NullPointerException on a null first element, so the
                // first chunk is read directly and a missing chunk becomes an empty payload,
                // matching the other fallbacks in this method.
                byte[] binary = firstOrNull(messageBody.getBinaryData());
                return binary != null ? binary : new byte[0];
            case VALUE:
                return String.valueOf(messageBody.getValueData()).getBytes(StandardCharsets.UTF_8);
            case SEQUENCE:
                // toPayload is not declared in this class (presumably inherited from the base
                // converter); as before, it receives null when there is no first sequence.
                return toPayload(firstOrNull(messageBody.getSequenceData()));
            default:
                return new byte[0];
        }
    }

    /**
     * Creates a Service Bus message from a string payload.
     *
     * @param payload the string payload
     * @return a new message wrapping the payload
     */
    @Override
    protected IMessage fromString(String payload) {
        return new Message(payload);
    }

    /**
     * Creates a Service Bus message from a binary payload.
     *
     * @param payload the binary payload
     * @return a new message wrapping the payload
     */
    @Override
    protected IMessage fromByte(byte[] payload) {
        return new Message(payload);
    }

    /**
     * Copies Spring message headers onto the outgoing Service Bus message.
     *
     * @param headers the Spring message headers to read from
     * @param serviceBusMessage the Service Bus message to populate
     */
    @Override
    protected void setCustomHeaders(MessageHeaders headers, IMessage serviceBusMessage) {

        // toString() covers both MimeType and String values without an unchecked cast, so an
        // unexpected value type no longer fails with ClassCastException.
        Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
        if (contentType != null) {
            serviceBusMessage.setContentType(contentType.toString());
        }

        // Skip a missing id instead of writing the literal string "null" as the message id.
        UUID id = headers.getId();
        if (id != null) {
            serviceBusMessage.setMessageId(id.toString());
        }

        // Only a String reply channel is mapped to reply-to; a typed lookup with String.class
        // would throw IllegalArgumentException for any other value type.
        Object replyChannel = headers.get(MessageHeaders.REPLY_CHANNEL);
        if (replyChannel instanceof String) {
            serviceBusMessage.setReplyTo((String) replyChannel);
        }

        // The header value is a delay in milliseconds, added to the current time. Any Number is
        // accepted so that Long delays work as well as Integer ones.
        Number delayMillis = headers.get(AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE, Number.class);
        if (delayMillis != null) {
            serviceBusMessage.setScheduledEnqueueTimeUtc(
                Instant.now().plus(Duration.ofMillis(delayMillis.longValue())));
        }

        // Every header is also forwarded as a string property; null values are skipped because
        // they have no string form.
        headers.forEach((key, value) -> {
            if (value != null) {
                serviceBusMessage.getProperties().put(key, value.toString());
            }
        });
    }

    /**
     * Builds the Spring message headers derived from a received Service Bus message.
     *
     * @param serviceBusMessage the received Service Bus message
     * @return an unmodifiable map of headers; message properties are applied last and therefore
     *         override the id, content-type and reply-channel entries on key collisions
     */
    @Override
    protected Map<String, Object> buildCustomHeaders(IMessage serviceBusMessage) {
        Map<String, Object> headers = new HashMap<>();

        if (StringUtils.hasText(serviceBusMessage.getMessageId())) {
            headers.put(AzureHeaders.RAW_ID, serviceBusMessage.getMessageId());
        }

        if (StringUtils.hasText(serviceBusMessage.getContentType())) {
            String contentType = serviceBusMessage.getContentType();
            try {
                MimeType mimeType = MimeType.valueOf(contentType);
                headers.put(MessageHeaders.CONTENT_TYPE, mimeType.toString());
            } catch (InvalidMimeTypeException e) {
                // An unparsable content type is reported and the header is left unset.
                LOG.warn("Invalid mimeType '{}' from service bus message.", contentType);
            }
        }

        if (StringUtils.hasText(serviceBusMessage.getReplyTo())) {
            headers.put(MessageHeaders.REPLY_CHANNEL, serviceBusMessage.getReplyTo());
        }

        // Guard against implementations that return no property map at all.
        if (serviceBusMessage.getProperties() != null) {
            headers.putAll(serviceBusMessage.getProperties());
        }

        return Collections.unmodifiableMap(headers);
    }

    /**
     * Returns the first element of a list without going through a stream.
     *
     * @param items the list to read from, may be {@code null}
     * @param <T> the element type
     * @return the first element, or {@code null} when the list is {@code null} or empty
     */
    private static <T> T firstOrNull(List<T> items) {
        return (items == null || items.isEmpty()) ? null : items.get(0);
    }
}