ServiceBusMessageBatch.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;

import java.nio.BufferOverflowException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

/**
 * A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a
 * single AMQP message when sent to the Azure Service Bus service.
 *
 * <p>Adding a message (measuring it, checking the remaining capacity and recording it) is performed as one atomic
 * step guarded by an internal lock, so concurrent calls to {@link #tryAddMessage(ServiceBusMessage)} cannot
 * over-commit the batch or interfere with each other's size measurement.</p>
 */
public final class ServiceBusMessageBatch {
    private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class);
    // Guards the scratch buffer, the "is this the first message" check and the size/list update in tryAddMessage.
    private final Object lock = new Object();
    private final int maxMessageSize;
    private final ErrorContextProvider contextProvider;
    private final MessageSerializer serializer;
    private final List<ServiceBusMessage> serviceBusMessageList;
    // Scratch buffer used only to measure the encoded size of a message; its contents are never read back.
    private final byte[] eventBytes;
    private final AtomicInteger sizeInBytes;
    private final TracerProvider tracerProvider;
    private final String entityPath;
    private final String hostname;

    /**
     * Creates an empty batch.
     *
     * @param maxMessageSize Maximum size, in bytes, that the batch may reach.
     * @param contextProvider Supplies the error context attached to exceptions raised by this batch.
     * @param tracerProvider Tracer used to instrument messages when tracing is enabled.
     * @param serializer Serializer that converts a {@link ServiceBusMessage} into its AMQP representation.
     * @param entityPath Entity path passed to the tracer when a message is instrumented.
     * @param hostname Host name passed to the tracer when a message is instrumented.
     */
    ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, TracerProvider tracerProvider,
        MessageSerializer serializer, String entityPath, String hostname) {
        this.maxMessageSize = maxMessageSize;
        this.contextProvider = contextProvider;
        this.serializer = serializer;
        this.serviceBusMessageList = Collections.synchronizedList(new LinkedList<>());
        this.sizeInBytes = new AtomicInteger((maxMessageSize / 65536) * 1024); // reserve 1KB for every 64KB
        this.eventBytes = new byte[maxMessageSize];
        this.tracerProvider = tracerProvider;
        this.entityPath = entityPath;
        this.hostname = hostname;
    }

    /**
     * Gets the number of {@link ServiceBusMessage messages} in the batch.
     *
     * @return The number of {@link ServiceBusMessage messages} in the batch.
     */
    public int getCount() {
        return serviceBusMessageList.size();
    }

    /**
     * Gets the maximum size, in bytes, of the {@link ServiceBusMessageBatch batch}.
     *
     * @return The maximum size, in bytes, of the {@link ServiceBusMessageBatch batch}.
     */
    public int getMaxSizeInBytes() {
        return maxMessageSize;
    }

    /**
     * Gets the size of the {@link ServiceBusMessageBatch batch} in bytes. This includes the space reserved when the
     * batch was created in addition to the measured size of every message added so far.
     *
     * @return The size of the {@link ServiceBusMessageBatch batch} in bytes.
     */
    public int getSizeInBytes() {
        return this.sizeInBytes.get();
    }

    /**
     * Tries to add a {@link ServiceBusMessage message} to the batch.
     *
     * @param serviceBusMessage The {@link ServiceBusMessage} to add to the batch.
     *
     * @return {@code true} if the message could be added to the batch; {@code false} if the message was too large to
     *     fit in the remaining space of the batch.
     *
     * @throws NullPointerException if {@code serviceBusMessage} is {@code null}.
     * @throws ServiceBusException if {@code serviceBusMessage} is larger than the maximum size of the {@link
     *     ServiceBusMessageBatch}. The cause is an {@link AmqpException} with the condition
     *     {@link AmqpErrorCondition#LINK_PAYLOAD_SIZE_EXCEEDED}.
     */
    public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
        if (serviceBusMessage == null) {
            throw logger.logExceptionAsWarning(new NullPointerException("'serviceBusMessage' cannot be null"));
        }

        // Instrument the message before taking the lock so tracer code never runs while the lock is held.
        final ServiceBusMessage messageToAdd =
            tracerProvider.isEnabled()
                ? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath,
                tracerProvider)
                : serviceBusMessage;

        // Measure, check and commit as a single step: the scratch buffer is shared, the first-message overhead
        // depends on the list being empty, and the size must never be observed in a half-updated state.
        synchronized (lock) {
            final int size;
            try {
                size = getSize(messageToAdd, serviceBusMessageList.isEmpty());
            } catch (BufferOverflowException exception) {
                // The message alone does not fit into a buffer of maxMessageSize bytes. Keep the original
                // exception as the cause so the failure can be traced back to the encoder.
                final RuntimeException ex = new ServiceBusException(
                    new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
                        String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb",
                            maxMessageSize / 1024), exception, contextProvider.getErrorContext()),
                    ServiceBusErrorSource.SEND);

                throw logger.logExceptionAsWarning(ex);
            }

            final int currentSize = this.sizeInBytes.get();
            // Widen to long so the comparison cannot wrap around for very large limits.
            if ((long) currentSize + size > this.maxMessageSize) {
                return false;
            }

            this.sizeInBytes.set(currentSize + size);
            this.serviceBusMessageList.add(messageToAdd);
            return true;
        }
    }

    /**
     * Gets the messages in the batch.
     *
     * @return The messages in the message batch. This is the live, synchronized list backing the batch, not a copy.
     */
    List<ServiceBusMessage> getMessages() {
        return serviceBusMessageList;
    }

    /**
     * Measures how many bytes the given message contributes to the batch by encoding it into the scratch buffer.
     * Callers must hold {@code lock} because the scratch buffer is shared by all invocations.
     *
     * @param serviceBusMessage Message to measure.
     * @param isFirst {@code true} if the batch does not contain any message yet.
     *
     * @return The encoded size of the message plus a fixed data section overhead. For the first message, the size
     *     of the same message encoded without body, properties, application properties and delivery annotations
     *     is added as well.
     *
     * @throws NullPointerException if {@code serviceBusMessage} is {@code null}.
     * @throws BufferOverflowException if the encoded message does not fit into {@code maxMessageSize} bytes.
     */
    private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) {
        Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null.");

        final org.apache.qpid.proton.message.Message amqpMessage = serializer.serialize(serviceBusMessage);
        int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size
        eventSize += 16; // data section overhead

        if (isFirst) {
            // Strip the per-message sections and encode what is left; presumably this accounts for the sections
            // carried once by the enclosing batch message (TODO confirm against the sender implementation).
            amqpMessage.setBody(null);
            amqpMessage.setApplicationProperties(null);
            amqpMessage.setProperties(null);
            amqpMessage.setDeliveryAnnotations(null);

            eventSize += amqpMessage.encode(this.eventBytes, 0, maxMessageSize);
        }

        return eventSize;
    }
}