1
2
3
4 package com.microsoft.azure.servicebus.primitives;
5
6 import java.io.IOException;
7 import java.time.Duration;
8 import java.time.Instant;
9 import java.time.ZonedDateTime;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Date;
14 import java.util.HashMap;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Locale;
19 import java.util.Map;
20 import java.util.UUID;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.stream.Collectors;
27
28 import com.microsoft.azure.servicebus.TransactionContext;
29 import org.apache.qpid.proton.amqp.Binary;
30 import org.apache.qpid.proton.amqp.Symbol;
31 import org.apache.qpid.proton.amqp.UnsignedInteger;
32 import org.apache.qpid.proton.amqp.messaging.Accepted;
33 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
34 import org.apache.qpid.proton.amqp.messaging.Modified;
35 import org.apache.qpid.proton.amqp.messaging.Outcome;
36 import org.apache.qpid.proton.amqp.messaging.Rejected;
37 import org.apache.qpid.proton.amqp.messaging.Released;
38 import org.apache.qpid.proton.amqp.messaging.Source;
39 import org.apache.qpid.proton.amqp.messaging.Target;
40 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
41 import org.apache.qpid.proton.amqp.transport.DeliveryState;
42 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
43 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
44 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
45 import org.apache.qpid.proton.engine.BaseHandler;
46 import org.apache.qpid.proton.engine.Connection;
47 import org.apache.qpid.proton.engine.Delivery;
48 import org.apache.qpid.proton.engine.EndpointState;
49 import org.apache.qpid.proton.engine.Receiver;
50 import org.apache.qpid.proton.engine.Session;
51 import org.apache.qpid.proton.message.Message;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.microsoft.azure.servicebus.amqp.DispatchHandler;
56 import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
57 import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
58 import com.microsoft.azure.servicebus.amqp.SessionHandler;
59
60
61
62
63
64
65
66 public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
// SLF4J logger used for all tracing in this class.
67 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
// NOTE(review): not referenced in the visible part of this file; presumably bounds a link reopen attempt - confirm.
68 private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5);
// Repeat interval with which returnMesagesLoopDaemon is scheduled in the constructor.
69 private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1);
// Repeat interval with which timedOutUpdateStateRequestsDaemon is scheduled in the constructor.
70 private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500);
// Timeout substituted by receiveAsync when the caller asks for a zero timeout.
71 private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200);
// addCredit flushes accumulated credit once it reaches this value (or the prefetch count).
72 private static final int CREDIT_FLOW_BATCH_SIZE = 50;
73
// Monitor held while reading or replacing requestResponseLinkCreationFuture.
74 private final Object requestResonseLinkCreationLock = new Object();
// Receive requests queued by receiveAsync that have neither been served nor timed out yet.
75 private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
// Outstanding update-state requests, looked up by delivery tag (as string) in onReceiveComplete.
76 private final ConcurrentHashMap<String, UpdateStateWorkItem> pendingUpdateStateRequests;
// Deliveries that were not settled on arrival, keyed by delivery tag (as string).
77 private final ConcurrentHashMap<String, Delivery> tagsToDeliveriesMap;
// Factory that supplies the connection, reactor scheduling, retry policy and token handling.
78 private final MessagingFactory underlyingFactory;
// Entity path; used as the source address of the receive link.
79 private final String receivePath;
// Audience built from the factory host name and receivePath, passed when sending the security token.
80 private final String sasTokenAudienceURI;
// Taken from factory.getOperationTimeout(); used as the timeout of the link-open work item.
81 private final Duration operationTimeout;
// Completed in onError when the receiver is closing or closed.
82 private final CompletableFuture<Void> linkClose;
// Monitor guarding prefetchCount in getPrefetchCount/setPrefetchCount.
83 private final Object prefetchCountSync;
// Sender/receiver settle modes applied to the receive link.
84 private final SettleModePair settleModePair;
// Taken from factory.getRetryPolicy(); consulted when a pending update-state request is rejected.
85 private final RetryPolicy retryPolicy;
86 private int prefetchCount;
// Requested session id; replaced in onOpenComplete by the id found in the remote source filter.
87 private String sessionId;
88 private boolean isSessionReceiver;
// When true the link is opened with the peek-mode property and sendFlow never flows credit.
89 private boolean isBrowsableSession;
// Set in onOpenComplete from the link's remote properties (epoch 0 when the property is absent).
90 private Instant sessionLockedUntilUtc;
// Set in onError on SessionLockLostException / SessionCannotBeLockedException.
91 private boolean isSessionLockLost;
// Messages read from the link and not yet handed to a receive request.
92 private ConcurrentLinkedQueue<MessageWithDeliveryTag> prefetchedMessages;
93 private Receiver receiveLink;
94 private RequestResponseLink requestResponseLink;
// Work item whose future is returned by createLink and completed once the link opens or fails.
95 private WorkItem<CoreMessageReceiver> linkOpen;
96
97 private Exception lastKnownLinkError;
98 private Instant lastKnownErrorReportedAt;
// Credit accumulated by addCredit that has not been flowed to the service yet.
99 private final AtomicInteger creditToFlow;
// Sum of maxMessageCount over receive requests that are still pending (never below zero).
100 private final AtomicInteger creditNeededtoServePendingReceives;
// Counter kept in step with additions to / removals from prefetchedMessages.
101 private final AtomicInteger currentPrefetechedMessagesCount;
102 private ScheduledFuture<?> sasTokenRenewTimerFuture;
103 private CompletableFuture<Void> requestResponseLinkCreationFuture;
104 private CompletableFuture<Void> receiveLinkReopenFuture;
// Periodic task that fails update-state requests whose timeout has elapsed.
105 private final Runnable timedOutUpdateStateRequestsDaemon;
// Periodic task that hands prefetched messages to pending receive requests.
106 private final Runnable returnMesagesLoopDaemon;
// Handles returned by Timer.schedule for the two periodic tasks above.
107 private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
108 private final ScheduledFuture<?> returnMessagesLoopRunner;
// May be null; when set it is sent as a link property on the receive link.
109 private final MessagingEntityType entityType;
110
111
112 private CoreMessageReceiver(final MessagingFactory factory,
113 final String name,
114 final String recvPath,
115 final String sessionId,
116 final int prefetchCount,
117 final SettleModePair settleModePair,
118 final MessagingEntityType entityType) {
119 super(name);
120
121 this.underlyingFactory = factory;
122 this.operationTimeout = factory.getOperationTimeout();
123 this.receivePath = recvPath;
124 this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), recvPath);
125 this.sessionId = sessionId;
126 this.isSessionReceiver = false;
127 this.isBrowsableSession = false;
128 this.prefetchCount = prefetchCount;
129 this.settleModePair = settleModePair;
130 this.prefetchedMessages = new ConcurrentLinkedQueue<>();
131 this.linkClose = new CompletableFuture<>();
132 this.lastKnownLinkError = null;
133 this.prefetchCountSync = new Object();
134 this.retryPolicy = factory.getRetryPolicy();
135 this.pendingReceives = new ConcurrentLinkedQueue<>();
136
137 this.pendingUpdateStateRequests = new ConcurrentHashMap<>();
138 this.tagsToDeliveriesMap = new ConcurrentHashMap<>();
139 this.lastKnownErrorReportedAt = Instant.now();
140 this.receiveLinkReopenFuture = null;
141 this.creditToFlow = new AtomicInteger();
142 this.creditNeededtoServePendingReceives = new AtomicInteger();
143 this.currentPrefetechedMessagesCount = new AtomicInteger();
144 this.entityType = entityType;
145
146 this.timedOutUpdateStateRequestsDaemon = () -> {
147 try {
148 TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath);
149 for (Map.Entry<String, UpdateStateWorkItem> entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) {
150 Duration remainingTime = entry.getValue().getTimeoutTracker().remaining();
151 if (remainingTime.isZero() || remainingTime.isNegative()) {
152 CoreMessageReceiver.this.pendingUpdateStateRequests.remove(entry.getKey());
153 Exception exception = entry.getValue().getLastKnownException();
154 if (exception == null) {
155 exception = new TimeoutException("Request timed out.");
156 }
157 TRACE_LOGGER.error("UpdateState request timed out. Delivery:{}", entry.getKey(), exception);
158 AsyncUtil.completeFutureExceptionally(entry.getValue().getWork(), exception);
159 }
160 }
161 TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", CoreMessageReceiver.this.receivePath);
162 } catch (Throwable e) {
163
164 }
165 };
166
167
168 this.returnMesagesLoopDaemon = () -> {
169 try {
170 TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath);
171 while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) {
172 ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll();
173 if (currentReceive != null) {
174 if (!currentReceive.getWork().isDone()) {
175 TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", CoreMessageReceiver.this.receivePath);
176 currentReceive.cancelTimeoutTask(false);
177 List<MessageWithDeliveryTag> messages = CoreMessageReceiver.this.receiveCore(currentReceive.getMaxMessageCount());
178 CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(currentReceive.getMaxMessageCount());
179 AsyncUtil.completeFuture(currentReceive.getWork(), messages);
180 }
181 } else {
182 break;
183 }
184 }
185 TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", CoreMessageReceiver.this.receivePath);
186 } catch (Throwable e) {
187
188 }
189 };
190
191
192 this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
193
194 this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
195 }
196
197
198 @Deprecated
199 public static CompletableFuture<CoreMessageReceiver> create(
200 final MessagingFactory factory,
201 final String name,
202 final String recvPath,
203 final int prefetchCount,
204 final SettleModePair settleModePair) {
205 return create(factory, name, recvPath, prefetchCount, settleModePair, null);
206 }
207
208 @Deprecated
209 public static CompletableFuture<CoreMessageReceiver> create(
210 final MessagingFactory factory,
211 final String name,
212 final String recvPath,
213 final String sessionId,
214 final boolean isBrowsableSession,
215 final int prefetchCount,
216 final SettleModePair settleModePair) {
217 return create(factory, name, recvPath, sessionId, isBrowsableSession, prefetchCount, settleModePair, null);
218 }
219
220 public static CompletableFuture<CoreMessageReceiver> create(
221 final MessagingFactory factory,
222 final String name,
223 final String recvPath,
224 final int prefetchCount,
225 final SettleModePair settleModePair,
226 final MessagingEntityType entityType) {
227 TRACE_LOGGER.info("Creating core message receiver to '{}'", recvPath);
228 CoreMessageReceiveroreMessageReceiver.html#CoreMessageReceiver">CoreMessageReceiver msgReceiver = new CoreMessageReceiver(
229 factory,
230 name,
231 recvPath,
232 null,
233 prefetchCount,
234 settleModePair,
235 entityType);
236 return msgReceiver.createLink();
237 }
238
239 public static CompletableFuture<CoreMessageReceiver> create(
240 final MessagingFactory factory,
241 final String name,
242 final String recvPath,
243 final String sessionId,
244 final boolean isBrowsableSession,
245 final int prefetchCount,
246 final SettleModePair settleModePair,
247 final MessagingEntityType entityType) {
248 TRACE_LOGGER.info("Creating core session receiver to '{}', sessionId '{}', browseonly session '{}'", recvPath, sessionId, isBrowsableSession);
249 CoreMessageReceiveroreMessageReceiver.html#CoreMessageReceiver">CoreMessageReceiver msgReceiver = new CoreMessageReceiver(
250 factory,
251 name,
252 recvPath,
253 sessionId,
254 prefetchCount,
255 settleModePair,
256 entityType);
257 msgReceiver.isSessionReceiver = true;
258 msgReceiver.isBrowsableSession = isBrowsableSession;
259 return msgReceiver.createLink();
260 }
261
262 private CompletableFuture<CoreMessageReceiver> createLink() {
263 this.linkOpen = new WorkItem<>(new CompletableFuture<>(), this.operationTimeout);
264 this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
265 this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
266 if (sasTokenEx != null) {
267 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
268 TRACE_LOGGER.error("Sending SAS Token failed. ReceivePath:{}", this.receivePath, cause);
269 this.linkOpen.getWork().completeExceptionally(cause);
270 } else {
271 try {
272 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
273 @Override
274 public void onEvent() {
275 CoreMessageReceiver.this.createReceiveLink();
276 }
277 });
278 } catch (IOException ioException) {
279 this.cancelSASTokenRenewTimer();
280 this.linkOpen.getWork().completeExceptionally(new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", ioException));
281 }
282 }
283
284 return null;
285 }, MessagingFactory.INTERNAL_THREAD_POOL);
286
287 return this.linkOpen.getWork();
288 }
289
290 private CompletableFuture<Void> createRequestResponseLinkAsync() {
291 synchronized (this.requestResonseLinkCreationLock) {
292 if (this.requestResponseLinkCreationFuture == null) {
293 this.requestResponseLinkCreationFuture = new CompletableFuture<>();
294 this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((rrlink, ex) -> {
295 if (ex == null) {
296 this.requestResponseLink = rrlink;
297 this.requestResponseLinkCreationFuture.complete(null);
298 } else {
299 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
300 this.requestResponseLinkCreationFuture.completeExceptionally(cause);
301
302 synchronized (this.requestResonseLinkCreationLock) {
303 this.requestResponseLinkCreationFuture = null;
304 }
305 }
306 return null;
307 }, MessagingFactory.INTERNAL_THREAD_POOL);
308 }
309
310 return this.requestResponseLinkCreationFuture;
311 }
312 }
313
314 private void closeRequestResponseLink() {
315 synchronized (this.requestResonseLinkCreationLock) {
316 if (this.requestResponseLinkCreationFuture != null) {
317 this.requestResponseLinkCreationFuture.thenRun(() -> {
318 this.underlyingFactory.releaseRequestResponseLink(this.receivePath);
319 this.requestResponseLink = null;
320 });
321 this.requestResponseLinkCreationFuture = null;
322 }
323 }
324 }
325
326 private void createReceiveLink() {
327 TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
328 Connection connection = this.underlyingFactory.getConnection();
329
330 final Session session = connection.session();
331 session.setIncomingCapacity(Integer.MAX_VALUE);
332 session.open();
333 BaseHandler.setHandler(session, new SessionHandler(this.receivePath));
334
335 final String receiveLinkNamePrefix = "Receiver".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
336 final String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer())
337 ? receiveLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer())
338 : receiveLinkNamePrefix;
339 final Receiver receiver = session.receiver(receiveLinkName);
340
341 Source source = new Source();
342 source.setAddress(receivePath);
343 Map<Symbol, Object> linkProperties = new HashMap<>();
344
345 linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
346 if (this.entityType != null) {
347 linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
348 }
349
350 if (this.isSessionReceiver) {
351 HashMap filterMap = new HashMap();
352 filterMap.put(ClientConstants.SESSION_FILTER, this.sessionId);
353 source.setFilter(filterMap);
354
355 linkProperties.put(ClientConstants.LINK_PEEKMODE_PROPERTY, this.isBrowsableSession);
356 }
357
358 receiver.setSource(source);
359 receiver.setTarget(new Target());
360
361
362 TRACE_LOGGER.debug("Receive link settle mode '{}'", this.settleModePair);
363 receiver.setSenderSettleMode(this.settleModePair.getSenderSettleMode());
364 receiver.setReceiverSettleMode(this.settleModePair.getReceiverSettleMode());
365
366 receiver.setProperties(linkProperties);
367
368 final ReceiveLinkHandlerceiveLinkHandler.html#ReceiveLinkHandler">ReceiveLinkHandler handler = new ReceiveLinkHandler(this);
369 BaseHandler.setHandler(receiver, handler);
370 receiver.open();
371 this.receiveLink = receiver;
372 this.underlyingFactory.registerForConnectionError(this.receiveLink);
373 }
374
375 CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
376 if (this.getIsClosingOrClosed()) {
377 return CompletableFuture.completedFuture(null);
378 } else {
379 CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
380 return sendTokenFuture.thenAccept((f) -> this.sasTokenRenewTimerFuture = f);
381 }
382 }
383
384 private void throwIfInUnusableState() {
385 if (this.isSessionReceiver && this.isSessionLockLost) {
386 throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session.");
387 }
388
389 this.throwIfClosed(this.lastKnownLinkError);
390 }
391
392 private void cancelSASTokenRenewTimer() {
393 if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
394 this.sasTokenRenewTimerFuture.cancel(true);
395 TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
396 }
397 }
398
399 private List<MessageWithDeliveryTag> receiveCore(int messageCount) {
400 List<MessageWithDeliveryTag> returnMessages = null;
401 MessageWithDeliveryTag currentMessage = this.prefetchedMessages.poll();
402 int returnedMessageCount = 0;
403 while (currentMessage != null) {
404 this.currentPrefetechedMessagesCount.decrementAndGet();
405 if (returnMessages == null) {
406 returnMessages = new LinkedList<>();
407 }
408
409 returnMessages.add(currentMessage);
410 if (++returnedMessageCount >= messageCount) {
411 break;
412 }
413
414 currentMessage = this.prefetchedMessages.poll();
415 }
416
417 return returnMessages;
418 }
419
420 public int getPrefetchCount() {
421 synchronized (this.prefetchCountSync) {
422 return this.prefetchCount;
423 }
424 }
425
426 public String getSessionId() {
427 return this.sessionId;
428 }
429
430
431 public Instant getSessionLockedUntilUtc() {
432 if (this.isSessionReceiver) {
433 return this.sessionLockedUntilUtc;
434 } else {
435 throw new RuntimeException("Object is not a session receiver");
436 }
437 }
438
439 public void setPrefetchCount(final int value) throws ServiceBusException {
440 if (value < 0) {
441 throw new IllegalArgumentException("Prefetch count cannot be negative.");
442 }
443 this.throwIfInUnusableState();
444 final int deltaPrefetchCount;
445 synchronized (this.prefetchCountSync) {
446 deltaPrefetchCount = value - this.prefetchCount;
447 this.prefetchCount = value;
448 TRACE_LOGGER.info("Setting prefetch count to '{}' on recieve link to '{}'", value, this.receivePath);
449 }
450
451 if (deltaPrefetchCount > 0) {
452 try {
453 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
454 @Override
455 public void onEvent() {
456 sendFlow(deltaPrefetchCount);
457 }
458 });
459 } catch (IOException ioException) {
460 throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", ioException);
461 }
462 }
463 }
464
465 public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(final int maxMessageCount, Duration timeout) {
466 this.throwIfInUnusableState();
467
468 if (maxMessageCount <= 0) {
469 throw new IllegalArgumentException("parameter 'maxMessageCount' should be a positive number");
470 }
471
472 TRACE_LOGGER.debug("Receiving maximum of '{}' messages from '{}'", maxMessageCount, this.receivePath);
473 CompletableFuture<Collection<MessageWithDeliveryTag>> onReceive = new CompletableFuture<>();
474 final ReceiveWorkItemeceiveWorkItem.html#ReceiveWorkItem">ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem(onReceive, timeout, maxMessageCount);
475 this.creditNeededtoServePendingReceives.addAndGet(maxMessageCount);
476 this.pendingReceives.add(receiveWorkItem);
477
478
479 if (timeout == Duration.ZERO) {
480 timeout = ZERO_TIMEOUT_APPROXIMATION;
481 }
482
483 Timer.schedule(
484 () -> {
485 if (CoreMessageReceiver.this.pendingReceives.remove(receiveWorkItem)) {
486 CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
487 TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", CoreMessageReceiver.this.receivePath);
488 AsyncUtil.completeFuture(receiveWorkItem.getWork(), null);
489 }
490 },
491 timeout,
492 TimerType.OneTimeRun);
493
494 this.ensureLinkIsOpen().thenRun(() -> this.addCredit(receiveWorkItem));
495 return onReceive;
496 }
497
498 @Override
499 public void onOpenComplete(Exception exception) {
500 if (exception == null) {
501 TRACE_LOGGER.info("Receive link to '{}' opened.", this.receivePath);
502 if (this.isSessionReceiver) {
503 Map remoteSourceFilter = ((Source) this.receiveLink.getRemoteSource()).getFilter();
504 if (remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) {
505 String remoteSessionId = (String) remoteSourceFilter.get(ClientConstants.SESSION_FILTER);
506 this.sessionId = remoteSessionId;
507
508 if (this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) {
509 this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant((long) this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC));
510 } else {
511 TRACE_LOGGER.warn("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC);
512 this.sessionLockedUntilUtc = Instant.ofEpochMilli(0);
513 }
514
515 TRACE_LOGGER.info("Accepted session with id '{}', lockedUntilUtc '{}' from '{}'.", this.sessionId, this.sessionLockedUntilUtc, this.receivePath);
516 } else {
517 exception = new ServiceBusException(false, "SessionId filter not set on the remote source.");
518 }
519 }
520 }
521
522 if (exception == null) {
523 if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
524 AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
525 }
526
527 if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
528 AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null);
529 }
530
531 this.lastKnownLinkError = null;
532
533 this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
534
535 this.sendFlow(this.prefetchCount - this.currentPrefetechedMessagesCount.get());
536
537 TRACE_LOGGER.debug("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}",
538 this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount);
539 } else {
540 this.cancelSASTokenRenewTimer();
541
542 if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
543 TRACE_LOGGER.error("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception);
544 this.setClosed();
545 ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exception, this, true);
546 }
547
548 if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
549 TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception);
550 AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
551 }
552
553 this.lastKnownLinkError = exception;
554 }
555 }
556
// Callback invoked for a delivery arriving on the receive link.
// Two cases are distinguished by the delivery tag:
//  - tag is empty or not yet tracked in tagsToDeliveriesMap: the delivery carries a new
//    message, which is read and appended to prefetchedMessages;
//  - tag is already tracked: the delivery carries the remote outcome of an earlier
//    update-state request, which is matched against pendingUpdateStateRequests.
557 @Override
558 public void onReceiveComplete(Delivery delivery) {
// Any delivery counts as a successful interaction, so the retry counter is reset.
559 this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
560 byte[] deliveryTag = delivery.getTag();
561 String deliveryTagAsString = StringUtil.convertBytesToString(delivery.getTag());
562 TRACE_LOGGER.debug("Received a delivery '{}' from '{}'", deliveryTagAsString, this.receivePath);
563 if (deliveryTag == null || deliveryTag.length == 0 || !this.tagsToDeliveriesMap.containsKey(deliveryTagAsString)) {
564 TRACE_LOGGER.debug("Received a message from '{}'. Adding to prefecthed messages.", this.receivePath);
565 try {
566 Message message = Util.readMessageFromDelivery(receiveLink, delivery);
567
// Sender-settled links: accept and settle locally right away; nothing is tracked.
568 if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) {
569
570 delivery.disposition(Accepted.getInstance());
571 delivery.settle();
572 } else {
// Otherwise remember the delivery by tag so a later outcome can be matched to it.
573 this.tagsToDeliveriesMap.put(StringUtil.convertBytesToString(delivery.getTag()), delivery);
574 receiveLink.advance();
575 }
576
577
578 this.currentPrefetechedMessagesCount.incrementAndGet();
579 this.prefetchedMessages.add(new MessageWithDeliveryTag(message, delivery.getTag()));
580 } catch (Exception e) {
// Reading failed: release the delivery, settle it and drop it.
581 TRACE_LOGGER.warn("Reading message from delivery '{}' from '{}', session '{}' failed with unexpected exception.", deliveryTagAsString, this.receivePath, this.sessionId, e);
582 delivery.disposition(Released.getInstance());
583 delivery.settle();
584 return;
585 }
586 } else {
587 DeliveryState remoteState = delivery.getRemoteState();
588 TRACE_LOGGER.debug("Received a delivery '{}' with state '{}' from '{}'", deliveryTagAsString, remoteState, this.receivePath);
589
// Extract the outcome, unwrapping a transactional state if necessary.
590 Outcome remoteOutcome = null;
591 if (remoteState instanceof Outcome) {
592 remoteOutcome = (Outcome) remoteState;
593 } else if (remoteState instanceof TransactionalState) {
594 remoteOutcome = ((TransactionalState) remoteState).getOutcome();
595 }
596
// Deliveries without an outcome, or without a pending request for this tag, are ignored.
597 if (remoteOutcome != null) {
598 UpdateStateWorkItem matchingUpdateStateWorkItem = this.pendingUpdateStateRequests.get(deliveryTagAsString);
599 if (matchingUpdateStateWorkItem != null) {
600 DeliveryState matchingUpdateWorkItemDeliveryState = matchingUpdateStateWorkItem.getDeliveryState();
601 if (matchingUpdateWorkItemDeliveryState instanceof TransactionalState) {
602 matchingUpdateWorkItemDeliveryState = (DeliveryState) ((TransactionalState) matchingUpdateWorkItemDeliveryState).getOutcome();
603 }
604
605
// The request succeeded when the remote outcome has the same class as the requested state.
606 if (remoteOutcome.getClass().getName().equals(matchingUpdateWorkItemDeliveryState.getClass().getName())) {
607 TRACE_LOGGER.debug("Completing a pending updateState operation for delivery '{}' from '{}'", deliveryTagAsString, this.receivePath);
608 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, null);
609 } else {
610
611
612 TRACE_LOGGER.warn("Received delivery '{}' state '{}' doesn't match expected state '{}'", deliveryTagAsString, remoteState, matchingUpdateStateWorkItem.deliveryState);
613
// Rejected: convert the error, then either retry after the policy's interval or fail the request.
614 if (remoteOutcome instanceof Rejected) {
615 Rejected rejected = (Rejected) remoteOutcome;
616 ErrorCondition error = rejected.getError();
617 Exception exception = ExceptionUtil.toException(error);
618
619 if (ExceptionUtil.isGeneralError(error.getCondition())) {
620 this.lastKnownLinkError = exception;
621 this.lastKnownErrorReportedAt = Instant.now();
622 }
623
// A null interval from the retry policy means no further retry is attempted.
624 Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, matchingUpdateStateWorkItem.getTimeoutTracker().remaining());
625 if (retryInterval == null) {
626 TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
627 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
628 } else {
// Remembered so the timeout daemon can report this error if the retry never completes.
629 matchingUpdateStateWorkItem.setLastKnownException(exception);
630
631 TRACE_LOGGER.debug("Pending updateState operation for delivery '{}' will be retried after '{}'", deliveryTagAsString, retryInterval);
632 try {
633 this.underlyingFactory.scheduleOnReactorThread((int) retryInterval.toMillis(), new DeliveryStateDispatchHandler(delivery, matchingUpdateStateWorkItem.getDeliveryState()));
634 } catch (IOException ioException) {
635 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem,
636 new ServiceBusException(false, "Operation failed while scheduling a retry on Reactor, see cause for more details.", ioException));
637 }
638 }
639 } else if (remoteOutcome instanceof Released) {
// Released: the request is failed with OperationCancelledException.
640 Exception exception = new OperationCancelledException(remoteOutcome.toString());
641 TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
642 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
643 } else {
// Any other mismatching outcome fails the request with a non-transient ServiceBusException.
644 Exception exception = new ServiceBusException(false, remoteOutcome.toString());
645 TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
646 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
647 }
648
649 }
650 }
651 }
652 }
653 }
654
655 @Override
656 public void onError(Exception exception) {
657 this.creditToFlow.set(0);
658 this.cancelSASTokenRenewTimer();
659 if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) {
660 this.prefetchedMessages.clear();
661 this.currentPrefetechedMessagesCount.set(0);
662 this.tagsToDeliveriesMap.clear();
663 }
664
665 if (this.getIsClosingOrClosed()) {
666 TRACE_LOGGER.info("Receive link to '{}', sessionId '{}' closed", this.receivePath, this.sessionId);
667 AsyncUtil.completeFuture(this.linkClose, null);
668 this.clearAllPendingWorkItems(exception);
669 } else {
670 this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
671 TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", this.receiveLink.getName(), this.receivePath, this.sessionId, exception);
672 this.lastKnownLinkError = exception;
673 if ((this.linkOpen != null && !this.linkOpen.getWork().isDone())
674 || (this.receiveLinkReopenFuture != null && !receiveLinkReopenFuture.isDone())) {
675 this.onOpenComplete(exception);
676 }
677
678 if (exception != null
679 && (!(exception instanceof ServiceBusException./../com/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) {
680 this.clearAllPendingWorkItems(exception);
681
682 if (this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) {
683
684 TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
685 this.isSessionLockLost = true;
686 this.closeAsync();
687 }
688 } else {
689
690 ReceiveWorkItem workItem = this.pendingReceives.peek();
691 if (workItem != null && workItem.getTimeoutTracker() != null) {
692 Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
693 .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
694 if (nextRetryInterval != null) {
695 TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval);
696 Timer.schedule(() -> CoreMessageReceiver.this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
697 }
698 }
699 }
700 }
701 }
702
703 private void reduceCreditForCompletedReceiveRequest(int maxCreditCountOfReceiveRequest) {
704 this.creditNeededtoServePendingReceives.updateAndGet((c) -> {
705 int updatedCredit = c - maxCreditCountOfReceiveRequest;
706 return (updatedCredit > 0) ? updatedCredit : 0;
707 });
708 }
709
710 private void addCredit(ReceiveWorkItem receiveWorkItem) {
711
712
713 int creditToFlowForWorkItem = this.creditNeededtoServePendingReceives.get() - (this.receiveLink.getCredit() + this.currentPrefetechedMessagesCount.get() + this.creditToFlow.get()) + this.prefetchCount;
714 if (creditToFlowForWorkItem > 0) {
715 int currentTotalCreditToSend = this.creditToFlow.addAndGet(creditToFlowForWorkItem);
716 if (currentTotalCreditToSend >= this.prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE) {
717 try {
718 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
719 @Override
720 public void onEvent() {
721
722 int accumulatedCredit = CoreMessageReceiver.this.creditToFlow.getAndSet(0);
723 sendFlow(accumulatedCredit);
724 }
725 });
726 } catch (IOException ioException) {
727 this.pendingReceives.remove(receiveWorkItem);
728 this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
729 receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
730 receiveWorkItem.cancelTimeoutTask(false);
731 }
732 }
733 }
734 }
735
736 private void sendFlow(int credits) {
737 if (!this.isBrowsableSession && credits > 0) {
738 this.receiveLink.flow(credits);
739 TRACE_LOGGER.debug("Sent flow to the service. receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}",
740 this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), credits);
741 }
742 }
743
744 private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
745
746 Timer.schedule(
747 () -> {
748 if (!linkOpen.getWork().isDone()) {
749 CoreMessageReceiver.this.closeInternals(false);
750 CoreMessageReceiver.this.setClosed();
751
752 Exception operationTimedout = new TimeoutException(
753 String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()),
754 CoreMessageReceiver.this.lastKnownLinkError);
755 TRACE_LOGGER.warn(operationTimedout.getMessage());
756 ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true);
757 }
758 },
759 timeout.remaining(),
760 TimerType.OneTimeRun);
761 }
762
763 private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
764
765 Timer.schedule(
766 () -> {
767 if (!linkClose.isDone()) {
768 Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", CoreMessageReceiver.this.receiveLink.getName(), ZonedDateTime.now()));
769 TRACE_LOGGER.warn(operationTimedout.getMessage());
770
771 ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageReceiver.this, true);
772 }
773 },
774 timeout.remaining(),
775 TimerType.OneTimeRun);
776 }
777
778 @Override
779 public void onClose(ErrorCondition condition) {
780 if (condition == null) {
781 this.onError(new ServiceBusException(true,
782 String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath)));
783 } else {
784 Exception completionException = ExceptionUtil.toException(condition);
785 this.onError(completionException);
786 }
787 }
788
789 @Override
790 public ErrorContext getContext() {
791 final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();
792 final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
793 ? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
794 : ((this.receiveLink != null) ? this.receiveLink.getName() : null);
795
796 ReceiverErrorContexteiverErrorContext.html#ReceiverErrorContext">ReceiverErrorContext errorContext = new ReceiverErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
797 this.receivePath,
798 referenceId,
799 isLinkOpened ? this.prefetchCount : null,
800 isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit() : null,
801 this.currentPrefetechedMessagesCount.get());
802
803 return errorContext;
804 }
805
806 @Override
807 protected CompletableFuture<Void> onClose() {
808 this.closeInternals(true);
809 return this.linkClose;
810 }
811
812 private void closeInternals(boolean waitForCloseCompletion) {
813 if (!this.getIsClosed()) {
814 if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) {
815 try {
816 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
817
818 @Override
819 public void onEvent() {
820 if (CoreMessageReceiver.this.receiveLink != null && CoreMessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) {
821 TRACE_LOGGER.info("Closing receive link to '{}'", CoreMessageReceiver.this.receivePath);
822 CoreMessageReceiver.this.receiveLink.close();
823 CoreMessageReceiver.this.underlyingFactory.deregisterForConnectionError(CoreMessageReceiver.this.receiveLink);
824 if (waitForCloseCompletion) {
825 CoreMessageReceiver.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageReceiver.this.operationTimeout));
826 } else {
827 AsyncUtil.completeFuture(CoreMessageReceiver.this.linkClose, null);
828 }
829 }
830 }
831 });
832 } catch (IOException e) {
833 AsyncUtil.completeFutureExceptionally(this.linkClose, e);
834 }
835 } else {
836 AsyncUtil.completeFuture(this.linkClose, null);
837 }
838
839 this.cancelSASTokenRenewTimer();
840 this.closeRequestResponseLink();
841 this.updateStateRequestsTimeoutChecker.cancel(false);
842 this.returnMessagesLoopRunner.cancel(false);
843 }
844 }
845
846
847
848
849 public CompletableFuture<Void> completeMessageAsync(byte[] deliveryTag, TransactionContext transaction) {
850 Outcome outcome = Accepted.getInstance();
851 return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
852 }
853
854
855
856
857 public CompletableFuture<Void> completeMessageAsync(UUID lockToken, TransactionContext transaction) {
858 return this.updateDispositionAsync(
859 new UUID[]{lockToken},
860 ClientConstants.DISPOSITION_STATUS_COMPLETED,
861 null,
862 null,
863 null,
864 transaction);
865 }
866
867 public CompletableFuture<Void> abandonMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
868 Modified outcome = new Modified();
869 if (propertiesToModify != null) {
870 outcome.setMessageAnnotations(propertiesToModify);
871 }
872 return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
873 }
874
875 public CompletableFuture<Void> abandonMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
876 return this.updateDispositionAsync(
877 new UUID[]{lockToken},
878 ClientConstants.DISPOSITION_STATUS_ABANDONED,
879 null,
880 null,
881 propertiesToModify,
882 transaction);
883 }
884
885 public CompletableFuture<Void> deferMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
886 Modified outcome = new Modified();
887 outcome.setUndeliverableHere(true);
888 if (propertiesToModify != null) {
889 outcome.setMessageAnnotations(propertiesToModify);
890 }
891 return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
892 }
893
894 public CompletableFuture<Void> deferMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
895 return this.updateDispositionAsync(
896 new UUID[]{lockToken},
897 ClientConstants.DISPOSITION_STATUS_DEFERED,
898 null,
899 null,
900 propertiesToModify,
901 transaction);
902 }
903
904 public CompletableFuture<Void> deadLetterMessageAsync(
905 byte[] deliveryTag,
906 String deadLetterReason,
907 String deadLetterErrorDescription,
908 Map<String, Object> propertiesToModify,
909 TransactionContext transaction) {
910 Rejected outcome = new Rejected();
911 ErrorCondition error = new ErrorCondition(ClientConstants.DEADLETTERNAME, null);
912 Map<String, Object> errorInfo = new HashMap<>();
913 if (!StringUtil.isNullOrEmpty(deadLetterReason)) {
914 errorInfo.put(ClientConstants.DEADLETTER_REASON_HEADER, deadLetterReason);
915 }
916 if (!StringUtil.isNullOrEmpty(deadLetterErrorDescription)) {
917 errorInfo.put(ClientConstants.DEADLETTER_ERROR_DESCRIPTION_HEADER, deadLetterErrorDescription);
918 }
919 if (propertiesToModify != null) {
920 errorInfo.putAll(propertiesToModify);
921 }
922 error.setInfo(errorInfo);
923 outcome.setError(error);
924
925 return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
926 }
927
928 public CompletableFuture<Void> deadLetterMessageAsync(
929 UUID lockToken,
930 String deadLetterReason,
931 String deadLetterErrorDescription,
932 Map<String, Object> propertiesToModify,
933 TransactionContext transaction) {
934 return this.updateDispositionAsync(
935 new UUID[]{lockToken},
936 ClientConstants.DISPOSITION_STATUS_SUSPENDED,
937 deadLetterReason,
938 deadLetterErrorDescription,
939 propertiesToModify,
940 transaction);
941 }
942
943 private CompletableFuture<Void> updateMessageStateAsync(byte[] deliveryTag, Outcome outcome, TransactionContext transaction) {
944 this.throwIfInUnusableState();
945 CompletableFuture<Void> completeMessageFuture = new CompletableFuture<>();
946
947 String deliveryTagAsString = StringUtil.convertBytesToString(deliveryTag);
948 TRACE_LOGGER.debug("Updating message state of delivery '{}' to '{}'", deliveryTagAsString, outcome);
949 Delivery delivery = CoreMessageReceiver.this.tagsToDeliveriesMap.get(deliveryTagAsString);
950 if (delivery == null) {
951 TRACE_LOGGER.error("Delivery not found for delivery tag '{}'. Either receive link to '{}' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.", deliveryTagAsString, this.receivePath);
952 completeMessageFuture.completeExceptionally(generateDeliveryNotFoundException());
953 } else {
954 DeliveryState state;
955 if (transaction != TransactionContext.NULL_TXN) {
956 state = new TransactionalState();
957 ((TransactionalState) state).setTxnId(new Binary(transaction.getTransactionId().array()));
958 ((TransactionalState) state).setOutcome(outcome);
959 } else {
960 state = (DeliveryState) outcome;
961 }
962
963 final UpdateStateWorkItems/UpdateStateWorkItem.html#UpdateStateWorkItem">UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.operationTimeout);
964 CoreMessageReceiver.this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem);
965
966 CoreMessageReceiver.this.ensureLinkIsOpen().thenRun(() -> {
967 try {
968 this.underlyingFactory.scheduleOnReactorThread(new DeliveryStateDispatchHandler(delivery, state));
969 } catch (IOException ioException) {
970 completeMessageFuture.completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
971 }
972 });
973 }
974
975 return completeMessageFuture;
976 }
977
/**
 * Returns a future that signals when the receive link is usable, starting a link
 * reopen if necessary.
 *
 * <p>If both the local and remote state of the receive link are {@code ACTIVE}, an
 * already-completed future is returned. Otherwise, unless a reopen is already in
 * progress, a new reopen future is created, a one-time timeout is armed for it, the SAS
 * token is re-sent, and the link is recreated on the reactor thread.
 *
 * @return a completed future when the link is active, otherwise the future tracking
 *         the current reopen attempt
 */
private synchronized CompletableFuture<Void> ensureLinkIsOpen() {

    if (!(this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE)) {
        // Only start a new reopen when none is in flight; otherwise callers share the existing future.
        if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) {
            TRACE_LOGGER.info("Recreating receive link to '{}'", this.receivePath);
            this.retryPolicy.incrementRetryCount(this.getClientId());
            this.receiveLinkReopenFuture = new CompletableFuture<>();

            // Capture this attempt's future so the timeout below acts on it even if the field is replaced later.
            final CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.receiveLinkReopenFuture;
            Timer.schedule(
                () -> {
                    // Fail the reopen attempt if it has not finished within LINK_REOPEN_TIMEOUT.
                    if (!linkReopenFutureThatCanBeCancelled.isDone()) {
                        CoreMessageReceiver.this.cancelSASTokenRenewTimer();
                        Exception operationTimedout = new TimeoutException(
                            String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()));

                        TRACE_LOGGER.warn(operationTimedout.getMessage());
                        AsyncUtil.completeFutureExceptionally(linkReopenFutureThatCanBeCancelled, operationTimedout);
                    }
                },
                CoreMessageReceiver.LINK_REOPEN_TIMEOUT,
                TimerType.OneTimeRun);
            // Cancel the current renew timer before sending a fresh token.
            this.cancelSASTokenRenewTimer();
            this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
                // NOTE(review): this callback reads the receiveLinkReopenFuture field rather than the
                // captured local above; confirm the field cannot be replaced before the callback runs.
                if (sendTokenEx != null) {
                    // Token could not be sent: fail the reopen and every pending work item with the same error.
                    Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
                    TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, cause);
                    this.receiveLinkReopenFuture.completeExceptionally(sendTokenEx);
                    this.clearAllPendingWorkItems(sendTokenEx);
                } else {
                    try {
                        // Link creation is dispatched to the reactor thread.
                        this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                            @Override
                            public void onEvent() {
                                CoreMessageReceiver.this.createReceiveLink();
                            }
                        });
                    } catch (IOException ioEx) {
                        this.receiveLinkReopenFuture.completeExceptionally(ioEx);
                    }
                }
                return null;
            }, MessagingFactory.INTERNAL_THREAD_POOL);
        }

        return this.receiveLinkReopenFuture;
    } else {
        return CompletableFuture.completedFuture(null);
    }
}
1028
1029 private void completePendingUpdateStateWorkItem(Delivery delivery, String deliveryTagAsString, UpdateStateWorkItem workItem, Exception exception) {
1030 boolean isSettled = delivery.remotelySettled();
1031 if (isSettled) {
1032 delivery.settle();
1033 }
1034
1035 if (exception == null) {
1036 AsyncUtil.completeFuture(workItem.getWork(), null);
1037 } else {
1038 ExceptionUtil.completeExceptionally(workItem.getWork(), exception, this, true);
1039 }
1040
1041 if (isSettled) {
1042 this.tagsToDeliveriesMap.remove(deliveryTagAsString);
1043 this.pendingUpdateStateRequests.remove(deliveryTagAsString);
1044 }
1045 }
1046
1047 private void clearAllPendingWorkItems(Throwable exception) {
1048 TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", this.receivePath);
1049 final boolean isTransientException = exception == null
1050 || (exception instanceof ServiceBusExceptionom/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException && ((ServiceBusException) exception).getIsTransient());
1051
1052 Iterator<ReceiveWorkItem> pendingRecivesIterator = this.pendingReceives.iterator();
1053 while (pendingRecivesIterator.hasNext()) {
1054 ReceiveWorkItem workItem = pendingRecivesIterator.next();
1055 pendingRecivesIterator.remove();
1056
1057 CompletableFuture<Collection<MessageWithDeliveryTag>> future = workItem.getWork();
1058 workItem.cancelTimeoutTask(false);
1059 this.reduceCreditForCompletedReceiveRequest(workItem.getMaxMessageCount());
1060 if (isTransientException) {
1061 AsyncUtil.completeFuture(future, null);
1062 } else {
1063 ExceptionUtil.completeExceptionally(future, exception, this, true);
1064 }
1065 }
1066
1067 for (Map.Entry<String, UpdateStateWorkItem> pendingUpdate : this.pendingUpdateStateRequests.entrySet()) {
1068 pendingUpdateStateRequests.remove(pendingUpdate.getKey());
1069 ExceptionUtil.completeExceptionally(pendingUpdate.getValue().getWork(), exception, this, true);
1070 }
1071 }
1072
1073 private static IllegalArgumentException generateDeliveryNotFoundException() {
1074 return new IllegalArgumentException("Delivery not found on the receive link.");
1075 }
1076
1077 private static ServiceBusException generateDispatacherSchedulingFailedException(String operation, Exception cause) {
1078 return new ServiceBusException(false, operation + " failed while dispatching to Reactor, see cause for more details.", cause);
1079 }
1080
1081 public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lockTokens) {
1082 this.throwIfInUnusableState();
1083 if (TRACE_LOGGER.isDebugEnabled()) {
1084 TRACE_LOGGER.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", Arrays.toString(lockTokens), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1085 }
1086 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1087 HashMap requestBodyMap = new HashMap();
1088 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens);
1089 if (this.isSessionReceiver) {
1090 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1091 }
1092
1093 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1094 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1095 return responseFuture.thenComposeAsync((responseMessage) -> {
1096 CompletableFuture<Collection<Instant>> returningFuture = new CompletableFuture<>();
1097 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1098 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1099 if (TRACE_LOGGER.isDebugEnabled()) {
1100 TRACE_LOGGER.debug("Message locks for lock tokens '{}' renewed", Arrays.toString(lockTokens));
1101 }
1102
1103 Date[] expirations = (Date[]) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATIONS);
1104 returningFuture.complete(Arrays.stream(expirations).map((d) -> d.toInstant()).collect(Collectors.toList()));
1105 } else {
1106
1107 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1108 TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", Arrays.toString(lockTokens), this.receivePath, failureException);
1109 returningFuture.completeExceptionally(failureException);
1110 }
1111 return returningFuture;
1112 }, MessagingFactory.INTERNAL_THREAD_POOL);
1113 }, MessagingFactory.INTERNAL_THREAD_POOL);
1114 }
1115
1116 public CompletableFuture<Collection<MessageWithLockToken>> receiveDeferredMessageBatchAsync(Long[] sequenceNumbers) {
1117 this.throwIfInUnusableState();
1118 if (TRACE_LOGGER.isDebugEnabled()) {
1119 TRACE_LOGGER.debug("Receiving messages for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1120 }
1121 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1122 HashMap requestBodyMap = new HashMap();
1123 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers);
1124 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1));
1125 if (this.isSessionReceiver) {
1126 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1127 }
1128
1129 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1130 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1131 return responseFuture.thenComposeAsync((responseMessage) -> {
1132 CompletableFuture<Collection<MessageWithLockToken>> returningFuture = new CompletableFuture<>();
1133 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1134 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1135 if (TRACE_LOGGER.isDebugEnabled()) {
1136 TRACE_LOGGER.debug("Received messges for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1137 }
1138 List<MessageWithLockToken> receivedMessages = new ArrayList<>();
1139 Object responseBodyMap = ((AmqpValue) responseMessage.getBody()).getValue();
1140 if (responseBodyMap != null && responseBodyMap instanceof Map) {
1141 Object messages = ((Map) responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES);
1142 if (messages != null && messages instanceof Iterable) {
1143 for (Object message : (Iterable) messages) {
1144 if (message instanceof Map) {
1145 Message receivedMessage = Message.Factory.create();
1146 Binary messagePayLoad = (Binary) ((Map) message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE);
1147 receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
1148 UUID lockToken = ClientConstants.ZEROLOCKTOKEN;
1149 if (((Map) message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN)) {
1150 lockToken = (UUID) ((Map) message).get(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN);
1151 }
1152
1153 receivedMessages.add(new MessageWithLockToken(receivedMessage, lockToken));
1154 }
1155 }
1156 }
1157 }
1158 returningFuture.complete(receivedMessages);
1159 } else {
1160
1161 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1162 TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", Arrays.toString(sequenceNumbers), this.receivePath, failureException);
1163 returningFuture.completeExceptionally(failureException);
1164 }
1165 return returningFuture;
1166 }, MessagingFactory.INTERNAL_THREAD_POOL);
1167 }, MessagingFactory.INTERNAL_THREAD_POOL);
1168 }
1169
1170 public CompletableFuture<Void> updateDispositionAsync(
1171 UUID[] lockTokens,
1172 String dispositionStatus,
1173 String deadLetterReason,
1174 String deadLetterErrorDescription,
1175 Map<String, Object> propertiesToModify,
1176 TransactionContext transaction) {
1177 this.throwIfInUnusableState();
1178 if (TRACE_LOGGER.isDebugEnabled()) {
1179 TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1180 }
1181 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1182 HashMap requestBodyMap = new HashMap();
1183 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens);
1184 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DISPOSITION_STATUS, dispositionStatus);
1185
1186 if (deadLetterReason != null) {
1187 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_REASON, deadLetterReason);
1188 }
1189
1190 if (deadLetterErrorDescription != null) {
1191 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_DESCRIPTION, deadLetterErrorDescription);
1192 }
1193
1194 if (propertiesToModify != null && propertiesToModify.size() > 0) {
1195 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, propertiesToModify);
1196 }
1197
1198 if (this.isSessionReceiver) {
1199 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1200 }
1201
1202 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1203 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, this.operationTimeout);
1204 return responseFuture.thenComposeAsync((responseMessage) -> {
1205 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1206 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1207 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1208 if (TRACE_LOGGER.isDebugEnabled()) {
1209 TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}' succeeded.", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1210 }
1211 returningFuture.complete(null);
1212 } else {
1213
1214 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1215 TRACE_LOGGER.error("Update disposition on entity '{}' failed", this.receivePath, failureException);
1216 returningFuture.completeExceptionally(failureException);
1217 }
1218 return returningFuture;
1219 }, MessagingFactory.INTERNAL_THREAD_POOL);
1220 }, MessagingFactory.INTERNAL_THREAD_POOL);
1221 }
1222
1223 public CompletableFuture<Void> renewSessionLocksAsync() {
1224 this.throwIfInUnusableState();
1225 TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", this.receivePath, this.getSessionId());
1226 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1227 HashMap requestBodyMap = new HashMap();
1228 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1229
1230 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1231 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1232 return responseFuture.thenComposeAsync((responseMessage) -> {
1233 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1234 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1235 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1236 Date expiration = (Date) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATION);
1237 this.sessionLockedUntilUtc = expiration.toInstant();
1238 TRACE_LOGGER.debug("Session lock on entity '{}' of sesion '{}' renewed until '{}'", this.receivePath, this.getSessionId(), this.sessionLockedUntilUtc);
1239 returningFuture.complete(null);
1240 } else {
1241
1242 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1243 TRACE_LOGGER.error("Renewing session lock on entity '{}' of sesion '{}' failed", this.receivePath, this.getSessionId(), failureException);
1244 returningFuture.completeExceptionally(failureException);
1245 }
1246 return returningFuture;
1247 }, MessagingFactory.INTERNAL_THREAD_POOL);
1248 }, MessagingFactory.INTERNAL_THREAD_POOL);
1249 }
1250
1251 public CompletableFuture<byte[]> getSessionStateAsync() {
1252 this.throwIfInUnusableState();
1253 TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath);
1254 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1255 HashMap requestBodyMap = new HashMap();
1256 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1257
1258 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1259 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1260 return responseFuture.thenComposeAsync((responseMessage) -> {
1261 CompletableFuture<byte[]> returningFuture = new CompletableFuture<>();
1262 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1263 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1264 TRACE_LOGGER.debug("Got session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath);
1265 byte[] receivedState = null;
1266 Map bodyMap = RequestResponseUtils.getResponseBody(responseMessage);
1267 if (bodyMap.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) {
1268 Object sessionState = bodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSION_STATE);
1269 if (sessionState != null) {
1270 receivedState = ((Binary) sessionState).getArray();
1271 }
1272 }
1273
1274 returningFuture.complete(receivedState);
1275 } else {
1276
1277 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1278 TRACE_LOGGER.error("Getting session state of sesion '{}' from entity '{}' failed", this.getSessionId(), this.receivePath, failureException);
1279 returningFuture.completeExceptionally(failureException);
1280 }
1281 return returningFuture;
1282 }, MessagingFactory.INTERNAL_THREAD_POOL);
1283 }, MessagingFactory.INTERNAL_THREAD_POOL);
1284 }
1285
1286
1287 public CompletableFuture<Void> setSessionStateAsync(byte[] sessionState) {
1288 this.throwIfInUnusableState();
1289 TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", this.getSessionId(), this.receivePath);
1290 return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1291 HashMap requestBodyMap = new HashMap();
1292 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1293 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSION_STATE, sessionState == null ? null : new Binary(sessionState));
1294
1295 Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1296 CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1297 return responseFuture.thenComposeAsync((responseMessage) -> {
1298 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1299 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1300 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1301 TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}' succeeded", this.getSessionId(), this.receivePath);
1302 returningFuture.complete(null);
1303 } else {
1304
1305 Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1306 TRACE_LOGGER.error("Setting session state of sesion '{}' on entity '{}' failed", this.getSessionId(), this.receivePath, failureException);
1307 returningFuture.completeExceptionally(failureException);
1308 }
1309 return returningFuture;
1310 }, MessagingFactory.INTERNAL_THREAD_POOL);
1311 }, MessagingFactory.INTERNAL_THREAD_POOL);
1312 }
1313
1314
1315 public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) {
1316 this.throwIfInUnusableState();
1317 return this.createRequestResponseLinkAsync().thenComposeAsync((v) ->
1318 CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId, this.receiveLink.getName()),
1319 MessagingFactory.INTERNAL_THREAD_POOL);
1320 }
1321
1322 private static class DeliveryStateDispatchHandler extends DispatchHandler {
1323 final Delivery delivery;
1324 final DeliveryState deliveryState;
1325
1326 DeliveryStateDispatchHandler(Delivery delivery, DeliveryState deliveryState) {
1327 this.delivery = delivery;
1328 this.deliveryState = deliveryState;
1329 }
1330
1331 @Override
1332 public void onEvent() {
1333 delivery.disposition(deliveryState);
1334 }
1335 }
1336 }