1
2
3
4 package com.microsoft.azure.servicebus.primitives;
5
6 import java.io.IOException;
7 import java.io.Serializable;
8 import java.time.Duration;
9 import java.time.Instant;
10 import java.time.ZonedDateTime;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Comparator;
14 import java.util.HashMap;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Locale;
18 import java.util.Map;
19 import java.util.PriorityQueue;
20 import java.util.UUID;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledFuture;
24
25 import com.microsoft.azure.servicebus.TransactionContext;
26 import org.apache.qpid.proton.Proton;
27 import org.apache.qpid.proton.amqp.Binary;
28 import org.apache.qpid.proton.amqp.Symbol;
29 import org.apache.qpid.proton.amqp.UnsignedInteger;
30 import org.apache.qpid.proton.amqp.messaging.Accepted;
31 import org.apache.qpid.proton.amqp.messaging.Data;
32 import org.apache.qpid.proton.amqp.messaging.Outcome;
33 import org.apache.qpid.proton.amqp.messaging.Rejected;
34 import org.apache.qpid.proton.amqp.messaging.Released;
35 import org.apache.qpid.proton.amqp.messaging.Source;
36 import org.apache.qpid.proton.amqp.messaging.Target;
37 import org.apache.qpid.proton.amqp.transaction.Declared;
38 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
39 import org.apache.qpid.proton.amqp.transport.DeliveryState;
40 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
41 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
42 import org.apache.qpid.proton.engine.BaseHandler;
43 import org.apache.qpid.proton.engine.Connection;
44 import org.apache.qpid.proton.engine.Delivery;
45 import org.apache.qpid.proton.engine.EndpointState;
46 import org.apache.qpid.proton.engine.Sender;
47 import org.apache.qpid.proton.engine.Session;
48 import org.apache.qpid.proton.engine.impl.DeliveryImpl;
49 import org.apache.qpid.proton.message.Message;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.microsoft.azure.servicebus.amqp.AmqpConstants;
54 import com.microsoft.azure.servicebus.amqp.DispatchHandler;
55 import com.microsoft.azure.servicebus.amqp.IAmqpSender;
56 import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
57 import com.microsoft.azure.servicebus.amqp.SessionHandler;
58
59 import static java.nio.charset.StandardCharsets.UTF_8;
60
61
62
63
64
65 public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErrorContextProvider {
66 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageSender.class);
67 private static final String SEND_TIMED_OUT = "Send operation timed out";
68 private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5);
69
70 private final Object requestResonseLinkCreationLock = new Object();
71 private final MessagingFactory underlyingFactory;
72 private final String sendPath;
73 private final String sasTokenAudienceURI;
74 private final Duration operationTimeout;
75 private final RetryPolicy retryPolicy;
76 private final CompletableFuture<Void> linkClose;
77 private final Object pendingSendLock;
78 private final ConcurrentHashMap<String, SendWorkItem<DeliveryState>> pendingSendsData;
79 private final PriorityQueue<WeightedDeliveryTag> pendingSends;
80 private final DispatchHandler sendWork;
81 private final MessagingEntityType entityType;
82 private boolean isSendLoopRunning;
83
84 private Sender sendLink;
85 private RequestResponseLink requestResponseLink;
86 private CompletableFuture<CoreMessageSender> linkFirstOpen;
87 private int linkCredit;
88 private Exception lastKnownLinkError;
89 private Instant lastKnownErrorReportedAt;
90 private ScheduledFuture<?> sasTokenRenewTimerFuture;
91 private CompletableFuture<Void> requestResponseLinkCreationFuture;
92 private CompletableFuture<Void> sendLinkReopenFuture;
93 private SenderLinkSettings linkSettings;
94 private String transferDestinationPath;
95 private String transferSasTokenAudienceURI;
96 private boolean isSendVia;
97 private int maxMessageSize;
98
99 @Deprecated
100 public static CompletableFuture<CoreMessageSender> create(
101 final MessagingFactory factory,
102 final String clientId,
103 final String senderPath,
104 final String transferDestinationPath) {
105 return CoreMessageSender.create(factory, clientId, senderPath, transferDestinationPath, null);
106 }
107
108 public static CompletableFuture<CoreMessageSender> create(
109 final MessagingFactory factory,
110 final String clientId,
111 final String senderPath,
112 final String transferDestinationPath,
113 final MessagingEntityType entityType) {
114 return CoreMessageSender.create(factory, clientId, entityType, CoreMessageSender.getDefaultLinkProperties(senderPath, transferDestinationPath, factory, entityType));
115 }
116
117 static CompletableFuture<CoreMessageSender> create(
118 final MessagingFactory factory,
119 final String clientId,
120 final MessagingEntityType entityType,
121 final SenderLinkSettings linkSettings) {
122 TRACE_LOGGER.info("Creating core message sender to '{}'", linkSettings.linkPath);
123
124 final Connection connection = factory.getConnection();
125 final String sendLinkNamePrefix = "Sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
126 linkSettings.linkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer())
127 ? sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer())
128 : sendLinkNamePrefix;
129
130 final CoreMessageSenderes/CoreMessageSender.html#CoreMessageSender">CoreMessageSender msgSender = new CoreMessageSender(factory, clientId, entityType, linkSettings);
131 TimeoutTracker openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
132 msgSender.initializeLinkOpen(openLinkTracker);
133
134 CompletableFuture<Void> authenticationFuture = null;
135 if (linkSettings.requiresAuthentication) {
136 authenticationFuture = msgSender.sendTokenAndSetRenewTimer(false);
137 } else {
138 authenticationFuture = CompletableFuture.completedFuture(null);
139 }
140
141 authenticationFuture.handleAsync((v, sasTokenEx) -> {
142 if (sasTokenEx != null) {
143 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
144 TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", msgSender.sendPath, cause);
145 msgSender.linkFirstOpen.completeExceptionally(cause);
146 } else {
147 try {
148 msgSender.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
149 @Override
150 public void onEvent() {
151 msgSender.createSendLink(msgSender.linkSettings);
152 }
153 });
154 } catch (IOException ioException) {
155 msgSender.cancelSASTokenRenewTimer();
156 msgSender.linkFirstOpen.completeExceptionally(new ServiceBusException(false, "Failed to create Sender, see cause for more details.", ioException));
157 }
158 }
159
160 return null;
161 }, MessagingFactory.INTERNAL_THREAD_POOL);
162
163 return msgSender.linkFirstOpen;
164 }
165
166 private CompletableFuture<Void> createRequestResponseLink() {
167 synchronized (this.requestResonseLinkCreationLock) {
168 if (this.requestResponseLinkCreationFuture == null) {
169 this.requestResponseLinkCreationFuture = new CompletableFuture<Void>();
170 this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((rrlink, ex) -> {
171 if (ex == null) {
172 this.requestResponseLink = rrlink;
173 this.requestResponseLinkCreationFuture.complete(null);
174 } else {
175 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
176 this.requestResponseLinkCreationFuture.completeExceptionally(cause);
177
178 synchronized (this.requestResonseLinkCreationLock) {
179 this.requestResponseLinkCreationFuture = null;
180 }
181 }
182 return null;
183 }, MessagingFactory.INTERNAL_THREAD_POOL);
184 }
185
186 return this.requestResponseLinkCreationFuture;
187 }
188 }
189
190 private void closeRequestResponseLink() {
191 synchronized (this.requestResonseLinkCreationLock) {
192 if (this.requestResponseLinkCreationFuture != null) {
193 this.requestResponseLinkCreationFuture.thenRun(() -> {
194 this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath);
195 this.requestResponseLink = null;
196 });
197 this.requestResponseLinkCreationFuture = null;
198 }
199 }
200 }
201
202 private CoreMessageSender(final MessagingFactory factory, final String sendLinkName, final MessagingEntityType entityType, final SenderLinkSettings linkSettings) {
203 super(sendLinkName);
204
205 this.sendPath = linkSettings.linkPath;
206 this.entityType = entityType;
207 if (linkSettings.linkProperties != null) {
208 String transferPath = (String) linkSettings.linkProperties.getOrDefault(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, null);
209 if (transferPath != null && !transferPath.isEmpty()) {
210 this.transferDestinationPath = transferPath;
211 this.isSendVia = true;
212 this.transferSasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), transferDestinationPath);
213 } else {
214
215 this.transferDestinationPath = null;
216 }
217 }
218
219 this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), linkSettings.linkPath);
220 this.underlyingFactory = factory;
221 this.operationTimeout = factory.getOperationTimeout();
222 this.linkSettings = linkSettings;
223
224 this.lastKnownLinkError = null;
225 this.lastKnownErrorReportedAt = Instant.EPOCH;
226
227 this.retryPolicy = factory.getRetryPolicy();
228
229 this.pendingSendLock = new Object();
230 this.pendingSendsData = new ConcurrentHashMap<String, SendWorkItem<DeliveryState>>();
231 this.pendingSends = new PriorityQueue<WeightedDeliveryTag>(1000, new DeliveryTagComparator());
232 this.linkCredit = 0;
233
234 this.linkClose = new CompletableFuture<Void>();
235 this.sendLinkReopenFuture = null;
236 this.isSendLoopRunning = false;
237 this.sendWork = new DispatchHandler() {
238 @Override
239 public void onEvent() {
240 CoreMessageSender.this.processSendWork();
241 }
242 };
243 }
244
245 public String getSendPath() {
246 return this.sendPath;
247 }
248
249 private static String generateRandomDeliveryTag() {
250 return UUID.randomUUID().toString().replace("-", StringUtil.EMPTY);
251 }
252
253 CompletableFuture<DeliveryState> sendCoreAsync(
254 final byte[] bytes,
255 final int arrayOffset,
256 final int messageFormat,
257 final TransactionContext transaction) {
258 this.throwIfClosed(this.lastKnownLinkError);
259 TRACE_LOGGER.debug("Sending message to '{}'", this.sendPath);
260 String deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
261 CompletableFuture<DeliveryState> onSendFuture = new CompletableFuture<DeliveryState>();
262 SendWorkItem<DeliveryState> sendWorkItem = new SendWorkItem<DeliveryState>(bytes, arrayOffset, messageFormat, deliveryTag, transaction, onSendFuture, this.operationTimeout);
263 this.enlistSendRequest(deliveryTag, sendWorkItem, false);
264 this.scheduleSendTimeout(sendWorkItem);
265 return onSendFuture;
266 }
267
268 private void scheduleSendTimeout(SendWorkItem<DeliveryState> sendWorkItem) {
269
270 ScheduledFuture<?> timeoutTask = Timer.schedule(() -> {
271 if (!sendWorkItem.getWork().isDone()) {
272 TRACE_LOGGER.warn("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", sendWorkItem.getDeliveryTag(), CoreMessageSender.this.sendPath);
273 CoreMessageSender.this.pendingSendsData.remove(sendWorkItem.getDeliveryTag());
274 CoreMessageSender.this.throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException());
275
276 }
277 },
278 sendWorkItem.getTimeoutTracker().remaining(),
279 TimerType.OneTimeRun);
280 sendWorkItem.setTimeoutTask(timeoutTask);
281 }
282
283 private void enlistSendRequest(String deliveryTag, SendWorkItem<DeliveryState> sendWorkItem, boolean isRetrySend) {
284 synchronized (this.pendingSendLock) {
285 this.pendingSendsData.put(deliveryTag, sendWorkItem);
286 this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, isRetrySend ? 1 : 0));
287
288 if (!this.isSendLoopRunning) {
289 try {
290 this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
291 } catch (IOException ioException) {
292 AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", ioException));
293 }
294 }
295 }
296 }
297
298 private void reSendAsync(String deliveryTag, SendWorkItem<DeliveryState> retryingSendWorkItem, boolean reuseDeliveryTag) {
299 if (!retryingSendWorkItem.getWork().isDone() && retryingSendWorkItem.cancelTimeoutTask(false)) {
300 Duration remainingTime = retryingSendWorkItem.getTimeoutTracker().remaining();
301 if (!remainingTime.isNegative() && !remainingTime.isZero()) {
302 if (!reuseDeliveryTag) {
303 deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
304 retryingSendWorkItem.setDeliveryTag(deliveryTag);
305 }
306
307 this.enlistSendRequest(deliveryTag, retryingSendWorkItem, true);
308 this.scheduleSendTimeout(retryingSendWorkItem);
309 }
310 }
311 }
312
313 public CompletableFuture<Void> sendAsync(final Iterable<Message> messages, TransactionContext transaction) {
314 if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
315 throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
316 }
317
318 TRACE_LOGGER.debug("Sending a batch of messages to '{}'", this.sendPath);
319
320 Message firstMessage = messages.iterator().next();
321 if (IteratorUtil.sizeEquals(messages, 1)) {
322 return this.sendAsync(firstMessage, transaction);
323 }
324
325
326
327 Message batchMessage = Proton.message();
328 batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
329
330 byte[] bytes = null;
331 int byteArrayOffset = 0;
332 try {
333 Pair<byte[], Integer> encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage, this.maxMessageSize);
334 bytes = encodedPair.getFirstItem();
335 byteArrayOffset = encodedPair.getSecondItem();
336
337 for (Message amqpMessage: messages) {
338 Message messageWrappedByData = Proton.message();
339 encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage, this.maxMessageSize);
340 messageWrappedByData.setBody(new Data(new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem())));
341
342 int encodedSize = Util.encodeMessageToCustomArray(messageWrappedByData, bytes, byteArrayOffset, this.maxMessageSize - byteArrayOffset - 1);
343 byteArrayOffset = byteArrayOffset + encodedSize;
344 }
345 } catch (PayloadSizeExceededException ex) {
346 TRACE_LOGGER.error("Payload size of batch of messages exceeded limit", ex);
347 final CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
348 sendTask.completeExceptionally(ex);
349 return sendTask;
350 }
351
352 return this.sendCoreAsync(bytes, byteArrayOffset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, transaction).thenAccept((x) -> { });
353 }
354
355 public CompletableFuture<Void> sendAsync(Message msg, TransactionContext transaction) {
356 return this.sendAndReturnDeliveryStateAsync(msg, transaction).thenAccept((x) -> { });
357 }
358
359
360 CompletableFuture<DeliveryState> sendAndReturnDeliveryStateAsync(Message msg, TransactionContext transaction) {
361 try {
362 Pair<byte[], Integer> encodedPair = Util.encodeMessageToOptimalSizeArray(msg, this.maxMessageSize);
363 return this.sendCoreAsync(encodedPair.getFirstItem(), encodedPair.getSecondItem(), DeliveryImpl.DEFAULT_MESSAGE_FORMAT, transaction);
364 } catch (PayloadSizeExceededException exception) {
365 TRACE_LOGGER.error("Payload size of message exceeded limit", exception);
366 final CompletableFuture<DeliveryState> sendTask = new CompletableFuture<DeliveryState>();
367 sendTask.completeExceptionally(exception);
368 return sendTask;
369 }
370 }
371
372 @Override
373 public void onOpenComplete(Exception completionException) {
374 if (completionException == null) {
375 this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
376 this.lastKnownLinkError = null;
377 this.retryPolicy.resetRetryCount(this.getClientId());
378
379 if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
380 AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
381 }
382
383 if (!this.linkFirstOpen.isDone()) {
384 TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath);
385 AsyncUtil.completeFuture(this.linkFirstOpen, this);
386 } else {
387 synchronized (this.pendingSendLock) {
388 if (!this.pendingSendsData.isEmpty()) {
389 LinkedList<String> unacknowledgedSends = new LinkedList<String>();
390 unacknowledgedSends.addAll(this.pendingSendsData.keySet());
391
392 if (unacknowledgedSends.size() > 0) {
393 Iterator<String> reverseReader = unacknowledgedSends.iterator();
394 while (reverseReader.hasNext()) {
395 String unacknowledgedSend = reverseReader.next();
396 if (this.pendingSendsData.get(unacknowledgedSend).isWaitingForAck()) {
397 this.pendingSends.offer(new WeightedDeliveryTag(unacknowledgedSend, 1));
398 }
399 }
400 }
401
402 unacknowledgedSends.clear();
403 }
404 }
405 }
406 } else {
407 this.cancelSASTokenRenewTimer();
408 if (!this.linkFirstOpen.isDone()) {
409 TRACE_LOGGER.error("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException);
410 this.setClosed();
411 ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this, true);
412 }
413
414 if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
415 TRACE_LOGGER.warn("Opening send link '{}' to '{}' failed", this.sendLink.getName(), this.sendPath, completionException);
416 AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException);
417 }
418 }
419 }
420
421 @Override
422 public void onClose(ErrorCondition condition) {
423 Exception completionException = condition != null ? ExceptionUtil.toException(condition)
424 : new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT,
425 "The entity has been closed due to transient failures (underlying link closed), please retry the operation.");
426 this.onError(completionException);
427 }
428
429 @Override
430 public void onError(Exception completionException) {
431 this.linkCredit = 0;
432 if (this.getIsClosingOrClosed()) {
433 Exception failureException = completionException == null
434 ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.")
435 : completionException;
436 this.clearAllPendingSendsWithException(failureException);
437
438 TRACE_LOGGER.info("Send link to '{}' closed", this.sendPath);
439 AsyncUtil.completeFuture(this.linkClose, null);
440 return;
441 } else {
442 this.underlyingFactory.deregisterForConnectionError(this.sendLink);
443 this.lastKnownLinkError = completionException;
444 this.lastKnownErrorReportedAt = Instant.now();
445
446 this.onOpenComplete(completionException);
447
448 if (completionException != null
449 && (!(completionException instanceof ServiceBusException./../com/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException) || !((ServiceBusException) completionException).getIsTransient())) {
450 TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Failing all pending send requests.", this.sendLink.getName(), this.sendPath);
451 this.clearAllPendingSendsWithException(completionException);
452 } else {
453 final Map.Entry<String, SendWorkItem<DeliveryState>> pendingSendEntry = IteratorUtil.getFirst(this.pendingSendsData.entrySet());
454 if (pendingSendEntry != null && pendingSendEntry.getValue() != null) {
455 final TimeoutTracker tracker = pendingSendEntry.getValue().getTimeoutTracker();
456 if (tracker != null) {
457 final Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, tracker.remaining());
458 if (nextRetryInterval != null) {
459 TRACE_LOGGER.warn("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", this.sendLink.getName(), this.sendPath, nextRetryInterval);
460 Timer.schedule(() -> CoreMessageSender.this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
461 }
462 }
463 }
464 }
465 }
466 }
467
468 @Override
469 public void onSendComplete(final Delivery delivery) {
470 DeliveryState outcome = delivery.getRemoteState();
471 final String deliveryTag = new String(delivery.getTag(), UTF_8);
472
473 TRACE_LOGGER.debug("Received ack for delivery. path:{}, linkName:{}, deliveryTag:{}, outcome:{}", CoreMessageSender.this.sendPath, this.sendLink.getName(), deliveryTag, outcome);
474 final SendWorkItem<DeliveryState> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
475
476 if (pendingSendWorkItem != null) {
477 if (outcome instanceof TransactionalState) {
478 TRACE_LOGGER.trace("State of delivery is Transactional, retrieving outcome: {}", outcome);
479 Outcome transactionalOutcome = ((TransactionalState) outcome).getOutcome();
480 if (transactionalOutcome instanceof DeliveryState) {
481 outcome = (DeliveryState) transactionalOutcome;
482 } else {
483 this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, "Unknown delivery state: " + outcome.toString()));
484 return;
485 }
486 }
487
488 if (outcome instanceof Accepted) {
489 this.lastKnownLinkError = null;
490 this.retryPolicy.resetRetryCount(this.getClientId());
491
492 pendingSendWorkItem.cancelTimeoutTask(false);
493 AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
494 } else if (outcome instanceof Declared) {
495 AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
496 } else if (outcome instanceof Rejected) {
497 Rejected rejected = (Rejected) outcome;
498 ErrorCondition error = rejected.getError();
499 Exception exception = ExceptionUtil.toException(error);
500
501 if (ExceptionUtil.isGeneralError(error.getCondition())) {
502 this.lastKnownLinkError = exception;
503 this.lastKnownErrorReportedAt = Instant.now();
504 }
505
506 Duration retryInterval = this.retryPolicy.getNextRetryInterval(
507 this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
508 if (retryInterval == null) {
509 this.cleanupFailedSend(pendingSendWorkItem, exception);
510 } else {
511 TRACE_LOGGER.warn("Send failed for delivery '{}'. Will retry after '{}'", deliveryTag, retryInterval);
512 pendingSendWorkItem.setLastKnownException(exception);
513 Timer.schedule(() -> CoreMessageSender.this.reSendAsync(deliveryTag, pendingSendWorkItem, false), retryInterval, TimerType.OneTimeRun);
514 }
515 } else if (outcome instanceof Released) {
516 this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString()));
517 } else {
518 this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, outcome.toString()));
519 }
520 } else {
521 TRACE_LOGGER.warn("Delivery mismatch. path:{}, linkName:{}, delivery:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
522 }
523 }
524
525 private void clearAllPendingSendsWithException(Throwable failureException) {
526 synchronized (this.pendingSendLock) {
527 for (Map.Entry<String, SendWorkItem<DeliveryState>> pendingSend: this.pendingSendsData.entrySet()) {
528 this.cleanupFailedSend(pendingSend.getValue(), failureException);
529 }
530
531 this.pendingSendsData.clear();
532 this.pendingSends.clear();
533 }
534 }
535
536 private void cleanupFailedSend(final SendWorkItem<DeliveryState> failedSend, final Throwable exception) {
537 failedSend.cancelTimeoutTask(false);
538 ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this, true);
539 }
540
541 private static SenderLinkSettings getDefaultLinkProperties(String sendPath, String transferDestinationPath, MessagingFactory underlyingFactory, MessagingEntityType entityType) {
542 SenderLinkSettingsenderLinkSettings.html#SenderLinkSettings">SenderLinkSettings linkSettings = new SenderLinkSettings();
543 linkSettings.linkPath = sendPath;
544
545 final Target target = new Target();
546 target.setAddress(sendPath);
547 linkSettings.target = target;
548 linkSettings.source = new Source();
549 linkSettings.settleMode = SenderSettleMode.UNSETTLED;
550 linkSettings.requiresAuthentication = true;
551
552 Map<Symbol, Object> linkProperties = new HashMap<>();
553
554 linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(underlyingFactory.getOperationTimeout()).toMillis()));
555 if (entityType != null) {
556 linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, entityType.getIntValue());
557 }
558 if (transferDestinationPath != null && !transferDestinationPath.isEmpty()) {
559 linkProperties.put(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, transferDestinationPath);
560 }
561
562 linkSettings.linkProperties = linkProperties;
563
564 return linkSettings;
565 }
566
567 private void createSendLink(SenderLinkSettings linkSettings) {
568 TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
569 final Connection connection = this.underlyingFactory.getConnection();
570 final Session session = connection.session();
571 session.setOutgoingWindow(Integer.MAX_VALUE);
572 session.open();
573 BaseHandler.setHandler(session, new SessionHandler(sendPath));
574
575 final Sender sender = session.sender(linkSettings.linkName);
576 sender.setTarget(linkSettings.target);
577 sender.setSource(linkSettings.source);
578 sender.setProperties(linkSettings.linkProperties);
579
580 TRACE_LOGGER.debug("Send link settle mode '{}'", linkSettings.settleMode);
581 sender.setSenderSettleMode(linkSettings.settleMode);
582
583 SendLinkHandler/SendLinkHandler.html#SendLinkHandler">SendLinkHandler handler = new SendLinkHandler(CoreMessageSender.this);
584 BaseHandler.setHandler(sender, handler);
585 sender.open();
586 this.sendLink = sender;
587 this.underlyingFactory.registerForConnectionError(this.sendLink);
588 }
589
590 CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
591 if (this.getIsClosingOrClosed()) {
592 return CompletableFuture.completedFuture(null);
593 } else {
594 CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
595 CompletableFuture<Void> sasTokenFuture = sendTokenFuture.thenAccept((f) -> this.sasTokenRenewTimerFuture = f);
596
597 if (this.transferDestinationPath != null && !this.transferDestinationPath.isEmpty()) {
598 CompletableFuture<Void> transferSendTokenFuture = this.underlyingFactory.sendSecurityToken(this.transferSasTokenAudienceURI);
599 return CompletableFuture.allOf(sasTokenFuture, transferSendTokenFuture);
600 }
601
602 return sasTokenFuture;
603 }
604 }
605
606 private void cancelSASTokenRenewTimer() {
607 if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
608 this.sasTokenRenewTimerFuture.cancel(true);
609 TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
610 }
611 }
612
613
614 private void initializeLinkOpen(TimeoutTracker timeout) {
615 this.linkFirstOpen = new CompletableFuture<CoreMessageSender>();
616
617
618 Timer.schedule(
619 () -> {
620 if (!CoreMessageSender.this.linkFirstOpen.isDone()) {
621 CoreMessageSender.this.closeInternals(false);
622 CoreMessageSender.this.setClosed();
623
624 Exception operationTimedout = new TimeoutException(
625 String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()),
626 CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null);
627 TRACE_LOGGER.warn(operationTimedout.getMessage());
628 ExceptionUtil.completeExceptionally(CoreMessageSender.this.linkFirstOpen, operationTimedout, CoreMessageSender.this, true);
629 }
630 },
631 timeout.remaining(),
632 TimerType.OneTimeRun);
633 }
634
635 @Override
636 public ErrorContext getContext() {
637 final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();
638 final String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
639 ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
640 : ((this.sendLink != null) ? this.sendLink.getName() : null);
641
642 SenderErrorContextenderErrorContext.html#SenderErrorContext">SenderErrorContext errorContext = new SenderErrorContext(
643 this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
644 this.sendPath,
645 referenceId,
646 isLinkOpened && this.sendLink != null ? this.sendLink.getCredit() : null);
647 return errorContext;
648 }
649
650 @Override
651 public void onFlow(final int creditIssued) {
652 this.lastKnownLinkError = null;
653
654 if (creditIssued <= 0) {
655 return;
656 }
657
658 TRACE_LOGGER.debug("Received flow frame. path:{}, linkName:{}, remoteLinkCredit:{}, pendingSendsWaitingForCredit:{}, pendingSendsWaitingDelivery:{}",
659 this.sendPath, this.sendLink.getName(), creditIssued, this.pendingSends.size(), this.pendingSendsData.size() - this.pendingSends.size());
660
661 this.linkCredit = this.linkCredit + creditIssued;
662 this.sendWork.onEvent();
663 }
664
665 private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
666
667 if (!(this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE)) {
668 if (this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone()) {
669 TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
670 this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId());
671 this.sendLinkReopenFuture = new CompletableFuture<>();
672
673 final CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture;
674 Timer.schedule(
675 () -> {
676 if (!linkReopenFutureThatCanBeCancelled.isDone()) {
677 CoreMessageSender.this.cancelSASTokenRenewTimer();
678 Exception operationTimedout = new TimeoutException(
679 String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.sendPath, ZonedDateTime.now()));
680
681 TRACE_LOGGER.warn(operationTimedout.getMessage());
682 linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout);
683 }
684 },
685 CoreMessageSender.LINK_REOPEN_TIMEOUT,
686 TimerType.OneTimeRun);
687 this.cancelSASTokenRenewTimer();
688
689 CompletableFuture<Void> authenticationFuture = null;
690 if (linkSettings.requiresAuthentication) {
691 authenticationFuture = this.sendTokenAndSetRenewTimer(false);
692 } else {
693 authenticationFuture = CompletableFuture.completedFuture(null);
694 }
695
696 authenticationFuture.handleAsync((v, sendTokenEx) -> {
697 if (sendTokenEx != null) {
698 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
699 TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause);
700 this.sendLinkReopenFuture.completeExceptionally(sendTokenEx);
701 this.clearAllPendingSendsWithException(sendTokenEx);
702 } else {
703 try {
704 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
705 @Override
706 public void onEvent() {
707 CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings);
708 }
709 });
710 } catch (IOException ioEx) {
711 this.sendLinkReopenFuture.completeExceptionally(ioEx);
712 }
713 }
714 return null;
715 }, MessagingFactory.INTERNAL_THREAD_POOL);
716 }
717
718 return this.sendLinkReopenFuture;
719 } else {
720 return CompletableFuture.completedFuture(null);
721 }
722 }
723
724
725 private void processSendWork() {
726 synchronized (this.pendingSendLock) {
727 if (!this.isSendLoopRunning) {
728 this.isSendLoopRunning = true;
729 } else {
730 return;
731 }
732 }
733
734 TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", this.sendPath, this.linkCredit);
735 try {
736 if (!this.ensureLinkIsOpen().isDone()) {
737
738 return;
739 }
740
741 final Sender sendLinkCurrent = this.sendLink;
742 while (sendLinkCurrent != null
743 && sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE
744 && this.linkCredit > 0) {
745 final WeightedDeliveryTag deliveryTag;
746 final SendWorkItem<DeliveryState> sendData;
747 synchronized (this.pendingSendLock) {
748 deliveryTag = this.pendingSends.poll();
749 if (deliveryTag == null) {
750 TRACE_LOGGER.debug("There are no pending sends to '{}'.", this.sendPath);
751
752 this.isSendLoopRunning = false;
753 break;
754 } else {
755 sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag());
756 if (sendData == null) {
757 TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
758 continue;
759 }
760 }
761 }
762
763 if (sendData.getWork() != null && sendData.getWork().isDone()) {
764
765
766 this.pendingSendsData.remove(deliveryTag.getDeliveryTag());
767 continue;
768 }
769
770 Delivery delivery = null;
771 boolean linkAdvance = false;
772 int sentMsgSize = 0;
773 Exception sendException = null;
774
775 try {
776 delivery = sendLinkCurrent.delivery(deliveryTag.getDeliveryTag().getBytes(UTF_8));
777 delivery.setMessageFormat(sendData.getMessageFormat());
778
779 TransactionContext transaction = sendData.getTransaction();
780 if (transaction != TransactionContext.NULL_TXN) {
781 TransactionalState transactionalState = new TransactionalState();
782 transactionalState.setTxnId(new Binary(transaction.getTransactionId().array()));
783 delivery.disposition(transactionalState);
784 }
785
786 TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", deliveryTag.getDeliveryTag(), this.sendPath);
787 sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize());
788 assert sentMsgSize == sendData.getEncodedMessageSize() : "Contract of the ProtonJ library for Sender.Send API changed";
789
790 linkAdvance = sendLinkCurrent.advance();
791 } catch (Exception exception) {
792 sendException = exception;
793 }
794
795 if (linkAdvance) {
796 this.linkCredit--;
797 sendData.setWaitingForAck();
798 } else {
799 TRACE_LOGGER.warn("Sendlink advance failed. path:{}, linkName:{}, deliveryTag:{}, sentMessageSize:{}, payloadActualSiz:{}",
800 this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize());
801
802 if (delivery != null) {
803 delivery.free();
804 }
805
806 Exception completionException = sendException != null ? new OperationCancelledException("Send operation failed. Please see cause for more details", sendException)
807 : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag));
808 AsyncUtil.completeFutureExceptionally(sendData.getWork(), completionException);
809 }
810 }
811 } finally {
812 synchronized (this.pendingSendLock) {
813 if (this.isSendLoopRunning) {
814 this.isSendLoopRunning = false;
815 }
816 }
817 }
818 }
819
820 private void throwSenderTimeout(CompletableFuture<DeliveryState> pendingSendWork, Exception lastKnownException) {
821 Exception cause = lastKnownException;
822 if (lastKnownException == null && this.lastKnownLinkError != null) {
823 cause = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis())) ? this.lastKnownLinkError : null;
824 }
825
826 boolean isClientSideTimeout = (cause == null || !(cause instanceof ServiceBusException));
827 ServiceBusException exception = isClientSideTimeout
828 ? new TimeoutException(String.format(Locale.US, "%s %s %s.", CoreMessageSender.SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause))
829 : (ServiceBusException) cause;
830
831 TRACE_LOGGER.error("Send timed out", exception);
832 ExceptionUtil.completeExceptionally(pendingSendWork, exception, this, true);
833 }
834
835 private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
836
837 Timer.schedule(
838 () -> {
839 if (!linkClose.isDone()) {
840 Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", CoreMessageSender.this.sendLink.getName(), ZonedDateTime.now()));
841 TRACE_LOGGER.warn(operationTimedout.getMessage());
842
843 ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageSender.this, true);
844 }
845 },
846 timeout.remaining(),
847 TimerType.OneTimeRun);
848 }
849
850 @Override
851 protected CompletableFuture<Void> onClose() {
852 this.closeInternals(true);
853 return this.linkClose;
854 }
855
856 private void closeInternals(boolean waitForCloseCompletion) {
857 if (!this.getIsClosed()) {
858 if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) {
859 try {
860 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
861
862 @Override
863 public void onEvent() {
864 if (CoreMessageSender.this.sendLink != null && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) {
865 TRACE_LOGGER.info("Closing send link to '{}'", CoreMessageSender.this.sendPath);
866 CoreMessageSender.this.underlyingFactory.deregisterForConnectionError(CoreMessageSender.this.sendLink);
867 CoreMessageSender.this.sendLink.close();
868 if (waitForCloseCompletion) {
869 CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout));
870 } else {
871 AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null);
872 }
873 }
874 }
875 });
876 } catch (IOException e) {
877 AsyncUtil.completeFutureExceptionally(this.linkClose, e);
878 }
879 } else {
880 AsyncUtil.completeFuture(this.linkClose, null);
881 }
882
883 this.cancelSASTokenRenewTimer();
884 this.closeRequestResponseLink();
885 }
886 }
887
888 private static class WeightedDeliveryTag {
889 private final String deliveryTag;
890 private final int priority;
891
892 WeightedDeliveryTag(final String deliveryTag, final int priority) {
893 this.deliveryTag = deliveryTag;
894 this.priority = priority;
895 }
896
897 public String getDeliveryTag() {
898 return this.deliveryTag;
899 }
900
901 public int getPriority() {
902 return this.priority;
903 }
904 }
905
906 private static class DeliveryTagComparator implements Comparator<WeightedDeliveryTag>, Serializable {
907 private static final long serialVersionUID = -7057500582037295636L;
908 @Override
909 public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) {
910 return deliveryTag1.getPriority() - deliveryTag0.getPriority();
911 }
912 }
913
914 public CompletableFuture<long[]> scheduleMessageAsync(Message[] messages, TransactionContext transaction, Duration timeout) {
915 TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", messages.length, this.sendPath);
916 return this.createRequestResponseLink().thenComposeAsync((v) -> {
917 HashMap requestBodyMap = new HashMap();
918 Collection<HashMap> messageList = new LinkedList<HashMap>();
919 for (Message message : messages) {
920 HashMap messageEntry = new HashMap();
921
922 Pair<byte[], Integer> encodedPair;
923 try {
924 encodedPair = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
925 } catch (PayloadSizeExceededException exception) {
926 TRACE_LOGGER.error("Payload size of message exceeded limit", exception);
927 final CompletableFuture<long[]> scheduleMessagesTask = new CompletableFuture<long[]>();
928 scheduleMessagesTask.completeExceptionally(exception);
929 return scheduleMessagesTask;
930 }
931
932 messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE, new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem()));
933 messageEntry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_ID, message.getMessageId());
934
935 String sessionId = message.getGroupId();
936 if (!StringUtil.isNullOrEmpty(sessionId)) {
937 messageEntry.put(ClientConstants.REQUEST_RESPONSE_SESSION_ID, sessionId);
938 }
939
940 Object partitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.PARTITIONKEYNAME));
941 if (partitionKey != null && !((String) partitionKey).isEmpty()) {
942 messageEntry.put(ClientConstants.REQUEST_RESPONSE_PARTITION_KEY, partitionKey);
943 }
944
945 Object viaPartitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME));
946 if (viaPartitionKey != null && !((String) viaPartitionKey).isEmpty()) {
947 messageEntry.put(ClientConstants.REQUEST_RESPONSE_VIA_PARTITION_KEY, viaPartitionKey);
948 }
949
950 messageList.add(messageEntry);
951 }
952 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGES, messageList);
953 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
954 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, timeout);
955 return responseFuture.thenComposeAsync((responseMessage) -> {
956 CompletableFuture<long[]> returningFuture = new CompletableFuture<>();
957 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
958 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
959 long[] sequenceNumbers = (long[]) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS);
960 if (TRACE_LOGGER.isDebugEnabled()) {
961 TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", Arrays.toString(sequenceNumbers));
962 }
963
964 returningFuture.complete(sequenceNumbers);
965 } else {
966
967 Exception scheduleException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
968 TRACE_LOGGER.error("Sending scheduled messages to '{}' failed.", this.sendPath, scheduleException);
969 returningFuture.completeExceptionally(scheduleException);
970 }
971 return returningFuture;
972 }, MessagingFactory.INTERNAL_THREAD_POOL);
973 }, MessagingFactory.INTERNAL_THREAD_POOL);
974 }
975
976 public CompletableFuture<Void> cancelScheduledMessageAsync(Long[] sequenceNumbers, Duration timeout) {
977 if (TRACE_LOGGER.isDebugEnabled()) {
978 TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", Arrays.toString(sequenceNumbers), this.sendPath);
979 }
980
981 return this.createRequestResponseLink().thenComposeAsync((v) -> {
982 HashMap requestBodyMap = new HashMap();
983 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers);
984
985 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
986 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, timeout);
987 return responseFuture.thenComposeAsync((responseMessage) -> {
988 CompletableFuture<Void> returningFuture = new CompletableFuture<Void>();
989 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
990 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
991 TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", this.sendPath);
992 returningFuture.complete(null);
993 } else {
994
995 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
996 TRACE_LOGGER.error("Cancelling scheduled messages in '{}' failed.", this.sendPath, failureException);
997 returningFuture.completeExceptionally(failureException);
998 }
999 return returningFuture;
1000 }, MessagingFactory.INTERNAL_THREAD_POOL);
1001 }, MessagingFactory.INTERNAL_THREAD_POOL);
1002 }
1003
1004
1005 public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount) {
1006 TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", messageCount, this.sendPath, fromSequenceNumber);
1007 return this.createRequestResponseLink().thenComposeAsync((v) -> CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, null, this.sendLink.getName()), MessagingFactory.INTERNAL_THREAD_POOL);
1008 }
1009 }