SystemProperties.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.AmqpMessageProperties;
import com.azure.core.util.logging.ClientLogger;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Provides an abstraction over {@link AmqpAnnotatedMessage} and properties that are published by the service. This is a
* read-only view. The properties themselves can be updated via {@link AmqpAnnotatedMessage}.
*
* @see <a href="https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpSystemProperties.cs">AmqpSystemProperties</a>
*/
final class SystemProperties implements Map<String, Object> {
private final ClientLogger logger = new ClientLogger(SystemProperties.class);
private final Long offset;
private final String partitionKey;
private final Instant enqueuedTime;
private final Long sequenceNumber;
private final AmqpAnnotatedMessage message;
/**
* Creates an empty set of system properties. This is the case where a message was not received.
*/
SystemProperties() {
this.message = null;
this.offset = null;
this.enqueuedTime = null;
this.partitionKey = null;
this.sequenceNumber = null;
}
SystemProperties(final AmqpAnnotatedMessage message, long offset, Instant enqueuedTime, long sequenceNumber,
String partitionKey) {
this.message = Objects.requireNonNull(message, "'message' cannot be null.");
this.offset = offset;
this.enqueuedTime = enqueuedTime;
this.sequenceNumber = sequenceNumber;
this.partitionKey = partitionKey;
}
/**
* Gets the offset within the Event Hubs stream.
*
* @return The offset within the Event Hubs stream.
*/
Long getOffset() {
return offset;
}
/**
* Gets a partition key used for message partitioning. If it exists, this value was used to compute a hash to select
* a partition to send the message to.
*
* @return A partition key for this Event Data.
*/
String getPartitionKey() {
return partitionKey;
}
/**
* Gets the time this event was enqueued in the Event Hub.
*
* @return The time this was enqueued in the service.
*/
Instant getEnqueuedTime() {
return enqueuedTime;
}
/**
* Gets the sequence number in the event stream for this event. This is unique for every message received in the
* Event Hub.
*
* @return Sequence number for this event.
*
* @throws IllegalStateException if {@link SystemProperties} does not contain the sequence number in a retrieved
* event.
*/
Long getSequenceNumber() {
return sequenceNumber;
}
@Override
public int size() {
if (message == null) {
return 0;
}
return entrySet().size();
}
@Override
public Collection<Object> values() {
if (message == null) {
return Collections.emptyList();
}
return entrySet().stream().map(Entry::getValue).collect(Collectors.toList());
}
@Override
public Set<Entry<String, Object>> entrySet() {
if (message == null) {
return Collections.emptySet();
}
final AmqpMessageProperties properties = message.getProperties();
final HashSet<Entry<String, Object>> entries = new HashSet<>();
if (properties.getMessageId() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.MESSAGE_ID.getValue(), properties.getMessageId().toString(), logger));
}
if (properties.getUserId() != null && properties.getUserId().length > 0) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.USER_ID.getValue(), properties.getUserId(), logger));
}
if (properties.getTo() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.TO.getValue(), properties.getTo(), logger));
}
if (properties.getSubject() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.SUBJECT.getValue(), properties.getSubject(), logger));
}
if (properties.getReplyTo() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.REPLY_TO.getValue(), properties.getReplyTo(), logger));
}
if (properties.getCorrelationId() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.CORRELATION_ID.getValue(), properties.getCorrelationId().toString(), logger));
}
if (properties.getContentType() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.CONTENT_TYPE.getValue(), properties.getContentType(), logger));
}
if (properties.getContentEncoding() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.CONTENT_ENCODING.getValue(), properties.getContentEncoding(), logger));
}
if (properties.getContentEncoding() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.ABSOLUTE_EXPIRY_TIME.getValue(), properties.getContentEncoding(), logger));
}
if (properties.getCreationTime() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.CREATION_TIME.getValue(), properties.getCreationTime(), logger));
}
if (properties.getGroupId() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.GROUP_ID.getValue(), properties.getGroupId(), logger));
}
if (properties.getGroupSequence() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.GROUP_SEQUENCE.getValue(), properties.getGroupSequence(), logger));
}
if (properties.getReplyToGroupId() != null) {
entries.add(new SystemPropertiesEntry(
AmqpMessageConstant.REPLY_TO_GROUP_ID.getValue(), properties.getReplyToGroupId(), logger));
}
message.getMessageAnnotations().forEach((key, value) ->
entries.add(new SystemPropertiesEntry(key, value, logger)));
return entries;
}
@Override
public Object getOrDefault(Object key, Object defaultValue) {
if (message == null) {
return defaultValue;
}
if (!containsKey(key)) {
return defaultValue;
}
return get(key);
}
@Override
public Set<String> keySet() {
if (message == null) {
return Collections.emptySet();
}
return entrySet().stream().map(Entry::getKey).collect(Collectors.toSet());
}
@Override
public boolean containsKey(Object key) {
if (message == null) {
return false;
}
if (key == null) {
throw logger.logExceptionAsError(new NullPointerException("'key' cannot be null"));
} else if (!(key instanceof String)) {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format("'key' is not a string. key: %s. class: %s", key, key.getClass())));
}
return keySet().contains(key);
}
@Override
public boolean containsValue(Object value) {
if (message == null) {
return false;
}
return values().contains(value);
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public Object get(Object key) {
if (message == null) {
return null;
}
if (key == null) {
throw logger.logExceptionAsError(new NullPointerException("'key' cannot be null"));
} else if (!(key instanceof String)) {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format("'key' is not a string. key: %s. class: %s", key, key.getClass())));
}
final String keyValue = (String) key;
if (AmqpMessageConstant.MESSAGE_ID.getValue().equals(keyValue)) {
if (message.getProperties().getMessageId() != null) {
return message.getProperties().getMessageId().toString();
} else {
return null;
}
}
if (AmqpMessageConstant.USER_ID.getValue().equals(keyValue)) {
return message.getProperties().getUserId();
}
if (AmqpMessageConstant.TO.getValue().equals(keyValue)) {
if (message.getProperties().getTo() != null) {
return message.getProperties().getTo().toString();
} else {
return null;
}
}
if (AmqpMessageConstant.SUBJECT.getValue().equals(keyValue)) {
return message.getProperties().getSubject();
}
if (AmqpMessageConstant.REPLY_TO.getValue().equals(keyValue)) {
if (message.getProperties().getReplyTo() != null) {
return message.getProperties().getReplyTo().toString();
} else {
return null;
}
}
if (AmqpMessageConstant.CORRELATION_ID.getValue().equals(keyValue)) {
if (message.getProperties().getCorrelationId() != null) {
return message.getProperties().getCorrelationId().toString();
} else {
return null;
}
}
if (AmqpMessageConstant.CONTENT_TYPE.getValue().equals(keyValue)) {
return message.getProperties().getContentType();
}
if (AmqpMessageConstant.CONTENT_ENCODING.getValue().equals(keyValue)) {
return message.getProperties().getContentEncoding();
}
if (AmqpMessageConstant.ABSOLUTE_EXPIRY_TIME.getValue().equals(keyValue)) {
return message.getProperties().getAbsoluteExpiryTime();
}
if (AmqpMessageConstant.CREATION_TIME.getValue().equals(keyValue)) {
return message.getProperties().getCreationTime();
}
if (AmqpMessageConstant.GROUP_ID.getValue().equals(keyValue)) {
return message.getProperties().getGroupId();
}
if (AmqpMessageConstant.GROUP_SEQUENCE.getValue().equals(keyValue)) {
return message.getProperties().getGroupSequence();
}
if (AmqpMessageConstant.REPLY_TO_GROUP_ID.getValue().equals(keyValue)) {
return message.getProperties().getReplyToGroupId();
}
return message.getMessageAnnotations().get(keyValue);
}
@Override
public int hashCode() {
if (message == null) {
return super.hashCode();
}
return Objects.hash(message, message.getMessageAnnotations());
}
@Override
public boolean equals(Object o) {
return this == o;
}
@Override
public Object put(String key, Object value) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("System properties are read-only. Cannot perform 'put' operation."));
}
@Override
public boolean remove(Object key, Object value) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("System properties are read-only. Cannot perform 'remove' operation."));
}
@Override
public Object remove(Object key) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("System properties are read-only. Cannot perform 'remove' operation."));
}
@Override
public void putAll(Map<? extends String, ?> m) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("System properties are read-only. Cannot perform 'putAll' operation."));
}
@Override
public void clear() {
throw logger.logExceptionAsError(
new UnsupportedOperationException("System properties are read-only. Cannot perform 'clear' operation."));
}
/**
* Represents a read-only system properties entry.
*/
private static class SystemPropertiesEntry implements Map.Entry<String, Object> {
private final ClientLogger logger;
private final String key;
private final Object value;
SystemPropertiesEntry(String key, Object value, ClientLogger logger) {
this.key = key;
this.value = value;
this.logger = logger;
}
@Override
public String getKey() {
return key;
}
@Override
public Object getValue() {
return value;
}
@Override
public Object setValue(Object value) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("Cannot update entry. System properties is read-only."));
}
}
}