View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.servicebus.primitives;
5   
6   import java.io.IOException;
7   import java.io.Serializable;
8   import java.time.Duration;
9   import java.time.Instant;
10  import java.time.ZonedDateTime;
11  import java.util.Arrays;
12  import java.util.Collection;
13  import java.util.Comparator;
14  import java.util.HashMap;
15  import java.util.Iterator;
16  import java.util.LinkedList;
17  import java.util.Locale;
18  import java.util.Map;
19  import java.util.PriorityQueue;
20  import java.util.UUID;
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ScheduledFuture;
24  
25  import com.microsoft.azure.servicebus.TransactionContext;
26  import org.apache.qpid.proton.Proton;
27  import org.apache.qpid.proton.amqp.Binary;
28  import org.apache.qpid.proton.amqp.Symbol;
29  import org.apache.qpid.proton.amqp.UnsignedInteger;
30  import org.apache.qpid.proton.amqp.messaging.Accepted;
31  import org.apache.qpid.proton.amqp.messaging.Data;
32  import org.apache.qpid.proton.amqp.messaging.Outcome;
33  import org.apache.qpid.proton.amqp.messaging.Rejected;
34  import org.apache.qpid.proton.amqp.messaging.Released;
35  import org.apache.qpid.proton.amqp.messaging.Source;
36  import org.apache.qpid.proton.amqp.messaging.Target;
37  import org.apache.qpid.proton.amqp.transaction.Declared;
38  import org.apache.qpid.proton.amqp.transaction.TransactionalState;
39  import org.apache.qpid.proton.amqp.transport.DeliveryState;
40  import org.apache.qpid.proton.amqp.transport.ErrorCondition;
41  import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
42  import org.apache.qpid.proton.engine.BaseHandler;
43  import org.apache.qpid.proton.engine.Connection;
44  import org.apache.qpid.proton.engine.Delivery;
45  import org.apache.qpid.proton.engine.EndpointState;
46  import org.apache.qpid.proton.engine.Sender;
47  import org.apache.qpid.proton.engine.Session;
48  import org.apache.qpid.proton.engine.impl.DeliveryImpl;
49  import org.apache.qpid.proton.message.Message;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  
53  import com.microsoft.azure.servicebus.amqp.AmqpConstants;
54  import com.microsoft.azure.servicebus.amqp.DispatchHandler;
55  import com.microsoft.azure.servicebus.amqp.IAmqpSender;
56  import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
57  import com.microsoft.azure.servicebus.amqp.SessionHandler;
58  
59  import static java.nio.charset.StandardCharsets.UTF_8;
60  
61  /*
62   * Abstracts all amqp related details
63   * translates event-driven reactor model into async send Api
64   */
65  public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErrorContextProvider {
66      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageSender.class);
67      private static final String SEND_TIMED_OUT = "Send operation timed out";
68      private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires
69  
70      private final Object requestResonseLinkCreationLock = new Object();
71      private final MessagingFactory underlyingFactory;
72      private final String sendPath;
73      private final String sasTokenAudienceURI;
74      private final Duration operationTimeout;
75      private final RetryPolicy retryPolicy;
76      private final CompletableFuture<Void> linkClose;
77      private final Object pendingSendLock;
78      private final ConcurrentHashMap<String, SendWorkItem<DeliveryState>> pendingSendsData;
79      private final PriorityQueue<WeightedDeliveryTag> pendingSends;
80      private final DispatchHandler sendWork;
81      private final MessagingEntityType entityType;
82      private boolean isSendLoopRunning;
83  
84      private Sender sendLink;
85      private RequestResponseLink requestResponseLink;
86      private CompletableFuture<CoreMessageSender> linkFirstOpen;
87      private int linkCredit;
88      private Exception lastKnownLinkError;
89      private Instant lastKnownErrorReportedAt;
90      private ScheduledFuture<?> sasTokenRenewTimerFuture;
91      private CompletableFuture<Void> requestResponseLinkCreationFuture;
92      private CompletableFuture<Void> sendLinkReopenFuture;
93      private SenderLinkSettings linkSettings;
94      private String transferDestinationPath;
95      private String transferSasTokenAudienceURI;
96      private boolean isSendVia;
97      private int maxMessageSize;
98  
99      @Deprecated
100     public static CompletableFuture<CoreMessageSender> create(
101             final MessagingFactory factory,
102             final String clientId,
103             final String senderPath,
104             final String transferDestinationPath) {
105         return CoreMessageSender.create(factory, clientId, senderPath, transferDestinationPath, null);
106     }
107 
108     public static CompletableFuture<CoreMessageSender> create(
109             final MessagingFactory factory,
110             final String clientId,
111             final String senderPath,
112             final String transferDestinationPath,
113             final MessagingEntityType entityType) {
114         return CoreMessageSender.create(factory, clientId, entityType, CoreMessageSender.getDefaultLinkProperties(senderPath, transferDestinationPath, factory, entityType));
115     }
116 
117     static CompletableFuture<CoreMessageSender> create(
118             final MessagingFactory factory,
119             final String clientId,
120             final MessagingEntityType entityType,
121             final SenderLinkSettings linkSettings) {
122         TRACE_LOGGER.info("Creating core message sender to '{}'", linkSettings.linkPath);
123 
124         final Connection connection = factory.getConnection();
125         final String sendLinkNamePrefix = "Sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
126         linkSettings.linkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer())
127             ? sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer())
128             : sendLinkNamePrefix;
129 
130         final CoreMessageSenderes/CoreMessageSender.html#CoreMessageSender">CoreMessageSender msgSender = new CoreMessageSender(factory, clientId, entityType, linkSettings);
131         TimeoutTracker openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
132         msgSender.initializeLinkOpen(openLinkTracker);
133 
134         CompletableFuture<Void> authenticationFuture = null;
135         if (linkSettings.requiresAuthentication) {
136             authenticationFuture = msgSender.sendTokenAndSetRenewTimer(false);
137         } else {
138             authenticationFuture = CompletableFuture.completedFuture(null);
139         }
140 
141         authenticationFuture.handleAsync((v, sasTokenEx) -> {
142             if (sasTokenEx != null) {
143                 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
144                 TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", msgSender.sendPath, cause);
145                 msgSender.linkFirstOpen.completeExceptionally(cause);
146             } else {
147                 try {
148                     msgSender.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
149                         @Override
150                         public void onEvent() {
151                             msgSender.createSendLink(msgSender.linkSettings);
152                         }
153                     });
154                 } catch (IOException ioException) {
155                     msgSender.cancelSASTokenRenewTimer();
156                     msgSender.linkFirstOpen.completeExceptionally(new ServiceBusException(false, "Failed to create Sender, see cause for more details.", ioException));
157                 }
158             }
159 
160             return null;
161         }, MessagingFactory.INTERNAL_THREAD_POOL);
162 
163         return msgSender.linkFirstOpen;
164     }
165 
166     private CompletableFuture<Void> createRequestResponseLink() {
167         synchronized (this.requestResonseLinkCreationLock) {
168             if (this.requestResponseLinkCreationFuture == null) {
169                 this.requestResponseLinkCreationFuture = new CompletableFuture<Void>();
170                 this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((rrlink, ex) -> {
171                     if (ex == null) {
172                         this.requestResponseLink = rrlink;
173                         this.requestResponseLinkCreationFuture.complete(null);
174                     } else {
175                         Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
176                         this.requestResponseLinkCreationFuture.completeExceptionally(cause);
177                         // Set it to null so next call will retry rr link creation
178                         synchronized (this.requestResonseLinkCreationLock) {
179                             this.requestResponseLinkCreationFuture = null;
180                         }
181                     }
182                     return null;
183                 }, MessagingFactory.INTERNAL_THREAD_POOL);
184             }
185 
186             return this.requestResponseLinkCreationFuture;
187         }
188     }
189 
190     private void closeRequestResponseLink() {
191         synchronized (this.requestResonseLinkCreationLock) {
192             if (this.requestResponseLinkCreationFuture != null) {
193                 this.requestResponseLinkCreationFuture.thenRun(() -> {
194                     this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath);
195                     this.requestResponseLink = null;
196                 });
197                 this.requestResponseLinkCreationFuture = null;
198             }
199         }
200     }
201 
202     private CoreMessageSender(final MessagingFactory factory, final String sendLinkName, final MessagingEntityType entityType, final SenderLinkSettings linkSettings) {
203         super(sendLinkName);
204 
205         this.sendPath = linkSettings.linkPath;
206         this.entityType = entityType;
207         if (linkSettings.linkProperties != null) {
208             String transferPath = (String) linkSettings.linkProperties.getOrDefault(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, null);
209             if (transferPath != null && !transferPath.isEmpty()) {
210                 this.transferDestinationPath = transferPath;
211                 this.isSendVia = true;
212                 this.transferSasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), transferDestinationPath);
213             } else {
214                 // Ensure it is null.
215                 this.transferDestinationPath = null;
216             }
217         }
218 
219         this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), linkSettings.linkPath);
220         this.underlyingFactory = factory;
221         this.operationTimeout = factory.getOperationTimeout();
222         this.linkSettings = linkSettings;
223 
224         this.lastKnownLinkError = null;
225         this.lastKnownErrorReportedAt = Instant.EPOCH;
226 
227         this.retryPolicy = factory.getRetryPolicy();
228 
229         this.pendingSendLock = new Object();
230         this.pendingSendsData = new ConcurrentHashMap<String, SendWorkItem<DeliveryState>>();
231         this.pendingSends = new PriorityQueue<WeightedDeliveryTag>(1000, new DeliveryTagComparator());
232         this.linkCredit = 0;
233 
234         this.linkClose = new CompletableFuture<Void>();
235         this.sendLinkReopenFuture = null;
236         this.isSendLoopRunning = false;
237         this.sendWork = new DispatchHandler() {
238             @Override
239             public void onEvent() {
240                 CoreMessageSender.this.processSendWork();
241             }
242         };
243     }
244 
245     public String getSendPath() {
246         return this.sendPath;
247     }
248 
249     private static String generateRandomDeliveryTag() {
250         return UUID.randomUUID().toString().replace("-", StringUtil.EMPTY);
251     }
252 
253     CompletableFuture<DeliveryState> sendCoreAsync(
254             final byte[] bytes,
255             final int arrayOffset,
256             final int messageFormat,
257             final TransactionContext transaction) {
258         this.throwIfClosed(this.lastKnownLinkError);
259         TRACE_LOGGER.debug("Sending message to '{}'", this.sendPath);
260         String deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
261         CompletableFuture<DeliveryState> onSendFuture = new CompletableFuture<DeliveryState>();
262         SendWorkItem<DeliveryState> sendWorkItem = new SendWorkItem<DeliveryState>(bytes, arrayOffset, messageFormat, deliveryTag, transaction, onSendFuture, this.operationTimeout);
263         this.enlistSendRequest(deliveryTag, sendWorkItem, false);
264         this.scheduleSendTimeout(sendWorkItem);
265         return onSendFuture;
266     }
267 
268     private void scheduleSendTimeout(SendWorkItem<DeliveryState> sendWorkItem) {
269         // Timer to timeout the request
270         ScheduledFuture<?> timeoutTask = Timer.schedule(() -> {
271             if (!sendWorkItem.getWork().isDone()) {
272                 TRACE_LOGGER.warn("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", sendWorkItem.getDeliveryTag(), CoreMessageSender.this.sendPath);
273                 CoreMessageSender.this.pendingSendsData.remove(sendWorkItem.getDeliveryTag());
274                 CoreMessageSender.this.throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException());
275                 // Weighted delivery tag not removed from the pending sends queue, but send loop will ignore it anyway if it is present
276             }
277         },
278             sendWorkItem.getTimeoutTracker().remaining(),
279             TimerType.OneTimeRun);
280         sendWorkItem.setTimeoutTask(timeoutTask);
281     }
282 
283     private void enlistSendRequest(String deliveryTag, SendWorkItem<DeliveryState> sendWorkItem, boolean isRetrySend) {
284         synchronized (this.pendingSendLock) {
285             this.pendingSendsData.put(deliveryTag, sendWorkItem);
286             this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, isRetrySend ? 1 : 0));
287 
288             if (!this.isSendLoopRunning) {
289                 try {
290                     this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
291                 } catch (IOException ioException) {
292                     AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", ioException));
293                 }
294             }
295         }
296     }
297 
298     private void reSendAsync(String deliveryTag, SendWorkItem<DeliveryState> retryingSendWorkItem, boolean reuseDeliveryTag) {
299         if (!retryingSendWorkItem.getWork().isDone() && retryingSendWorkItem.cancelTimeoutTask(false)) {
300             Duration remainingTime = retryingSendWorkItem.getTimeoutTracker().remaining();
301             if (!remainingTime.isNegative() && !remainingTime.isZero()) {
302                 if (!reuseDeliveryTag) {
303                     deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
304                     retryingSendWorkItem.setDeliveryTag(deliveryTag);
305                 }
306 
307                 this.enlistSendRequest(deliveryTag, retryingSendWorkItem, true);
308                 this.scheduleSendTimeout(retryingSendWorkItem);
309             }
310         }
311     }
312 
313     public CompletableFuture<Void> sendAsync(final Iterable<Message> messages, TransactionContext transaction) {
314         if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
315             throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
316         }
317 
318         TRACE_LOGGER.debug("Sending a batch of messages to '{}'", this.sendPath);
319 
320         Message firstMessage = messages.iterator().next();
321         if (IteratorUtil.sizeEquals(messages, 1)) {
322             return this.sendAsync(firstMessage, transaction);
323         }
324 
325         // proton-j doesn't support multiple dataSections to be part of AmqpMessage
326         // here's the alternate approach provided by them: https://github.com/apache/qpid-proton/pull/54
327         Message batchMessage = Proton.message();
328         batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
329 
330         byte[] bytes = null;
331         int byteArrayOffset = 0;
332         try {
333             Pair<byte[], Integer> encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage, this.maxMessageSize);
334             bytes = encodedPair.getFirstItem();
335             byteArrayOffset = encodedPair.getSecondItem();
336 
337             for (Message amqpMessage: messages) {
338                 Message messageWrappedByData = Proton.message();
339                 encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage, this.maxMessageSize);
340                 messageWrappedByData.setBody(new Data(new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem())));
341 
342                 int encodedSize = Util.encodeMessageToCustomArray(messageWrappedByData, bytes, byteArrayOffset, this.maxMessageSize - byteArrayOffset - 1);
343                 byteArrayOffset = byteArrayOffset + encodedSize;
344             }
345         } catch (PayloadSizeExceededException ex) {
346             TRACE_LOGGER.error("Payload size of batch of messages exceeded limit", ex);
347             final CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
348             sendTask.completeExceptionally(ex);
349             return sendTask;
350         }
351 
352         return this.sendCoreAsync(bytes, byteArrayOffset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, transaction).thenAccept((x) -> { /*Do nothing*/ });
353     }
354 
355     public CompletableFuture<Void> sendAsync(Message msg, TransactionContext transaction) {
356         return this.sendAndReturnDeliveryStateAsync(msg, transaction).thenAccept((x) -> { /*Do nothing*/ });
357     }
358 
359     // To be used only by internal components like TransactionController
360     CompletableFuture<DeliveryState> sendAndReturnDeliveryStateAsync(Message msg, TransactionContext transaction) {
361         try {
362             Pair<byte[], Integer> encodedPair = Util.encodeMessageToOptimalSizeArray(msg, this.maxMessageSize);
363             return this.sendCoreAsync(encodedPair.getFirstItem(), encodedPair.getSecondItem(), DeliveryImpl.DEFAULT_MESSAGE_FORMAT, transaction);
364         } catch (PayloadSizeExceededException exception) {
365             TRACE_LOGGER.error("Payload size of message exceeded limit", exception);
366             final CompletableFuture<DeliveryState> sendTask = new CompletableFuture<DeliveryState>();
367             sendTask.completeExceptionally(exception);
368             return sendTask;
369         }
370     }
371 
372     @Override
373     public void onOpenComplete(Exception completionException) {
374         if (completionException == null) {
375             this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
376             this.lastKnownLinkError = null;
377             this.retryPolicy.resetRetryCount(this.getClientId());
378 
379             if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
380                 AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
381             }
382 
383             if (!this.linkFirstOpen.isDone()) {
384                 TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath);
385                 AsyncUtil.completeFuture(this.linkFirstOpen, this);
386             } else {
387                 synchronized (this.pendingSendLock) {
388                     if (!this.pendingSendsData.isEmpty()) {
389                         LinkedList<String> unacknowledgedSends = new LinkedList<String>();
390                         unacknowledgedSends.addAll(this.pendingSendsData.keySet());
391 
392                         if (unacknowledgedSends.size() > 0) {
393                             Iterator<String> reverseReader = unacknowledgedSends.iterator();
394                             while (reverseReader.hasNext()) {
395                                 String unacknowledgedSend = reverseReader.next();
396                                 if (this.pendingSendsData.get(unacknowledgedSend).isWaitingForAck()) {
397                                     this.pendingSends.offer(new WeightedDeliveryTag(unacknowledgedSend, 1));
398                                 }
399                             }
400                         }
401 
402                         unacknowledgedSends.clear();
403                     }
404                 }
405             }
406         } else {
407             this.cancelSASTokenRenewTimer();
408             if (!this.linkFirstOpen.isDone()) {
409                 TRACE_LOGGER.error("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException);
410                 this.setClosed();
411                 ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this, true);
412             }
413 
414             if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
415                 TRACE_LOGGER.warn("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException);
416                 AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException);
417             }
418         }
419     }
420 
421     @Override
422     public void onClose(ErrorCondition condition) {
423         Exception completionException = condition != null ? ExceptionUtil.toException(condition)
424                 : new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT,
425                 "The entity has been closed due to transient failures (underlying link closed), please retry the operation.");
426         this.onError(completionException);
427     }
428 
429     @Override
430     public void onError(Exception completionException) {
431         this.linkCredit = 0;
432         if (this.getIsClosingOrClosed()) {
433             Exception failureException = completionException == null
434                     ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.")
435                     : completionException;
436             this.clearAllPendingSendsWithException(failureException);
437 
438             TRACE_LOGGER.info("Send link to '{}' closed", this.sendPath);
439             AsyncUtil.completeFuture(this.linkClose, null);
440             return;
441         } else {
442             this.underlyingFactory.deregisterForConnectionError(this.sendLink);
443             this.lastKnownLinkError = completionException;
444             this.lastKnownErrorReportedAt = Instant.now();
445 
446             this.onOpenComplete(completionException);
447 
448             if (completionException != null
449                 && (!(completionException instanceof ServiceBusException./../com/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException) || !((ServiceBusException) completionException).getIsTransient())) {
450                 TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Failing all pending send requests.", this.sendLink.getName(), this.sendPath);
451                 this.clearAllPendingSendsWithException(completionException);
452             } else {
453                 final Map.Entry<String, SendWorkItem<DeliveryState>> pendingSendEntry = IteratorUtil.getFirst(this.pendingSendsData.entrySet());
454                 if (pendingSendEntry != null && pendingSendEntry.getValue() != null) {
455                     final TimeoutTracker tracker = pendingSendEntry.getValue().getTimeoutTracker();
456                     if (tracker != null) {
457                         final Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, tracker.remaining());
458                         if (nextRetryInterval != null) {
459                             TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", this.sendLink.getName(), this.sendPath, nextRetryInterval);
460                             Timer.schedule(() -> CoreMessageSender.this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
461                         }
462                     }
463                 }
464             }
465         }
466     }
467 
468     @Override
469     public void onSendComplete(final Delivery delivery) {
470         DeliveryState outcome = delivery.getRemoteState();
471         final String deliveryTag = new String(delivery.getTag(), UTF_8);
472 
473         TRACE_LOGGER.debug("Received ack for delivery. path:{}, linkName:{}, deliveryTag:{}, outcome:{}", CoreMessageSender.this.sendPath, this.sendLink.getName(), deliveryTag, outcome);
474         final SendWorkItem<DeliveryState> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
475 
476         if (pendingSendWorkItem != null) {
477             if (outcome instanceof TransactionalState) {
478                 TRACE_LOGGER.trace("State of delivery is Transactional, retrieving outcome: {}", outcome);
479                 Outcome transactionalOutcome = ((TransactionalState) outcome).getOutcome();
480                 if (transactionalOutcome instanceof DeliveryState) {
481                     outcome = (DeliveryState) transactionalOutcome;
482                 } else {
483                     this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, "Unknown delivery state: " + outcome.toString()));
484                     return;
485                 }
486             }
487 
488             if (outcome instanceof Accepted) {
489                 this.lastKnownLinkError = null;
490                 this.retryPolicy.resetRetryCount(this.getClientId());
491 
492                 pendingSendWorkItem.cancelTimeoutTask(false);
493                 AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
494             } else if (outcome instanceof Declared) {
495                 AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
496             } else if (outcome instanceof Rejected) {
497                 Rejected rejected = (Rejected) outcome;
498                 ErrorCondition error = rejected.getError();
499                 Exception exception = ExceptionUtil.toException(error);
500 
501                 if (ExceptionUtil.isGeneralError(error.getCondition())) {
502                     this.lastKnownLinkError = exception;
503                     this.lastKnownErrorReportedAt = Instant.now();
504                 }
505 
506                 Duration retryInterval = this.retryPolicy.getNextRetryInterval(
507                         this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
508                 if (retryInterval == null) {
509                     this.cleanupFailedSend(pendingSendWorkItem, exception);
510                 } else {
511                     TRACE_LOGGER.warn("Send failed for delivery '{}'. Will retry after '{}'", deliveryTag, retryInterval);
512                     pendingSendWorkItem.setLastKnownException(exception);
513                     Timer.schedule(() -> CoreMessageSender.this.reSendAsync(deliveryTag, pendingSendWorkItem, false), retryInterval, TimerType.OneTimeRun);
514                 }
515             } else if (outcome instanceof Released) {
516                 this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString()));
517             } else {
518                 this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, outcome.toString()));
519             }
520         } else {
521             TRACE_LOGGER.warn("Delivery mismatch. path:{}, linkName:{}, delivery:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
522         }
523     }
524 
525     private void clearAllPendingSendsWithException(Throwable failureException) {
526         synchronized (this.pendingSendLock) {
527             for (Map.Entry<String, SendWorkItem<DeliveryState>> pendingSend: this.pendingSendsData.entrySet()) {
528                 this.cleanupFailedSend(pendingSend.getValue(), failureException);
529             }
530 
531             this.pendingSendsData.clear();
532             this.pendingSends.clear();
533         }
534     }
535 
536     private void cleanupFailedSend(final SendWorkItem<DeliveryState> failedSend, final Throwable exception) {
537         failedSend.cancelTimeoutTask(false);
538         ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this, true);
539     }
540 
541     private static SenderLinkSettings getDefaultLinkProperties(String sendPath, String transferDestinationPath, MessagingFactory underlyingFactory, MessagingEntityType entityType) {
542         SenderLinkSettingsenderLinkSettings.html#SenderLinkSettings">SenderLinkSettings linkSettings = new SenderLinkSettings();
543         linkSettings.linkPath = sendPath;
544 
545         final Target target = new Target();
546         target.setAddress(sendPath);
547         linkSettings.target = target;
548         linkSettings.source = new Source();
549         linkSettings.settleMode = SenderSettleMode.UNSETTLED;
550         linkSettings.requiresAuthentication = true;
551 
552         Map<Symbol, Object> linkProperties = new HashMap<>();
553         // ServiceBus expects timeout to be of type unsignedint
554         linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(underlyingFactory.getOperationTimeout()).toMillis()));
555         if (entityType != null) {
556             linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, entityType.getIntValue());
557         }
558         if (transferDestinationPath != null && !transferDestinationPath.isEmpty()) {
559             linkProperties.put(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, transferDestinationPath);
560         }
561 
562         linkSettings.linkProperties = linkProperties;
563 
564         return linkSettings;
565     }
566 
567     private void createSendLink(SenderLinkSettings linkSettings) {
568         TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
569         final Connection connection = this.underlyingFactory.getConnection();
570         final Session session = connection.session();
571         session.setOutgoingWindow(Integer.MAX_VALUE);
572         session.open();
573         BaseHandler.setHandler(session, new SessionHandler(sendPath));
574 
575         final Sender sender = session.sender(linkSettings.linkName);
576         sender.setTarget(linkSettings.target);
577         sender.setSource(linkSettings.source);
578         sender.setProperties(linkSettings.linkProperties);
579 
580         TRACE_LOGGER.debug("Send link settle mode '{}'", linkSettings.settleMode);
581         sender.setSenderSettleMode(linkSettings.settleMode);
582 
583         SendLinkHandler/SendLinkHandler.html#SendLinkHandler">SendLinkHandler handler = new SendLinkHandler(CoreMessageSender.this);
584         BaseHandler.setHandler(sender, handler);
585         sender.open();
586         this.sendLink = sender;
587         this.underlyingFactory.registerForConnectionError(this.sendLink);
588     }
589 
590     CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
591         if (this.getIsClosingOrClosed()) {
592             return CompletableFuture.completedFuture(null);
593         } else {
594             CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
595             CompletableFuture<Void> sasTokenFuture = sendTokenFuture.thenAccept((f) -> this.sasTokenRenewTimerFuture = f);
596 
597             if (this.transferDestinationPath != null && !this.transferDestinationPath.isEmpty()) {
598                 CompletableFuture<Void> transferSendTokenFuture = this.underlyingFactory.sendSecurityToken(this.transferSasTokenAudienceURI);
599                 return CompletableFuture.allOf(sasTokenFuture, transferSendTokenFuture);
600             }
601 
602             return sasTokenFuture;
603         }
604     }
605 
606     private void cancelSASTokenRenewTimer() {
607         if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
608             this.sasTokenRenewTimerFuture.cancel(true);
609             TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
610         }
611     }
612 
613     // TODO: consolidate common-code written for timeouts in Sender/Receiver
614     private void initializeLinkOpen(TimeoutTracker timeout) {
615         this.linkFirstOpen = new CompletableFuture<CoreMessageSender>();
616 
617         // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
618         Timer.schedule(
619             () -> {
620                 if (!CoreMessageSender.this.linkFirstOpen.isDone()) {
621                     CoreMessageSender.this.closeInternals(false);
622                     CoreMessageSender.this.setClosed();
623 
624                     Exception operationTimedout = new TimeoutException(
625                             String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()),
626                             CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null);
627                     TRACE_LOGGER.warn(operationTimedout.getMessage());
628                     ExceptionUtil.completeExceptionally(CoreMessageSender.this.linkFirstOpen, operationTimedout, CoreMessageSender.this, true);
629                 }
630             },
631             timeout.remaining(),
632             TimerType.OneTimeRun);
633     }
634 
635     @Override
636     public ErrorContext getContext() {
637         final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();
638         final String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
639                 ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
640                 : ((this.sendLink != null) ? this.sendLink.getName() : null);
641 
642         SenderErrorContextenderErrorContext.html#SenderErrorContext">SenderErrorContext errorContext = new SenderErrorContext(
643                 this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
644                 this.sendPath,
645                 referenceId,
646                 isLinkOpened && this.sendLink != null ? this.sendLink.getCredit() : null);
647         return errorContext;
648     }
649 
650     @Override
651     public void onFlow(final int creditIssued) {
652         this.lastKnownLinkError = null;
653 
654         if (creditIssued <= 0) {
655             return;
656         }
657 
658         TRACE_LOGGER.debug("Received flow frame. path:{}, linkName:{}, remoteLinkCredit:{}, pendingSendsWaitingForCredit:{}, pendingSendsWaitingDelivery:{}",
659                 this.sendPath, this.sendLink.getName(), creditIssued, this.pendingSends.size(), this.pendingSendsData.size() - this.pendingSends.size());
660 
661         this.linkCredit = this.linkCredit + creditIssued;
662         this.sendWork.onEvent();
663     }
664 
665     private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
666         // Send SAS token before opening a link as connection might have been closed and reopened
667         if (!(this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE)) {
668             if (this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone()) {
669                 TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
670                 this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId());
671                 this.sendLinkReopenFuture = new CompletableFuture<>();
672                 // Variable just to closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change
673                 final CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture;
674                 Timer.schedule(
675                     () -> {
676                         if (!linkReopenFutureThatCanBeCancelled.isDone()) {
677                             CoreMessageSender.this.cancelSASTokenRenewTimer();
678                             Exception operationTimedout = new TimeoutException(
679                                     String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.sendPath, ZonedDateTime.now()));
680 
681                             TRACE_LOGGER.warn(operationTimedout.getMessage());
682                             linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout);
683                         }
684                     },
685                     CoreMessageSender.LINK_REOPEN_TIMEOUT,
686                     TimerType.OneTimeRun);
687                 this.cancelSASTokenRenewTimer();
688 
689                 CompletableFuture<Void> authenticationFuture = null;
690                 if (linkSettings.requiresAuthentication) {
691                     authenticationFuture = this.sendTokenAndSetRenewTimer(false);
692                 } else {
693                     authenticationFuture = CompletableFuture.completedFuture(null);
694                 }
695 
696                 authenticationFuture.handleAsync((v, sendTokenEx) -> {
697                     if (sendTokenEx != null) {
698                         Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
699                         TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause);
700                         this.sendLinkReopenFuture.completeExceptionally(sendTokenEx);
701                         this.clearAllPendingSendsWithException(sendTokenEx);
702                     } else {
703                         try {
704                             this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
705                                 @Override
706                                 public void onEvent() {
707                                     CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings);
708                                 }
709                             });
710                         } catch (IOException ioEx) {
711                             this.sendLinkReopenFuture.completeExceptionally(ioEx);
712                         }
713                     }
714                     return null;
715                 }, MessagingFactory.INTERNAL_THREAD_POOL);
716             }
717 
718             return this.sendLinkReopenFuture;
719         } else {
720             return CompletableFuture.completedFuture(null);
721         }
722     }
723 
724     // actual send on the SenderLink should happen only in this method & should run on Reactor Thread
725     private void processSendWork() {
726         synchronized (this.pendingSendLock) {
727             if (!this.isSendLoopRunning) {
728                 this.isSendLoopRunning = true;
729             } else {
730                 return;
731             }
732         }
733 
734         TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", this.sendPath, this.linkCredit);
735         try {
736             if (!this.ensureLinkIsOpen().isDone()) {
737                 // Link recreation is pending
738                 return;
739             }
740 
741             final Sender sendLinkCurrent = this.sendLink;
742             while (sendLinkCurrent != null
743                     && sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE
744                     && this.linkCredit > 0) {
745                 final WeightedDeliveryTag deliveryTag;
746                 final SendWorkItem<DeliveryState> sendData;
747                 synchronized (this.pendingSendLock) {
748                     deliveryTag = this.pendingSends.poll();
749                     if (deliveryTag == null) {
750                         TRACE_LOGGER.debug("There are no pending sends to '{}'.", this.sendPath);
751                         // Must be done inside this synchronized block
752                         this.isSendLoopRunning = false;
753                         break;
754                     } else {
755                         sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag());
756                         if (sendData == null) {
757                             TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
758                             continue;
759                         }
760                     }
761                 }
762 
763                 if (sendData.getWork() != null && sendData.getWork().isDone()) {
764                     // CoreSend could enqueue Sends into PendingSends Queue and can fail the SendCompletableFuture
765                     // (when It fails to schedule the ProcessSendWork on reactor Thread)
766                     this.pendingSendsData.remove(deliveryTag.getDeliveryTag());
767                     continue;
768                 }
769 
770                 Delivery delivery = null;
771                 boolean linkAdvance = false;
772                 int sentMsgSize = 0;
773                 Exception sendException = null;
774 
775                 try {
776                     delivery = sendLinkCurrent.delivery(deliveryTag.getDeliveryTag().getBytes(UTF_8));
777                     delivery.setMessageFormat(sendData.getMessageFormat());
778 
779                     TransactionContext transaction = sendData.getTransaction();
780                     if (transaction != TransactionContext.NULL_TXN) {
781                         TransactionalState transactionalState = new TransactionalState();
782                         transactionalState.setTxnId(new Binary(transaction.getTransactionId().array()));
783                         delivery.disposition(transactionalState);
784                     }
785 
786                     TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", deliveryTag.getDeliveryTag(), this.sendPath);
787                     sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize());
788                     assert sentMsgSize == sendData.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
789 
790                     linkAdvance = sendLinkCurrent.advance();
791                 } catch (Exception exception) {
792                     sendException = exception;
793                 }
794 
795                 if (linkAdvance) {
796                     this.linkCredit--;
797                     sendData.setWaitingForAck();
798                 } else {
799                     TRACE_LOGGER.warn("Sendlink advance failed. path:{}, linkName:{}, deliveryTag:{}, sentMessageSize:{}, payloadActualSiz:{}",
800                             this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize());
801 
802                     if (delivery != null) {
803                         delivery.free();
804                     }
805 
806                     Exception completionException = sendException != null ? new OperationCancelledException("Send operation failed. Please see cause for more details", sendException)
807                             : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag));
808                     AsyncUtil.completeFutureExceptionally(sendData.getWork(), completionException);
809                 }
810             }
811         } finally {
812             synchronized (this.pendingSendLock) {
813                 if (this.isSendLoopRunning) {
814                     this.isSendLoopRunning = false;
815                 }
816             }
817         }
818     }
819 
820     private void throwSenderTimeout(CompletableFuture<DeliveryState> pendingSendWork, Exception lastKnownException) {
821         Exception cause = lastKnownException;
822         if (lastKnownException == null && this.lastKnownLinkError != null) {
823             cause = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis())) ? this.lastKnownLinkError : null;
824         }
825 
826         boolean isClientSideTimeout = (cause == null || !(cause instanceof ServiceBusException));
827         ServiceBusException exception = isClientSideTimeout
828                 ? new TimeoutException(String.format(Locale.US, "%s %s %s.", CoreMessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause))
829                 : (ServiceBusException) cause;
830 
831         TRACE_LOGGER.error("Send timed out", exception);
832         ExceptionUtil.completeExceptionally(pendingSendWork, exception, this, true);
833     }
834 
835     private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
836         // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
837         Timer.schedule(
838             () -> {
839                 if (!linkClose.isDone()) {
840                     Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", CoreMessageSender.this.sendLink.getName(), ZonedDateTime.now()));
841                     TRACE_LOGGER.warn(operationTimedout.getMessage());
842 
843                     ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageSender.this, true);
844                 }
845             },
846             timeout.remaining(),
847             TimerType.OneTimeRun);
848     }
849 
850     @Override
851     protected CompletableFuture<Void> onClose() {
852         this.closeInternals(true);
853         return this.linkClose;
854     }
855 
856     private void closeInternals(boolean waitForCloseCompletion) {
857         if (!this.getIsClosed()) {
858             if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) {
859                 try {
860                     this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
861 
862                         @Override
863                         public void onEvent() {
864                             if (CoreMessageSender.this.sendLink != null && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) {
865                                 TRACE_LOGGER.info("Closing send link to '{}'", CoreMessageSender.this.sendPath);
866                                 CoreMessageSender.this.underlyingFactory.deregisterForConnectionError(CoreMessageSender.this.sendLink);
867                                 CoreMessageSender.this.sendLink.close();
868                                 if (waitForCloseCompletion) {
869                                     CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout));
870                                 } else {
871                                     AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null);
872                                 }
873                             }
874                         }
875                     });
876                 } catch (IOException e) {
877                     AsyncUtil.completeFutureExceptionally(this.linkClose, e);
878                 }
879             } else {
880                 AsyncUtil.completeFuture(this.linkClose, null);
881             }
882 
883             this.cancelSASTokenRenewTimer();
884             this.closeRequestResponseLink();
885         }
886     }
887 
888     private static class WeightedDeliveryTag {
889         private final String deliveryTag;
890         private final int priority;
891 
892         WeightedDeliveryTag(final String deliveryTag, final int priority) {
893             this.deliveryTag = deliveryTag;
894             this.priority = priority;
895         }
896 
897         public String getDeliveryTag() {
898             return this.deliveryTag;
899         }
900 
901         public int getPriority() {
902             return this.priority;
903         }
904     }
905 
906     private static class DeliveryTagComparator implements Comparator<WeightedDeliveryTag>, Serializable {
907         private static final long serialVersionUID = -7057500582037295636L;
908         @Override
909         public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) {
910             return deliveryTag1.getPriority() - deliveryTag0.getPriority();
911         }
912     }
913 
914     public CompletableFuture<long[]> scheduleMessageAsync(Message[] messages, TransactionContext transaction, Duration timeout) {
915         TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", messages.length, this.sendPath);
916         return this.createRequestResponseLink().thenComposeAsync((v) -> {
917             HashMap requestBodyMap = new HashMap();
918             Collection<HashMap> messageList = new LinkedList<HashMap>();
919             for (Message message : messages) {
920                 HashMap messageEntry = new HashMap();
921 
922                 Pair<byte[], Integer> encodedPair;
923                 try {
924                     encodedPair = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
925                 } catch (PayloadSizeExceededException exception) {
926                     TRACE_LOGGER.error("Payload size of message exceeded limit", exception);
927                     final CompletableFuture<long[]> scheduleMessagesTask = new CompletableFuture<long[]>();
928                     scheduleMessagesTask.completeExceptionally(exception);
929                     return scheduleMessagesTask;
930                 }
931 
932                 messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE, new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem()));
933                 messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_ID, message.getMessageId());
934 
935                 String sessionId = message.getGroupId();
936                 if (!StringUtil.isNullOrEmpty(sessionId)) {
937                     messageEntry.put(ClientConstants.REQUEST_RESPONSE_SESSION_ID, sessionId);
938                 }
939 
940                 Object partitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.PARTITIONKEYNAME));
941                 if (partitionKey != null && !((String) partitionKey).isEmpty()) {
942                     messageEntry.put(ClientConstants.REQUEST_RESPONSE_PARTITION_KEY, partitionKey);
943                 }
944 
945                 Object viaPartitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME));
946                 if (viaPartitionKey != null && !((String) viaPartitionKey).isEmpty()) {
947                     messageEntry.put(ClientConstants.REQUEST_RESPONSE_VIA_PARTITION_KEY, viaPartitionKey);
948                 }
949 
950                 messageList.add(messageEntry);
951             }
952             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGES, messageList);
953             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
954             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, timeout);
955             return responseFuture.thenComposeAsync((responseMessage) -> {
956                 CompletableFuture<long[]> returningFuture = new CompletableFuture<>();
957                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
958                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
959                     long[] sequenceNumbers = (long[]) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS);
960                     if (TRACE_LOGGER.isDebugEnabled()) {
961                         TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", Arrays.toString(sequenceNumbers));
962                     }
963 
964                     returningFuture.complete(sequenceNumbers);
965                 } else {
966                     // error response
967                     Exception scheduleException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
968                     TRACE_LOGGER.error("Sending scheduled messages to '{}' failed.", this.sendPath, scheduleException);
969                     returningFuture.completeExceptionally(scheduleException);
970                 }
971                 return returningFuture;
972             }, MessagingFactory.INTERNAL_THREAD_POOL);
973         }, MessagingFactory.INTERNAL_THREAD_POOL);
974     }
975 
976     public CompletableFuture<Void> cancelScheduledMessageAsync(Long[] sequenceNumbers, Duration timeout) {
977         if (TRACE_LOGGER.isDebugEnabled()) {
978             TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", Arrays.toString(sequenceNumbers), this.sendPath);
979         }
980 
981         return this.createRequestResponseLink().thenComposeAsync((v) -> {
982             HashMap requestBodyMap = new HashMap();
983             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers);
984 
985             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
986             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, timeout);
987             return responseFuture.thenComposeAsync((responseMessage) -> {
988                 CompletableFuture<Void> returningFuture = new CompletableFuture<Void>();
989                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
990                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
991                     TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", this.sendPath);
992                     returningFuture.complete(null);
993                 } else {
994                     // error response
995                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
996                     TRACE_LOGGER.error("Cancelling scheduled messages in '{}' failed.", this.sendPath, failureException);
997                     returningFuture.completeExceptionally(failureException);
998                 }
999                 return returningFuture;
1000             }, MessagingFactory.INTERNAL_THREAD_POOL);
1001         }, MessagingFactory.INTERNAL_THREAD_POOL);
1002     }
1003 
1004     // In case we need to support peek on a topic
1005     public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount) {
1006         TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", messageCount, this.sendPath, fromSequenceNumber);
1007         return this.createRequestResponseLink().thenComposeAsync((v) -> CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, null, this.sendLink.getName()), MessagingFactory.INTERNAL_THREAD_POOL);
1008     }
1009 }