RetriableWorkItem.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.amqp.implementation;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.publisher.MonoSink;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Represents a work item that can be scheduled multiple times.
*/
class RetriableWorkItem {
private final AtomicInteger retryAttempts = new AtomicInteger();
private final MonoSink<DeliveryState> monoSink;
private final TimeoutTracker timeoutTracker;
private final byte[] amqpMessage;
private final int messageFormat;
private final int encodedMessageSize;
private final DeliveryState deliveryState;
private boolean waitingForAck;
private Exception lastKnownException;
RetriableWorkItem(byte[] amqpMessage, int encodedMessageSize, int messageFormat, MonoSink<DeliveryState> monoSink,
Duration timeout, DeliveryState deliveryState) {
this(amqpMessage, encodedMessageSize, messageFormat, monoSink, new TimeoutTracker(timeout,
false), deliveryState);
}
private RetriableWorkItem(byte[] amqpMessage, int encodedMessageSize, int messageFormat, MonoSink<DeliveryState>
monoSink, TimeoutTracker timeout, DeliveryState deliveryState) {
this.amqpMessage = amqpMessage;
this.encodedMessageSize = encodedMessageSize;
this.messageFormat = messageFormat;
this.monoSink = monoSink;
this.timeoutTracker = timeout;
this.deliveryState = deliveryState;
}
byte[] getMessage() {
return amqpMessage;
}
DeliveryState getDeliveryState() {
return deliveryState;
}
boolean isDeliveryStateProvided() {
return deliveryState != null;
}
TimeoutTracker getTimeoutTracker() {
return timeoutTracker;
}
void success(DeliveryState deliveryState) {
monoSink.success(deliveryState);
}
void error(Throwable error) {
monoSink.error(error);
}
int incrementRetryAttempts() {
return retryAttempts.incrementAndGet();
}
boolean hasBeenRetried() {
return retryAttempts.get() == 0;
}
int getEncodedMessageSize() {
return encodedMessageSize;
}
int getMessageFormat() {
return messageFormat;
}
Exception getLastKnownException() {
return this.lastKnownException;
}
void setLastKnownException(Exception exception) {
this.lastKnownException = exception;
}
void setWaitingForAck() {
this.waitingForAck = true;
}
boolean isWaitingForAck() {
return this.waitingForAck;
}
}