ServiceBusMessageSerializer.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.models.AmqpAddress;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageHeader;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.amqp.models.AmqpMessageProperties;
import com.azure.core.util.BinaryData;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ManagementConstants;
import com.azure.messaging.servicebus.implementation.MessageWithLockToken;
import com.azure.messaging.servicebus.implementation.Messages;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
import org.apache.qpid.proton.amqp.Decimal32;
import org.apache.qpid.proton.amqp.Decimal64;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.message.Message;
import java.lang.reflect.Array;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Deserializes and serializes messages to and from Azure Service Bus.
*/
class ServiceBusMessageSerializer implements MessageSerializer {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private final ClientLogger logger = new ClientLogger(ServiceBusMessageSerializer.class);
/**
* Gets the serialized size of the AMQP message.
*/
@Override
public int getSize(Message amqpMessage) {
if (amqpMessage == null) {
return 0;
}
int payloadSize = getPayloadSize(amqpMessage);
final MessageAnnotations messageAnnotations = amqpMessage.getMessageAnnotations();
final ApplicationProperties applicationProperties = amqpMessage.getApplicationProperties();
int annotationsSize = 0;
int applicationPropertiesSize = 0;
if (messageAnnotations != null) {
final Map<Symbol, Object> map = messageAnnotations.getValue();
for (Map.Entry<Symbol, Object> entry : map.entrySet()) {
final int size = sizeof(entry.getKey()) + sizeof(entry.getValue());
annotationsSize += size;
}
}
if (applicationProperties != null) {
final Map<String, Object> map = applicationProperties.getValue();
for (Map.Entry<String, Object> entry : map.entrySet()) {
final int size = sizeof(entry.getKey()) + sizeof(entry.getValue());
applicationPropertiesSize += size;
}
}
return annotationsSize + applicationPropertiesSize + payloadSize;
}
/**
* Creates the AMQP message represented by this {@code object}. Currently, only supports serializing {@link
* ServiceBusMessage}.
*
* @param object Concrete object to deserialize.
*
* @return A new AMQP message for this {@code object}.
*
* @throws IllegalArgumentException if {@code object} is not an instance of {@link ServiceBusMessage}.
*/
@Override
public <T> Message serialize(T object) {
Objects.requireNonNull(object, "'object' to serialize cannot be null.");
if (!(object instanceof ServiceBusMessage)) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"Cannot serialize object that is not ServiceBusMessage. Clazz: " + object.getClass()));
}
final ServiceBusMessage brokeredMessage = (ServiceBusMessage) object;
final Message amqpMessage = Proton.message();
final byte[] body = brokeredMessage.getBody().toBytes();
//TODO (conniey): support AMQP sequence and AMQP value.
amqpMessage.setBody(new Data(new Binary(body)));
if (brokeredMessage.getApplicationProperties() != null) {
amqpMessage.setApplicationProperties(new ApplicationProperties(brokeredMessage.getApplicationProperties()));
}
if (brokeredMessage.getTimeToLive() != null) {
amqpMessage.setTtl(brokeredMessage.getTimeToLive().toMillis());
}
if (amqpMessage.getProperties() == null) {
amqpMessage.setProperties(new Properties());
}
amqpMessage.setMessageId(brokeredMessage.getMessageId());
amqpMessage.setContentType(brokeredMessage.getContentType());
amqpMessage.setCorrelationId(brokeredMessage.getCorrelationId());
amqpMessage.setSubject(brokeredMessage.getSubject());
amqpMessage.setReplyTo(brokeredMessage.getReplyTo());
amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId());
amqpMessage.setGroupId(brokeredMessage.getSessionId());
final AmqpMessageProperties brokeredProperties = brokeredMessage.getRawAmqpMessage().getProperties();
amqpMessage.setContentEncoding(brokeredProperties.getContentEncoding());
if (brokeredProperties.getGroupSequence() != null) {
amqpMessage.setGroupSequence(brokeredProperties.getGroupSequence());
}
amqpMessage.getProperties().setTo(brokeredMessage.getTo());
amqpMessage.getProperties().setUserId(new Binary(brokeredProperties.getUserId()));
if (brokeredProperties.getAbsoluteExpiryTime() != null) {
amqpMessage.getProperties().setAbsoluteExpiryTime(Date.from(brokeredProperties.getAbsoluteExpiryTime()
.toInstant()));
}
if (brokeredProperties.getCreationTime() != null) {
amqpMessage.getProperties().setCreationTime(Date.from(brokeredProperties.getCreationTime().toInstant()));
}
//set footer
amqpMessage.setFooter(new Footer(brokeredMessage.getRawAmqpMessage().getFooter()));
//set header
AmqpMessageHeader header = brokeredMessage.getRawAmqpMessage().getHeader();
if (header.getDeliveryCount() != null) {
amqpMessage.setDeliveryCount(header.getDeliveryCount());
}
if (header.getPriority() != null) {
amqpMessage.setPriority(header.getPriority());
}
if (header.isDurable() != null) {
amqpMessage.setDurable(header.isDurable());
}
if (header.isFirstAcquirer() != null) {
amqpMessage.setFirstAcquirer(header.isFirstAcquirer());
}
if (header.getTimeToLive() != null) {
amqpMessage.setTtl(header.getTimeToLive().toMillis());
}
final Map<Symbol, Object> messageAnnotationsMap = new HashMap<>();
if (brokeredMessage.getScheduledEnqueueTime() != null) {
messageAnnotationsMap.put(Symbol.valueOf(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue()),
Date.from(brokeredMessage.getScheduledEnqueueTime().toInstant()));
}
final String partitionKey = brokeredMessage.getPartitionKey();
if (partitionKey != null && !partitionKey.isEmpty()) {
messageAnnotationsMap.put(Symbol.valueOf(PARTITION_KEY_ANNOTATION_NAME.getValue()),
brokeredMessage.getPartitionKey());
}
amqpMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
// Set Delivery Annotations.
final Map<Symbol, Object> deliveryAnnotationsMap = new HashMap<>();
final Map<String, Object> deliveryAnnotations = brokeredMessage.getRawAmqpMessage()
.getDeliveryAnnotations();
for (Map.Entry<String, Object> deliveryEntry : deliveryAnnotations.entrySet()) {
deliveryAnnotationsMap.put(Symbol.valueOf(deliveryEntry.getKey()), deliveryEntry.getValue());
}
amqpMessage.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
return amqpMessage;
}
@SuppressWarnings("unchecked")
@Override
public <T> T deserialize(Message message, Class<T> clazz) {
Objects.requireNonNull(message, "'message' cannot be null.");
Objects.requireNonNull(clazz, "'clazz' cannot be null.");
if (clazz == ServiceBusReceivedMessage.class) {
return (T) deserializeMessage(message);
} else {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format(Messages.CLASS_NOT_A_SUPPORTED_TYPE, clazz)));
}
}
@SuppressWarnings("unchecked")
@Override
public <T> List<T> deserializeList(Message message, Class<T> clazz) {
if (clazz == ServiceBusReceivedMessage.class) {
return (List<T>) deserializeListOfMessages(message);
} else if (clazz == OffsetDateTime.class) {
return (List<T>) deserializeListOfOffsetDateTime(message);
} else if (clazz == OffsetDateTime.class) {
return (List<T>) deserializeListOfOffsetDateTime(message);
} else if (clazz == Long.class) {
return (List<T>) deserializeListOfLong(message);
} else {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format(Messages.CLASS_NOT_A_SUPPORTED_TYPE, clazz)));
}
}
private List<Long> deserializeListOfLong(Message amqpMessage) {
if (amqpMessage.getBody() instanceof AmqpValue) {
AmqpValue amqpValue = ((AmqpValue) amqpMessage.getBody());
if (amqpValue.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> responseBody = (Map<String, Object>) amqpValue.getValue();
Object expirationListObj = responseBody.get(ManagementConstants.SEQUENCE_NUMBERS);
if (expirationListObj instanceof long[]) {
return Arrays.stream((long[]) expirationListObj)
.boxed()
.collect(Collectors.toList());
}
}
}
return Collections.emptyList();
}
private List<OffsetDateTime> deserializeListOfOffsetDateTime(Message amqpMessage) {
if (amqpMessage.getBody() instanceof AmqpValue) {
AmqpValue amqpValue = ((AmqpValue) amqpMessage.getBody());
if (amqpValue.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> responseBody = (Map<String, Object>) amqpValue.getValue();
Object expirationListObj = responseBody.get(ManagementConstants.EXPIRATIONS);
if (expirationListObj instanceof Date[]) {
return Arrays.stream((Date[]) expirationListObj)
.map(date -> date.toInstant().atOffset(ZoneOffset.UTC))
.collect(Collectors.toList());
}
}
}
return Collections.emptyList();
}
@SuppressWarnings("rawtypes")
private List<ServiceBusReceivedMessage> deserializeListOfMessages(Message amqpMessage) {
final List<ServiceBusReceivedMessage> messageList = new ArrayList<>();
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(amqpMessage);
if (statusCode != AmqpResponseCode.OK) {
logger.warning("AMQP response did not contain OK status code. Actual: {}", statusCode);
return Collections.emptyList();
}
final Object responseBodyMap = ((AmqpValue) amqpMessage.getBody()).getValue();
if (responseBodyMap == null) {
logger.warning("AMQP response did not contain a body.");
return Collections.emptyList();
} else if (!(responseBodyMap instanceof Map)) {
logger.warning("AMQP response body is not correct instance. Expected: {}. Actual: {}",
Map.class, responseBodyMap.getClass());
return Collections.emptyList();
}
final Object messages = ((Map) responseBodyMap).get(ManagementConstants.MESSAGES);
if (messages == null) {
logger.warning("Response body did not contain key: {}", ManagementConstants.MESSAGES);
return Collections.emptyList();
} else if (!(messages instanceof Iterable)) {
logger.warning("Response body contents is not the correct type. Expected: {}. Actual: {}",
Iterable.class, messages.getClass());
return Collections.emptyList();
}
for (Object message : (Iterable) messages) {
if (!(message instanceof Map)) {
logger.warning("Message inside iterable of message is not correct type. Expected: {}. Actual: {}",
Map.class, message.getClass());
continue;
}
final Message responseMessage = Message.Factory.create();
final Binary messagePayLoad = (Binary) ((Map) message).get(ManagementConstants.MESSAGE);
responseMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(),
messagePayLoad.getLength());
final ServiceBusReceivedMessage receivedMessage = deserializeMessage(responseMessage);
// if amqp message have lockToken
if (((Map) message).containsKey(ManagementConstants.LOCK_TOKEN_KEY)) {
receivedMessage.setLockToken((UUID) ((Map) message).get(ManagementConstants.LOCK_TOKEN_KEY));
}
messageList.add(receivedMessage);
}
return messageList;
}
private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) {
final byte[] bytes;
final Section body = amqpMessage.getBody();
if (body != null) {
//TODO (conniey): Support other AMQP types like AmqpValue and AmqpSequence.
if (body instanceof Data) {
final Binary messageData = ((Data) body).getValue();
bytes = messageData.getArray();
} else {
logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, body.getType()));
bytes = EMPTY_BYTE_ARRAY;
}
} else {
logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, "null"));
bytes = EMPTY_BYTE_ARRAY;
}
final ServiceBusReceivedMessage brokeredMessage = new ServiceBusReceivedMessage(BinaryData.fromBytes(bytes));
AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getRawAmqpMessage();
// Application properties
ApplicationProperties applicationProperties = amqpMessage.getApplicationProperties();
if (applicationProperties != null) {
final Map<String, Object> propertiesValue = applicationProperties.getValue();
brokeredAmqpAnnotatedMessage.getApplicationProperties().putAll(propertiesValue);
}
// Header
final AmqpMessageHeader brokeredHeader = brokeredAmqpAnnotatedMessage.getHeader();
brokeredHeader.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl()));
brokeredHeader.setDeliveryCount(amqpMessage.getDeliveryCount());
brokeredHeader.setDurable(amqpMessage.getHeader().getDurable());
brokeredHeader.setFirstAcquirer(amqpMessage.getHeader().getFirstAcquirer());
brokeredHeader.setPriority(amqpMessage.getPriority());
// Footer
final Footer footer = amqpMessage.getFooter();
if (footer != null && footer.getValue() != null) {
@SuppressWarnings("unchecked") final Map<Symbol, Object> footerValue = footer.getValue();
setValues(footerValue, brokeredAmqpAnnotatedMessage.getFooter());
}
// Properties
final AmqpMessageProperties brokeredProperties = brokeredAmqpAnnotatedMessage.getProperties();
brokeredProperties.setReplyToGroupId(amqpMessage.getReplyToGroupId());
final String replyTo = amqpMessage.getReplyTo();
if (replyTo != null) {
brokeredProperties.setReplyTo(new AmqpAddress(amqpMessage.getReplyTo()));
}
final Object messageId = amqpMessage.getMessageId();
if (messageId != null) {
brokeredProperties.setMessageId(new AmqpMessageId(messageId.toString()));
}
brokeredProperties.setContentType(amqpMessage.getContentType());
final Object correlationId = amqpMessage.getCorrelationId();
if (correlationId != null) {
brokeredProperties.setCorrelationId(new AmqpMessageId(correlationId.toString()));
}
final Properties amqpProperties = amqpMessage.getProperties();
if (amqpProperties != null) {
final String to = amqpProperties.getTo();
if (to != null) {
brokeredProperties.setTo(new AmqpAddress(amqpProperties.getTo()));
}
if (amqpProperties.getAbsoluteExpiryTime() != null) {
brokeredProperties.setAbsoluteExpiryTime(amqpProperties.getAbsoluteExpiryTime().toInstant()
.atOffset(ZoneOffset.UTC));
}
if (amqpProperties.getCreationTime() != null) {
brokeredProperties.setCreationTime(amqpProperties.getCreationTime().toInstant()
.atOffset(ZoneOffset.UTC));
}
}
brokeredProperties.setSubject(amqpMessage.getSubject());
brokeredProperties.setGroupId(amqpMessage.getGroupId());
brokeredProperties.setContentEncoding(amqpMessage.getContentEncoding());
brokeredProperties.setGroupSequence(amqpMessage.getGroupSequence());
brokeredProperties.setUserId(amqpMessage.getUserId());
// DeliveryAnnotations
final DeliveryAnnotations deliveryAnnotations = amqpMessage.getDeliveryAnnotations();
if (deliveryAnnotations != null) {
setValues(deliveryAnnotations.getValue(), brokeredAmqpAnnotatedMessage.getDeliveryAnnotations());
}
// Message Annotations
final MessageAnnotations messageAnnotations = amqpMessage.getMessageAnnotations();
if (messageAnnotations != null) {
setValues(messageAnnotations.getValue(), brokeredAmqpAnnotatedMessage.getMessageAnnotations());
}
if (amqpMessage instanceof MessageWithLockToken) {
brokeredMessage.setLockToken(((MessageWithLockToken) amqpMessage).getLockToken());
}
return brokeredMessage;
}
private static int getPayloadSize(Message msg) {
if (msg == null || msg.getBody() == null) {
return 0;
}
final Section bodySection = msg.getBody();
if (bodySection instanceof AmqpValue) {
return sizeof(((AmqpValue) bodySection).getValue());
} else if (bodySection instanceof AmqpSequence) {
return sizeof(((AmqpSequence) bodySection).getValue());
} else if (bodySection instanceof Data) {
final Data payloadSection = (Data) bodySection;
final Binary payloadBytes = payloadSection.getValue();
return sizeof(payloadBytes);
} else {
return 0;
}
}
private void setValues(Map<Symbol, Object> sourceMap, Map<String, Object> targetMap) {
if (sourceMap != null) {
for (Map.Entry<Symbol, Object> entry : sourceMap.entrySet()) {
targetMap.put(entry.getKey().toString(), entry.getValue());
}
}
}
@SuppressWarnings("rawtypes")
private static int sizeof(Object obj) {
if (obj == null) {
return 0;
}
if (obj instanceof String) {
return obj.toString().length() << 1;
}
if (obj instanceof Symbol) {
return ((Symbol) obj).length() << 1;
}
if (obj instanceof Byte || obj instanceof UnsignedByte) {
return Byte.BYTES;
}
if (obj instanceof Integer || obj instanceof UnsignedInteger) {
return Integer.BYTES;
}
if (obj instanceof Long || obj instanceof UnsignedLong || obj instanceof Date) {
return Long.BYTES;
}
if (obj instanceof Short || obj instanceof UnsignedShort) {
return Short.BYTES;
}
if (obj instanceof Boolean) {
return 1;
}
if (obj instanceof Character) {
return 4;
}
if (obj instanceof Float) {
return Float.BYTES;
}
if (obj instanceof Double) {
return Double.BYTES;
}
if (obj instanceof UUID) {
// UUID is internally represented as 16 bytes. But how does ProtonJ encode it? To be safe..
// we can treat it as a string of 36 chars = 72 bytes. return 72;
return 16;
}
if (obj instanceof Decimal32) {
return 4;
}
if (obj instanceof Decimal64) {
return 8;
}
if (obj instanceof Decimal128) {
return 16;
}
if (obj instanceof Binary) {
return ((Binary) obj).getLength();
}
if (obj instanceof Declare) {
// Empty declare command takes up 7 bytes.
return 7;
}
if (obj instanceof Discharge) {
Discharge discharge = (Discharge) obj;
return 12 + discharge.getTxnId().getLength();
}
if (obj instanceof Map) {
// Size and Count each take a max of 4 bytes
int size = 8;
Map map = (Map) obj;
for (Object value : map.keySet()) {
size += sizeof(value);
}
for (Object value : map.values()) {
size += sizeof(value);
}
return size;
}
if (obj instanceof Iterable) {
// Size and Count each take a max of 4 bytes
int size = 8;
for (Object innerObject : (Iterable) obj) {
size += sizeof(innerObject);
}
return size;
}
if (obj.getClass().isArray()) {
// Size and Count each take a max of 4 bytes
int size = 8;
int length = Array.getLength(obj);
for (int i = 0; i < length; i++) {
size += sizeof(Array.get(obj, i));
}
return size;
}
throw new IllegalArgumentException(String.format(Locale.US,
"Encoding Type: %s is not supported", obj.getClass()));
}
}