ServiceBusMessage.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.models.AmqpAddress;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageBody;
import com.azure.core.amqp.models.AmqpMessageBodyType;
import com.azure.core.amqp.models.AmqpMessageHeader;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.amqp.models.AmqpMessageProperties;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Objects;
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
/**
* The data structure encapsulating the message being sent to Service Bus. The message structure is discussed in detail
* in the <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads">product
* documentation</a>.
*
* @see ServiceBusReceivedMessage
* @see ServiceBusMessageBatch
* @see ServiceBusSenderAsyncClient#sendMessage(ServiceBusMessage)
* @see ServiceBusSenderClient#sendMessage(ServiceBusMessage)
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads">Service Bus
* message payloads</a>
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf">AMQP 1.0 specification
* </a>
*/
public class ServiceBusMessage {
private static final int MAX_MESSAGE_ID_LENGTH = 128;
private static final int MAX_PARTITION_KEY_LENGTH = 128;
private static final int MAX_SESSION_ID_LENGTH = 128;
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
private final ClientLogger logger = new ClientLogger(ServiceBusMessage.class);
private Context context;
/**
* Creates a {@link ServiceBusMessage} with given byte array body.
*
* @param body The content of the Service bus message.
*
* @throws NullPointerException if {@code body} is null.
*/
public ServiceBusMessage(byte[] body) {
this(BinaryData.fromBytes(Objects.requireNonNull(body, "'body' cannot be null.")));
}
/**
* Creates a {@link ServiceBusMessage} with a {@link StandardCharsets#UTF_8 UTF-8} encoded body.
*
* @param body The content of the Service Bus message.
*
* @throws NullPointerException if {@code body} is null.
*/
public ServiceBusMessage(String body) {
this(BinaryData.fromString(Objects.requireNonNull(body, "'body' cannot be null.")));
}
/**
* Creates a {@link ServiceBusMessage} containing the {@code body}.The {@link BinaryData} provides various
* convenience API representing byte array. It also provides a way to serialize {@link Object} into {@link
* BinaryData}.
*
* @param body The data to set for this {@link ServiceBusMessage}.
*
* @throws NullPointerException if {@code body} is {@code null}.
* @see BinaryData
*/
public ServiceBusMessage(BinaryData body) {
Objects.requireNonNull(body, "'body' cannot be null.");
this.context = Context.NONE;
this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes()));
}
/**
* This constructor provides an easy way to create {@link ServiceBusMessage} with message body as AMQP Data types
* {@code SEQUENCE} and {@code VALUE}.
* In case of {@code SEQUENCE}, tt support sending and receiving of only one AMQP Sequence at present.
* If you are sending message with single byte array or String data, you can also use other constructor.
*
* @param amqpMessageBody amqp message body.
*
* @throws NullPointerException if {@code amqpMessageBody} is {@code null}.
*/
public ServiceBusMessage(AmqpMessageBody amqpMessageBody) {
Objects.requireNonNull(amqpMessageBody, "'body' cannot be null.");
this.context = Context.NONE;
this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(amqpMessageBody);
}
/**
* Creates a {@link ServiceBusMessage} using properties from {@code receivedMessage}. This is normally used when a
* {@link ServiceBusReceivedMessage} needs to be sent to another entity.
*
* @param receivedMessage The received message to create new message from.
*
* @throws NullPointerException if {@code receivedMessage} is {@code null}.
* @throws IllegalStateException for invalid {@link AmqpMessageBodyType}.
*/
public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null.");
final AmqpMessageBodyType bodyType = receivedMessage.getRawAmqpMessage().getBody().getBodyType();
AmqpMessageBody amqpMessageBody;
switch (bodyType) {
case DATA:
amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getRawAmqpMessage().getBody()
.getFirstData());
break;
case SEQUENCE:
amqpMessageBody = AmqpMessageBody.fromSequence(receivedMessage.getRawAmqpMessage().getBody()
.getSequence());
break;
case VALUE:
amqpMessageBody = AmqpMessageBody.fromValue(receivedMessage.getRawAmqpMessage().getBody()
.getValue());
break;
default:
throw logger.logExceptionAsError(new IllegalStateException("Body type not valid "
+ bodyType.toString()));
}
this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(amqpMessageBody);
// set properties
final AmqpMessageProperties receivedProperties = receivedMessage.getRawAmqpMessage().getProperties();
final AmqpMessageProperties newProperties = amqpAnnotatedMessage.getProperties();
newProperties.setMessageId(receivedProperties.getMessageId());
newProperties.setUserId(receivedProperties.getUserId());
newProperties.setTo(receivedProperties.getTo());
newProperties.setSubject(receivedProperties.getSubject());
newProperties.setReplyTo(receivedProperties.getReplyTo());
newProperties.setCorrelationId(receivedProperties.getCorrelationId());
newProperties.setContentType(receivedProperties.getContentType());
newProperties.setContentEncoding(receivedProperties.getContentEncoding());
newProperties.setAbsoluteExpiryTime(receivedProperties.getAbsoluteExpiryTime());
newProperties.setCreationTime(receivedProperties.getCreationTime());
newProperties.setGroupId(receivedProperties.getGroupId());
newProperties.setGroupSequence(receivedProperties.getGroupSequence());
newProperties.setReplyToGroupId(receivedProperties.getReplyToGroupId());
// copy header except for delivery count which should be set to null
final AmqpMessageHeader receivedHeader = receivedMessage.getRawAmqpMessage().getHeader();
final AmqpMessageHeader newHeader = this.amqpAnnotatedMessage.getHeader();
newHeader.setPriority(receivedHeader.getPriority());
newHeader.setTimeToLive(receivedHeader.getTimeToLive());
newHeader.setDurable(receivedHeader.isDurable());
newHeader.setFirstAcquirer(receivedHeader.isFirstAcquirer());
// copy message annotations except for broker set ones
final Map<String, Object> receivedAnnotations = receivedMessage.getRawAmqpMessage()
.getMessageAnnotations();
final Map<String, Object> newAnnotations = this.amqpAnnotatedMessage.getMessageAnnotations();
for (Map.Entry<String, Object> entry : receivedAnnotations.entrySet()) {
if (AmqpMessageConstant.fromString(entry.getKey()) == LOCKED_UNTIL_KEY_ANNOTATION_NAME
|| AmqpMessageConstant.fromString(entry.getKey()) == SEQUENCE_NUMBER_ANNOTATION_NAME
|| AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME
|| AmqpMessageConstant.fromString(entry.getKey()) == ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME
|| AmqpMessageConstant.fromString(entry.getKey()) == ENQUEUED_TIME_UTC_ANNOTATION_NAME) {
continue;
}
newAnnotations.put(entry.getKey(), entry.getValue());
}
// copy delivery annotations
final Map<String, Object> receivedDelivery = receivedMessage.getRawAmqpMessage().getDeliveryAnnotations();
final Map<String, Object> newDelivery = this.amqpAnnotatedMessage.getDeliveryAnnotations();
for (Map.Entry<String, Object> entry : receivedDelivery.entrySet()) {
newDelivery.put(entry.getKey(), entry.getValue());
}
// copy Footer
final Map<String, Object> receivedFooter = receivedMessage.getRawAmqpMessage().getFooter();
final Map<String, Object> newFooter = this.amqpAnnotatedMessage.getFooter();
for (Map.Entry<String, Object> entry : receivedFooter.entrySet()) {
newFooter.put(entry.getKey(), entry.getValue());
}
// copy application properties except for broker set ones
final Map<String, Object> receivedApplicationProperties = receivedMessage.getRawAmqpMessage()
.getApplicationProperties();
final Map<String, Object> newApplicationProperties = this.amqpAnnotatedMessage.getApplicationProperties();
for (Map.Entry<String, Object> entry : receivedApplicationProperties.entrySet()) {
if (AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME
|| AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_REASON_ANNOTATION_NAME) {
continue;
}
newApplicationProperties.put(entry.getKey(), entry.getValue());
}
this.context = Context.NONE;
}
/**
* Gets the set of free-form {@link ServiceBusMessage} properties which may be used for passing metadata associated
* with the {@link ServiceBusMessage} during Service Bus operations. A common use-case for {@code
* getApplicationProperties()} is to associate serialization hints for the {@link #getBody()} as an aid to consumers
* who wish to deserialize the binary data.
*
* @return Application properties associated with this {@link ServiceBusMessage}.
*/
public Map<String, Object> getApplicationProperties() {
return amqpAnnotatedMessage.getApplicationProperties();
}
/**
* Gets the actual payload wrapped by the {@link ServiceBusMessage}.
*
* <p>The {@link BinaryData} wraps byte array and is an abstraction over many different ways it can be represented.
* It provides convenience APIs to serialize/deserialize the object.</p>
*
* <p>If the means for deserializing the raw data is not apparent to consumers, a common technique is to make use
* of {@link #getApplicationProperties()} when creating the event, to associate serialization hints as an aid to
* consumers who wish to deserialize the binary data.</p>
*
* @throws IllegalStateException if called for the messages which are not of binary data type.
* @return Binary data representing the payload.
*/
public BinaryData getBody() {
final AmqpMessageBodyType type = amqpAnnotatedMessage.getBody().getBodyType();
switch (type) {
case DATA:
return BinaryData.fromBytes(amqpAnnotatedMessage.getBody().getFirstData());
case SEQUENCE:
case VALUE:
throw logger.logExceptionAsError(new IllegalStateException("Message body type is not DATA, instead "
+ "it is: " + type.toString()));
default:
throw logger.logExceptionAsError(new IllegalArgumentException("Unknown AmqpBodyType: "
+ type.toString()));
}
}
/**
* Gets the content type of the message.
*
* <p>
* Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5,
* for example "application/json".
* </p>
* @return The content type of the {@link ServiceBusMessage}.
*/
public String getContentType() {
return amqpAnnotatedMessage.getProperties().getContentType();
}
/**
* Sets the content type of the {@link ServiceBusMessage}.
*
* <p>
* Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5,
* for example "application/json".
* </p>
*
* @param contentType RFC2045 Content-Type descriptor of the message.
*
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setContentType(String contentType) {
amqpAnnotatedMessage.getProperties().setContentType(contentType);
return this;
}
/**
* Gets a correlation identifier.
* <p>
* Allows an application to specify a context for the message for the purposes of correlation, for example
* reflecting the MessageId of a message that is being replied to.
* </p>
*
* @return The correlation id of this message.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation">Message
* Routing and Correlation</a>
*/
public String getCorrelationId() {
String correlationId = null;
AmqpMessageId amqpCorrelationId = amqpAnnotatedMessage.getProperties().getCorrelationId();
if (amqpCorrelationId != null) {
correlationId = amqpCorrelationId.toString();
}
return correlationId;
}
/**
* Sets a correlation identifier.
*
* @param correlationId correlation id of this message
*
* @return The updated {@link ServiceBusMessage}.
* @see #getCorrelationId()
*/
public ServiceBusMessage setCorrelationId(String correlationId) {
AmqpMessageId id = null;
if (correlationId != null) {
id = new AmqpMessageId(correlationId);
}
amqpAnnotatedMessage.getProperties().setCorrelationId(id);
return this;
}
/**
* Gets the subject for the message.
*
* <p>
* This property enables the application to indicate the purpose of the message to the receiver in a standardized
* fashion, similar to an email subject line. The mapped AMQP property is "subject".
* </p>
*
* @return The subject for the message.
*/
public String getSubject() {
return amqpAnnotatedMessage.getProperties().getSubject();
}
/**
* Sets the subject for the message.
*
* @param subject The application specific subject.
*
* @return The updated {@link ServiceBusMessage} object.
*/
public ServiceBusMessage setSubject(String subject) {
amqpAnnotatedMessage.getProperties().setSubject(subject);
return this;
}
/**
* Gets the message id.
*
* <p>
* The message identifier is an application-defined value that uniquely identifies the message and its payload. The
* identifier is a free-form string and can reflect a GUID or an identifier derived from the application context. If
* enabled, the
* <a href="https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection">duplicate detection</a>
* feature identifies and removes second and further submissions of messages with the same {@code messageId}.
* </p>
*
* @return Id of the {@link ServiceBusMessage}.
*/
public String getMessageId() {
String messageId = null;
AmqpMessageId amqpMessageId = amqpAnnotatedMessage.getProperties().getMessageId();
if (amqpMessageId != null) {
messageId = amqpMessageId.toString();
}
return messageId;
}
/**
* Sets the message id.
*
* @param messageId The message id to be set.
*
* @return The updated {@link ServiceBusMessage}.
* @throws IllegalArgumentException if {@code messageId} is too long.
*/
public ServiceBusMessage setMessageId(String messageId) {
checkIdLength("messageId", messageId, MAX_MESSAGE_ID_LENGTH);
AmqpMessageId id = null;
if (messageId != null) {
id = new AmqpMessageId(messageId);
}
amqpAnnotatedMessage.getProperties().setMessageId(id);
return this;
}
/**
* Gets the partition key for sending a message to a partitioned entity.
* <p>
* For <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning">partitioned
* entities</a>, setting this value enables assigning related messages to the same internal partition, so that
* submission sequence order is correctly recorded. The partition is chosen by a hash function over this value and
* cannot be chosen directly. For session-aware entities, the {@link #getSessionId() sessionId} property overrides
* this value.
* </p>
*
* @return The partition key of this message.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning">Partitioned
* entities</a>
*/
public String getPartitionKey() {
return (String) amqpAnnotatedMessage.getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue());
}
/**
* Sets a partition key for sending a message to a partitioned entity
*
* @param partitionKey The partition key of this message.
*
* @return The updated {@link ServiceBusMessage}.
* @throws IllegalArgumentException if {@code partitionKey} is too long or if the {@code partitionKey} does not
* match the {@code sessionId}.
* @see #getPartitionKey()
*/
public ServiceBusMessage setPartitionKey(String partitionKey) {
checkIdLength("partitionKey", partitionKey, MAX_PARTITION_KEY_LENGTH);
checkPartitionKey(partitionKey);
amqpAnnotatedMessage.getMessageAnnotations().put(PARTITION_KEY_ANNOTATION_NAME.getValue(), partitionKey);
return this;
}
/**
* Gets the {@link AmqpAnnotatedMessage}.
*
* @return The raw AMQP message.
*/
public AmqpAnnotatedMessage getRawAmqpMessage() {
return amqpAnnotatedMessage;
}
/**
* Gets the address of an entity to send replies to.
* <p>
* This optional and application-defined value is a standard way to express a reply path to the receiver of the
* message. When a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic
* it expects the reply to be sent to.
*
* @return ReplyTo property value of this message
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation">Message
* Routing and Correlation</a>
*/
public String getReplyTo() {
String replyTo = null;
AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getReplyTo();
if (amqpAddress != null) {
replyTo = amqpAddress.toString();
}
return replyTo;
}
/**
* Sets the address of an entity to send replies to.
*
* @param replyTo ReplyTo property value of this message
*
* @return The updated {@link ServiceBusMessage}.
* @see #getReplyTo()
*/
public ServiceBusMessage setReplyTo(String replyTo) {
AmqpAddress replyToAddress = null;
if (replyTo != null) {
replyToAddress = new AmqpAddress(replyTo);
}
amqpAnnotatedMessage.getProperties().setReplyTo(replyToAddress);
return this;
}
/**
* Gets the "to" address.
*
* <p>
* This property is reserved for future use in routing scenarios and presently ignored by the broker itself.
* Applications can use this value in rule-driven
* <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding">auto-forward
* chaining</a> scenarios to indicate the intended logical destination of the message.
* </p>
*
* @return "To" property value of this message
*/
public String getTo() {
String to = null;
AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getTo();
if (amqpAddress != null) {
to = amqpAddress.toString();
}
return to;
}
/**
* Sets the "to" address.
*
* <p>
* This property is reserved for future use in routing scenarios and presently ignored by the broker itself.
* Applications can use this value in rule-driven
* <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding">auto-forward
* chaining</a> scenarios to indicate the intended logical destination of the message.
* </p>
*
* @param to To property value of this message.
*
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setTo(String to) {
AmqpAddress toAddress = null;
if (to != null) {
toAddress = new AmqpAddress(to);
}
amqpAnnotatedMessage.getProperties().setTo(toAddress);
return this;
}
/**
* Gets the duration before this message expires.
* <p>
* This value is the relative duration after which the message expires, starting from the instant the message has
* been accepted and stored by the broker, as captured in {@link #getScheduledEnqueueTime()}. When not set
* explicitly, the assumed value is the DefaultTimeToLive set for the respective queue or topic. A message-level
* TimeToLive value cannot be longer than the entity's DefaultTimeToLive setting and it is silently adjusted if it
* does.
*
* @return Time to live duration of this message
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-expiration">Message Expiration</a>
*/
public Duration getTimeToLive() {
return amqpAnnotatedMessage.getHeader().getTimeToLive();
}
/**
* Sets the duration of time before this message expires.
*
* @param timeToLive Time to Live duration of this message
*
* @return The updated {@link ServiceBusMessage}.
* @see #getTimeToLive()
*/
public ServiceBusMessage setTimeToLive(Duration timeToLive) {
amqpAnnotatedMessage.getHeader().setTimeToLive(timeToLive);
return this;
}
/**
* Gets the scheduled enqueue time of this message.
* <p>
* This value is used for delayed message availability. The message is safely added to the queue, but is not
* considered active and therefore not retrievable until the scheduled enqueue time. Mind that the message may not
* be activated (enqueued) at the exact given datetime; the actual activation time depends on the queue's workload
* and its state.
* </p>
*
* @return the datetime at which the message will be enqueued in Azure Service Bus
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-sequencing">Message Sequencing and
* Timestamps</a>
*/
public OffsetDateTime getScheduledEnqueueTime() {
Object value = amqpAnnotatedMessage.getMessageAnnotations().get(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue());
return value != null
? ((OffsetDateTime) value).toInstant().atOffset(ZoneOffset.UTC)
: null;
}
/**
* Sets the scheduled enqueue time of this message. A {@code null} will not be set. If this value needs to be unset
* it could be done by value removing from {@link AmqpAnnotatedMessage#getMessageAnnotations()} using key {@link
* AmqpMessageConstant#SCHEDULED_ENQUEUE_UTC_TIME_NAME}.
*
* @param scheduledEnqueueTime the datetime at which this message should be enqueued in Azure Service Bus.
*
* @return The updated {@link ServiceBusMessage}.
* @see #getScheduledEnqueueTime()
*/
public ServiceBusMessage setScheduledEnqueueTime(OffsetDateTime scheduledEnqueueTime) {
if (scheduledEnqueueTime != null) {
amqpAnnotatedMessage.getMessageAnnotations().put(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue(),
scheduledEnqueueTime);
}
return this;
}
/**
* Gets or sets a session identifier augmenting the {@link #getReplyTo() ReplyTo} address.
* <p>
* This value augments the {@link #getReplyTo() reply to} information and specifies which {@code sessionId} should
* be set for the reply when sent to the reply entity.
*
* @return The {@code getReplyToGroupId} property value of this message.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation">Message
* Routing and Correlation</a>
*/
public String getReplyToSessionId() {
return amqpAnnotatedMessage.getProperties().getReplyToGroupId();
}
/**
* Gets or sets a session identifier augmenting the {@link #getReplyTo() ReplyTo} address.
*
* @param replyToSessionId The ReplyToGroupId property value of this message.
*
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setReplyToSessionId(String replyToSessionId) {
amqpAnnotatedMessage.getProperties().setReplyToGroupId(replyToSessionId);
return this;
}
/**
* Gets the session identifier for a session-aware entity.
*
* <p>
* For session-aware entities, this application-defined value specifies the session affiliation of the message.
* Messages with the same session identifier are subject to summary locking and enable exact in-order processing and
* demultiplexing. For session-unaware entities, this value is ignored. See <a
* href="https://docs.microsoft.com/azure/service-bus-messaging/message-sessions">Message Sessions</a>.
* </p>
*
* @return The session id of the {@link ServiceBusMessage}.
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-sessions">Message Sessions</a>
*/
public String getSessionId() {
return amqpAnnotatedMessage.getProperties().getGroupId();
}
/**
* Sets the session identifier for a session-aware entity.
*
* @param sessionId The session identifier to be set.
*
* @return The updated {@link ServiceBusMessage}.
* @throws IllegalArgumentException if {@code sessionId} is too long or if the {@code sessionId} does not match
* the {@code partitionKey}.
*/
public ServiceBusMessage setSessionId(String sessionId) {
checkIdLength("sessionId", sessionId, MAX_SESSION_ID_LENGTH);
checkSessionId(sessionId);
amqpAnnotatedMessage.getProperties().setGroupId(sessionId);
return this;
}
/**
* A specified key-value pair of type {@link Context} to set additional information on the {@link
* ServiceBusMessage}.
*
* @return the {@link Context} object set on the {@link ServiceBusMessage}.
*/
Context getContext() {
return context;
}
/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
public ServiceBusMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
return this;
}
/**
* Checks the length of ID fields.
*
* Some fields within the message will cause a failure in the service without enough context information.
*/
private void checkIdLength(String fieldName, String value, int maxLength) {
if (value != null && value.length() > maxLength) {
final String message = String.format("%s cannot be longer than %d characters.", fieldName, maxLength);
throw logger.logExceptionAsError(new IllegalArgumentException(message));
}
}
/**
* Validates that the user can't set the partitionKey to a different value than the session ID. (this will
* eventually migrate to a service-side check)
*/
private void checkSessionId(String proposedSessionId) {
if (proposedSessionId == null) {
return;
}
if (this.getPartitionKey() != null && this.getPartitionKey().compareTo(proposedSessionId) != 0) {
final String message = String.format(
"sessionId:%s cannot be set to a different value than partitionKey:%s.",
proposedSessionId,
this.getPartitionKey());
throw logger.logExceptionAsError(new IllegalArgumentException(message));
}
}
/**
* Validates that the user can't set the partitionKey to a different value than the session ID. (this will
* eventually migrate to a service-side check)
*/
private void checkPartitionKey(String proposedPartitionKey) {
if (proposedPartitionKey == null) {
return;
}
if (this.getSessionId() != null && this.getSessionId().compareTo(proposedPartitionKey) != 0) {
final String message = String.format(
"partitionKey:%s cannot be set to a different value than sessionId:%s.",
proposedPartitionKey,
this.getSessionId());
throw logger.logExceptionAsError(new IllegalArgumentException(message));
}
}
}