TransactionCoordinator.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Mono;

import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES;

/**
 * Encapsulates transaction functions.
 */
final class TransactionCoordinator {

    private final ClientLogger logger = new ClientLogger(TransactionCoordinator.class);

    private final AmqpSendLink sendLink;
    private final MessageSerializer messageSerializer;

    TransactionCoordinator(AmqpSendLink sendLink, MessageSerializer messageSerializer) {
        this.sendLink = sendLink;
        this.messageSerializer = messageSerializer;
    }

    /**
     * Completes the transaction. All the work in this transaction will either rollback or committed as one unit of
     * work.
     *
     * @param transaction that needs to be completed.
     * @param isCommit true for commit and false to rollback this transaction.
     *
     * @return a completable {@link Mono} which represent {@link DeliveryState}.
     */
    Mono<Void> completeTransaction(AmqpTransaction transaction, boolean isCommit) {
        final Message message = Proton.message();
        Discharge discharge = new Discharge();
        discharge.setFail(!isCommit);
        discharge.setTxnId(new Binary(transaction.getTransactionId().array()));
        message.setBody(new AmqpValue(discharge));

        final int payloadSize = messageSerializer.getSize(message);
        final int allocationSize = payloadSize + MAX_AMQP_HEADER_SIZE_BYTES;

        final byte[] bytes = new byte[allocationSize];
        final int encodedSize = message.encode(bytes, 0, allocationSize);

        return sendLink.send(bytes, encodedSize, DeliveryImpl.DEFAULT_MESSAGE_FORMAT, null)
            .handle((outcome, sink) -> {
                final DeliveryState.DeliveryStateType stateType = outcome.getType();
                switch (stateType) {
                    case Accepted:
                        sink.complete();
                        break;
                    default:
                        sink.error(new IllegalArgumentException("Expected a Accepted, received: " + outcome));
                        logger.warning("Unknown DeliveryState type: {}", stateType);
                }
            });
    }

    /**
     * Creates the transaction in message broker.
     *
     * @return a completable {@link Mono} which represent {@link DeliveryState}.
     */
    Mono<AmqpTransaction> createTransaction() {
        final Message message = Proton.message();
        Declare declare = new Declare();
        message.setBody(new AmqpValue(declare));

        final int payloadSize = messageSerializer.getSize(message);
        final int allocationSize = payloadSize + MAX_AMQP_HEADER_SIZE_BYTES;

        final byte[] bytes = new byte[allocationSize];
        final int encodedSize = message.encode(bytes, 0, allocationSize);

        return sendLink.send(bytes, encodedSize, DeliveryImpl.DEFAULT_MESSAGE_FORMAT, null)
            .handle((outcome, sink) -> {
                final DeliveryState.DeliveryStateType stateType = outcome.getType();
                switch (stateType) {
                    case Declared:
                        Binary transactionId;
                        Declared declared = (Declared) outcome;
                        transactionId = declared.getTxnId();
                        sink.next(new AmqpTransaction(transactionId.asByteBuffer()));
                        break;
                    default:
                        sink.error(new IllegalArgumentException("Expected a Declared, received: " + outcome));
                        logger.warning("Unknown DeliveryState type: {}", stateType);
                }
            });
    }
}