View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.servicebus.primitives;
5   
6   import java.io.IOException;
7   import java.time.Duration;
8   import java.time.Instant;
9   import java.time.ZonedDateTime;
10  import java.util.ArrayList;
11  import java.util.Arrays;
12  import java.util.Collection;
13  import java.util.Date;
14  import java.util.HashMap;
15  import java.util.Iterator;
16  import java.util.LinkedList;
17  import java.util.List;
18  import java.util.Locale;
19  import java.util.Map;
20  import java.util.UUID;
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.ScheduledFuture;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.stream.Collectors;
27  
28  import com.microsoft.azure.servicebus.TransactionContext;
29  import org.apache.qpid.proton.amqp.Binary;
30  import org.apache.qpid.proton.amqp.Symbol;
31  import org.apache.qpid.proton.amqp.UnsignedInteger;
32  import org.apache.qpid.proton.amqp.messaging.Accepted;
33  import org.apache.qpid.proton.amqp.messaging.AmqpValue;
34  import org.apache.qpid.proton.amqp.messaging.Modified;
35  import org.apache.qpid.proton.amqp.messaging.Outcome;
36  import org.apache.qpid.proton.amqp.messaging.Rejected;
37  import org.apache.qpid.proton.amqp.messaging.Released;
38  import org.apache.qpid.proton.amqp.messaging.Source;
39  import org.apache.qpid.proton.amqp.messaging.Target;
40  import org.apache.qpid.proton.amqp.transaction.TransactionalState;
41  import org.apache.qpid.proton.amqp.transport.DeliveryState;
42  import org.apache.qpid.proton.amqp.transport.ErrorCondition;
43  import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
44  import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
45  import org.apache.qpid.proton.engine.BaseHandler;
46  import org.apache.qpid.proton.engine.Connection;
47  import org.apache.qpid.proton.engine.Delivery;
48  import org.apache.qpid.proton.engine.EndpointState;
49  import org.apache.qpid.proton.engine.Receiver;
50  import org.apache.qpid.proton.engine.Session;
51  import org.apache.qpid.proton.message.Message;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  import com.microsoft.azure.servicebus.amqp.DispatchHandler;
56  import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
57  import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
58  import com.microsoft.azure.servicebus.amqp.SessionHandler;
59  
60  /*
61   * Common Receiver that abstracts all amqp related details
62   * translates event-driven reactor model into async receive Api
63   */
64  
65  // TODO: Take a re-look at the choice of collections used. Some of them are overkill may be.
66  public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
67      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
68      private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires
69      private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1); // Wakes up every 1 millisecond
70      private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500); // Wakes up every 500 milliseconds
71      private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200);
72      private static final int CREDIT_FLOW_BATCH_SIZE = 50; // Arbitrarily chosen 50 to avoid sending too many flows in case prefetch count is large
73  
74      private final Object requestResonseLinkCreationLock = new Object();
75      private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
76      private final ConcurrentHashMap<String, UpdateStateWorkItem> pendingUpdateStateRequests;
77      private final ConcurrentHashMap<String, Delivery> tagsToDeliveriesMap;
78      private final MessagingFactory underlyingFactory;
79      private final String receivePath;
80      private final String sasTokenAudienceURI;
81      private final Duration operationTimeout;
82      private final CompletableFuture<Void> linkClose;
83      private final Object prefetchCountSync;
84      private final SettleModePair settleModePair;
85      private final RetryPolicy retryPolicy;
86      private int prefetchCount;
87      private String sessionId;
88      private boolean isSessionReceiver;
89      private boolean isBrowsableSession;
90      private Instant sessionLockedUntilUtc;
91      private boolean isSessionLockLost;
92      private ConcurrentLinkedQueue<MessageWithDeliveryTag> prefetchedMessages;
93      private Receiver receiveLink;
94      private RequestResponseLink requestResponseLink;
95      private WorkItem<CoreMessageReceiver> linkOpen;
96  
97      private Exception lastKnownLinkError;
98      private Instant lastKnownErrorReportedAt;
99      private final AtomicInteger creditToFlow;
100     private final AtomicInteger creditNeededtoServePendingReceives;
101     private final AtomicInteger currentPrefetechedMessagesCount; // size() on concurrentlinkedqueue is o(n) operation
102     private ScheduledFuture<?> sasTokenRenewTimerFuture;
103     private CompletableFuture<Void> requestResponseLinkCreationFuture;
104     private CompletableFuture<Void> receiveLinkReopenFuture;
105     private final Runnable timedOutUpdateStateRequestsDaemon;
106     private final Runnable returnMesagesLoopDaemon;
107     private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
108     private final ScheduledFuture<?> returnMessagesLoopRunner;
109     private final MessagingEntityType entityType;
110 
111     // TODO: Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
112     private CoreMessageReceiver(final MessagingFactory factory,
113             final String name,
114             final String recvPath,
115             final String sessionId,
116             final int prefetchCount,
117             final SettleModePair settleModePair,
118             final MessagingEntityType entityType) {
119         super(name);
120 
121         this.underlyingFactory = factory;
122         this.operationTimeout = factory.getOperationTimeout();
123         this.receivePath = recvPath;
124         this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), recvPath);
125         this.sessionId = sessionId;
126         this.isSessionReceiver = false;
127         this.isBrowsableSession = false;
128         this.prefetchCount = prefetchCount;
129         this.settleModePair = settleModePair;
130         this.prefetchedMessages = new ConcurrentLinkedQueue<>();
131         this.linkClose = new CompletableFuture<>();
132         this.lastKnownLinkError = null;
133         this.prefetchCountSync = new Object();
134         this.retryPolicy = factory.getRetryPolicy();
135         this.pendingReceives = new ConcurrentLinkedQueue<>();
136 
137         this.pendingUpdateStateRequests = new ConcurrentHashMap<>();
138         this.tagsToDeliveriesMap = new ConcurrentHashMap<>();
139         this.lastKnownErrorReportedAt = Instant.now();
140         this.receiveLinkReopenFuture = null;
141         this.creditToFlow = new AtomicInteger();
142         this.creditNeededtoServePendingReceives = new AtomicInteger();
143         this.currentPrefetechedMessagesCount = new AtomicInteger();
144         this.entityType = entityType;
145 
146         this.timedOutUpdateStateRequestsDaemon = () -> {
147             try {
148                 TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath);
149                 for (Map.Entry<String, UpdateStateWorkItem> entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) {
150                     Duration remainingTime = entry.getValue().getTimeoutTracker().remaining();
151                     if (remainingTime.isZero() || remainingTime.isNegative()) {
152                         CoreMessageReceiver.this.pendingUpdateStateRequests.remove(entry.getKey());
153                         Exception exception = entry.getValue().getLastKnownException();
154                         if (exception == null) {
155                             exception = new TimeoutException("Request timed out.");
156                         }
157                         TRACE_LOGGER.error("UpdateState request timed out. Delivery:{}", entry.getKey(), exception);
158                         AsyncUtil.completeFutureExceptionally(entry.getValue().getWork(), exception);
159                     }
160                 }
161                 TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", CoreMessageReceiver.this.receivePath);
162             } catch (Throwable e) {
163                 // Shouldn't throw any exception for the executor to run multiple times.. Should never come here
164             }
165         };
166 
167         // CONTRACT: message should be delivered to the caller of MessageReceiver.receive() only from prefetched messages
168         this.returnMesagesLoopDaemon = () -> {
169             try {
170                 TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath);
171                 while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) {
172                     ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll();
173                     if (currentReceive != null) {
174                         if (!currentReceive.getWork().isDone()) {
175                             TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", CoreMessageReceiver.this.receivePath);
176                             currentReceive.cancelTimeoutTask(false);
177                             List<MessageWithDeliveryTag> messages = CoreMessageReceiver.this.receiveCore(currentReceive.getMaxMessageCount());
178                             CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(currentReceive.getMaxMessageCount());
179                             AsyncUtil.completeFuture(currentReceive.getWork(), messages);
180                         }
181                     } else {
182                         break;
183                     }
184                 }
185                 TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", CoreMessageReceiver.this.receivePath);
186             } catch (Throwable e) {
187                 // Shouldn't throw any exception for the executor to run multiple times.. Should never come here
188             }
189         };
190 
191         // As all update state requests have the same timeout, one timer is better than having one timer per request
192         this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
193         // Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor
194         this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
195     }
196 
197     // Connection has to be associated with Reactor before Creating a receiver on it.
198     @Deprecated
199     public static CompletableFuture<CoreMessageReceiver> create(
200             final MessagingFactory factory,
201             final String name,
202             final String recvPath,
203             final int prefetchCount,
204             final SettleModePair settleModePair) {
205         return create(factory, name, recvPath, prefetchCount, settleModePair, null);
206     }
207 
208     @Deprecated
209     public static CompletableFuture<CoreMessageReceiver> create(
210             final MessagingFactory factory,
211             final String name,
212             final String recvPath,
213             final String sessionId,
214             final boolean isBrowsableSession,
215             final int prefetchCount,
216             final SettleModePair settleModePair) {
217         return create(factory, name, recvPath, sessionId, isBrowsableSession, prefetchCount, settleModePair, null);
218     }
219 
220     public static CompletableFuture<CoreMessageReceiver> create(
221             final MessagingFactory factory,
222             final String name,
223             final String recvPath,
224             final int prefetchCount,
225             final SettleModePair settleModePair,
226             final MessagingEntityType entityType) {
227         TRACE_LOGGER.info("Creating core message receiver to '{}'", recvPath);
228         CoreMessageReceiveroreMessageReceiver.html#CoreMessageReceiver">CoreMessageReceiver msgReceiver = new CoreMessageReceiver(
229                 factory,
230                 name,
231                 recvPath,
232                 null,
233                 prefetchCount,
234                 settleModePair,
235                 entityType);
236         return msgReceiver.createLink();
237     }
238 
239     public static CompletableFuture<CoreMessageReceiver> create(
240             final MessagingFactory factory,
241             final String name,
242             final String recvPath,
243             final String sessionId,
244             final boolean isBrowsableSession,
245             final int prefetchCount,
246             final SettleModePair settleModePair,
247             final MessagingEntityType entityType) {
248         TRACE_LOGGER.info("Creating core session receiver to '{}', sessionId '{}', browseonly session '{}'", recvPath, sessionId, isBrowsableSession);
249         CoreMessageReceiveroreMessageReceiver.html#CoreMessageReceiver">CoreMessageReceiver msgReceiver = new CoreMessageReceiver(
250                 factory,
251                 name,
252                 recvPath,
253                 sessionId,
254                 prefetchCount,
255                 settleModePair,
256                 entityType);
257         msgReceiver.isSessionReceiver = true;
258         msgReceiver.isBrowsableSession = isBrowsableSession;
259         return msgReceiver.createLink();
260     }
261 
262     private CompletableFuture<CoreMessageReceiver> createLink() {
263         this.linkOpen = new WorkItem<>(new CompletableFuture<>(), this.operationTimeout);
264         this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
265         this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
266             if (sasTokenEx != null) {
267                 Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
268                 TRACE_LOGGER.error("Sending SAS Token failed. ReceivePath:{}", this.receivePath, cause);
269                 this.linkOpen.getWork().completeExceptionally(cause);
270             } else {
271                 try {
272                     this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
273                         @Override
274                         public void onEvent() {
275                             CoreMessageReceiver.this.createReceiveLink();
276                         }
277                     });
278                 } catch (IOException ioException) {
279                     this.cancelSASTokenRenewTimer();
280                     this.linkOpen.getWork().completeExceptionally(new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", ioException));
281                 }
282             }
283             
284             return null;
285         }, MessagingFactory.INTERNAL_THREAD_POOL);
286 
287         return this.linkOpen.getWork();
288     }
289 
290     private CompletableFuture<Void> createRequestResponseLinkAsync() {
291         synchronized (this.requestResonseLinkCreationLock) {
292             if (this.requestResponseLinkCreationFuture == null) {
293                 this.requestResponseLinkCreationFuture = new CompletableFuture<>();
294                 this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((rrlink, ex) -> {
295                     if (ex == null) {
296                         this.requestResponseLink = rrlink;
297                         this.requestResponseLinkCreationFuture.complete(null);
298                     } else {
299                         Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
300                         this.requestResponseLinkCreationFuture.completeExceptionally(cause);
301                         // Set it to null so next call will retry rr link creation
302                         synchronized (this.requestResonseLinkCreationLock) {
303                             this.requestResponseLinkCreationFuture = null;
304                         }
305                     }
306                     return null;
307                 }, MessagingFactory.INTERNAL_THREAD_POOL);
308             }
309             
310             return this.requestResponseLinkCreationFuture;
311         }
312     }
313 
314     private void closeRequestResponseLink() {
315         synchronized (this.requestResonseLinkCreationLock) {
316             if (this.requestResponseLinkCreationFuture != null) {
317                 this.requestResponseLinkCreationFuture.thenRun(() -> {
318                     this.underlyingFactory.releaseRequestResponseLink(this.receivePath);
319                     this.requestResponseLink = null;
320                 });
321                 this.requestResponseLinkCreationFuture = null;
322             }
323         }
324     }
325 
326     private void createReceiveLink() {
327         TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
328         Connection connection = this.underlyingFactory.getConnection();
329 
330         final Session session = connection.session();
331         session.setIncomingCapacity(Integer.MAX_VALUE);
332         session.open();
333         BaseHandler.setHandler(session, new SessionHandler(this.receivePath));
334 
335         final String receiveLinkNamePrefix = "Receiver".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
336         final String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer())
337             ? receiveLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer())
338             : receiveLinkNamePrefix;
339         final Receiver receiver = session.receiver(receiveLinkName);
340 
341         Source source = new Source();
342         source.setAddress(receivePath);
343         Map<Symbol, Object> linkProperties = new HashMap<>();
344         // ServiceBus expects timeout to be of type unsignedint
345         linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
346         if (this.entityType != null) {
347             linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
348         }
349 
350         if (this.isSessionReceiver) {
351             HashMap filterMap = new HashMap();
352             filterMap.put(ClientConstants.SESSION_FILTER, this.sessionId);
353             source.setFilter(filterMap);
354 
355             linkProperties.put(ClientConstants.LINK_PEEKMODE_PROPERTY, this.isBrowsableSession);
356         }
357 
358         receiver.setSource(source);
359         receiver.setTarget(new Target());
360 
361         // Set settle modes
362         TRACE_LOGGER.debug("Receive link settle mode '{}'", this.settleModePair);
363         receiver.setSenderSettleMode(this.settleModePair.getSenderSettleMode());
364         receiver.setReceiverSettleMode(this.settleModePair.getReceiverSettleMode());
365 
366         receiver.setProperties(linkProperties);
367 
368         final ReceiveLinkHandlerceiveLinkHandler.html#ReceiveLinkHandler">ReceiveLinkHandler handler = new ReceiveLinkHandler(this);
369         BaseHandler.setHandler(receiver, handler);
370         receiver.open();
371         this.receiveLink = receiver;
372         this.underlyingFactory.registerForConnectionError(this.receiveLink);
373     }
374 
375     CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
376         if (this.getIsClosingOrClosed()) {
377             return CompletableFuture.completedFuture(null);
378         } else {
379             CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
380             return sendTokenFuture.thenAccept((f) -> this.sasTokenRenewTimerFuture = f);
381         }
382     }
383 
384     private void throwIfInUnusableState() {
385         if (this.isSessionReceiver && this.isSessionLockLost) {
386             throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session.");
387         }
388 
389         this.throwIfClosed(this.lastKnownLinkError);
390     }
391     
392     private void cancelSASTokenRenewTimer() {
393         if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
394             this.sasTokenRenewTimerFuture.cancel(true);
395             TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
396         }
397     }
398 
399     private List<MessageWithDeliveryTag> receiveCore(int messageCount) {
400         List<MessageWithDeliveryTag> returnMessages = null;
401         MessageWithDeliveryTag currentMessage = this.prefetchedMessages.poll();
402         int returnedMessageCount = 0;
403         while (currentMessage != null) {
404             this.currentPrefetechedMessagesCount.decrementAndGet();
405             if (returnMessages == null) {
406                 returnMessages = new LinkedList<>();
407             }
408 
409             returnMessages.add(currentMessage);
410             if (++returnedMessageCount >= messageCount) {
411                 break;
412             }
413 
414             currentMessage = this.prefetchedMessages.poll();
415         }
416 
417         return returnMessages;
418     }
419 
420     public int getPrefetchCount() {
421         synchronized (this.prefetchCountSync) {
422             return this.prefetchCount;
423         }
424     }
425 
426     public String getSessionId() {
427         return this.sessionId;
428     }
429 
430 
431     public Instant getSessionLockedUntilUtc() {
432         if (this.isSessionReceiver) {
433             return this.sessionLockedUntilUtc;
434         } else {
435             throw new RuntimeException("Object is not a session receiver");
436         }
437     }
438 
439     public void setPrefetchCount(final int value) throws ServiceBusException {
440         if (value < 0) {
441             throw new IllegalArgumentException("Prefetch count cannot be negative.");
442         }
443         this.throwIfInUnusableState();
444         final int deltaPrefetchCount;
445         synchronized (this.prefetchCountSync) {
446             deltaPrefetchCount = value - this.prefetchCount;
447             this.prefetchCount = value;
448             TRACE_LOGGER.info("Setting prefetch count to '{}' on recieve link to '{}'", value, this.receivePath);
449         }
450 
451         if (deltaPrefetchCount > 0) {
452             try {
453                 this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
454                     @Override
455                     public void onEvent() {
456                         sendFlow(deltaPrefetchCount);
457                     }
458                 });
459             } catch (IOException ioException) {
460                 throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", ioException);
461             }
462         }
463     }
464 
465     public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(final int maxMessageCount, Duration timeout) {
466         this.throwIfInUnusableState();
467 
468         if (maxMessageCount <= 0) {
469             throw new IllegalArgumentException("parameter 'maxMessageCount' should be a positive number");
470         }
471 
472         TRACE_LOGGER.debug("Receiving maximum of '{}' messages from '{}'", maxMessageCount, this.receivePath);
473         CompletableFuture<Collection<MessageWithDeliveryTag>> onReceive = new CompletableFuture<>();
474         final ReceiveWorkItemeceiveWorkItem.html#ReceiveWorkItem">ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem(onReceive, timeout, maxMessageCount);
475         this.creditNeededtoServePendingReceives.addAndGet(maxMessageCount);
476         this.pendingReceives.add(receiveWorkItem);
477         // ZERO timeout is special case in SBMP clients where the timeout is sent to the service along with request. It meant 'give me messages you already have, but don't wait'.
478         // As we don't send timeout to service in AMQP, treating this as a special case and using a very short timeout
479         if (timeout == Duration.ZERO) {
480             timeout = ZERO_TIMEOUT_APPROXIMATION;
481         }
482 
483         Timer.schedule(
484             () -> {
485                 if (CoreMessageReceiver.this.pendingReceives.remove(receiveWorkItem)) {
486                     CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
487                     TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", CoreMessageReceiver.this.receivePath);
488                     AsyncUtil.completeFuture(receiveWorkItem.getWork(), null);
489                 }
490             },
491             timeout,
492             TimerType.OneTimeRun);
493         
494         this.ensureLinkIsOpen().thenRun(() -> this.addCredit(receiveWorkItem));
495         return onReceive;
496     }
497 
498     @Override
499     public void onOpenComplete(Exception exception) {
500         if (exception == null) {
501             TRACE_LOGGER.info("Receive link to '{}' opened.", this.receivePath);
502             if (this.isSessionReceiver) {
503                 Map remoteSourceFilter = ((Source) this.receiveLink.getRemoteSource()).getFilter();
504                 if (remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) {
505                     String remoteSessionId = (String) remoteSourceFilter.get(ClientConstants.SESSION_FILTER);
506                     this.sessionId = remoteSessionId;
507 
508                     if (this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) {
509                         this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant((long) this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC));
510                     } else {
511                         TRACE_LOGGER.warn("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC);
512                         this.sessionLockedUntilUtc = Instant.ofEpochMilli(0);
513                     }
514 
515                     TRACE_LOGGER.info("Accepted session with id '{}', lockedUntilUtc '{}' from '{}'.", this.sessionId, this.sessionLockedUntilUtc, this.receivePath);
516                 } else {
517                     exception = new ServiceBusException(false, "SessionId filter not set on the remote source.");
518                 }
519             }
520         }
521 
522         if (exception == null) {
523             if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
524                 AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
525             }
526 
527             if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
528                 AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null);
529             }
530 
531             this.lastKnownLinkError = null;
532 
533             this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
534 
535             this.sendFlow(this.prefetchCount - this.currentPrefetechedMessagesCount.get());
536 
537             TRACE_LOGGER.debug("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}",
538                     this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount);
539         } else {
540             this.cancelSASTokenRenewTimer();
541             
542             if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
543                 TRACE_LOGGER.error("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception);
544                 this.setClosed();
545                 ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exception, this, true);
546             }
547 
548             if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
549                 TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception);
550                 AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
551             }
552 
553             this.lastKnownLinkError = exception;
554         }
555     }
556 
557     @Override
558     public void onReceiveComplete(Delivery delivery) {
559         this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
560         byte[] deliveryTag = delivery.getTag();
561         String deliveryTagAsString  = StringUtil.convertBytesToString(delivery.getTag());
562         TRACE_LOGGER.debug("Received a delivery '{}' from '{}'", deliveryTagAsString, this.receivePath);
563         if (deliveryTag == null || deliveryTag.length == 0 || !this.tagsToDeliveriesMap.containsKey(deliveryTagAsString)) {
564             TRACE_LOGGER.debug("Received a message from '{}'. Adding to prefecthed messages.", this.receivePath);
565             try {
566                 Message message = Util.readMessageFromDelivery(receiveLink, delivery);
567 
568                 if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) {
569                     // No op. Delivery comes settled from the sender
570                     delivery.disposition(Accepted.getInstance());
571                     delivery.settle();
572                 } else {
573                     this.tagsToDeliveriesMap.put(StringUtil.convertBytesToString(delivery.getTag()), delivery);
574                     receiveLink.advance();
575                 }
576 
577                 // Accuracy of count is not that important. So not making those two operations atomic
578                 this.currentPrefetechedMessagesCount.incrementAndGet();
579                 this.prefetchedMessages.add(new MessageWithDeliveryTag(message, delivery.getTag()));
580             } catch (Exception e) {
581                 TRACE_LOGGER.warn("Reading message from delivery '{}' from '{}', session '{}' failed with unexpected exception.", deliveryTagAsString, this.receivePath, this.sessionId, e);
582                 delivery.disposition(Released.getInstance());
583                 delivery.settle();
584                 return;
585             }
586         } else {
587             DeliveryState remoteState = delivery.getRemoteState();
588             TRACE_LOGGER.debug("Received a delivery '{}' with state '{}' from '{}'", deliveryTagAsString, remoteState, this.receivePath);
589 
590             Outcome remoteOutcome = null;
591             if (remoteState instanceof Outcome) {
592                 remoteOutcome = (Outcome) remoteState;
593             } else if (remoteState instanceof TransactionalState) {
594                 remoteOutcome = ((TransactionalState) remoteState).getOutcome();
595             }
596 
597             if (remoteOutcome != null) {
598                 UpdateStateWorkItem matchingUpdateStateWorkItem = this.pendingUpdateStateRequests.get(deliveryTagAsString);
599                 if (matchingUpdateStateWorkItem != null) {
600                     DeliveryState matchingUpdateWorkItemDeliveryState = matchingUpdateStateWorkItem.getDeliveryState();
601                     if (matchingUpdateWorkItemDeliveryState instanceof TransactionalState) {
602                         matchingUpdateWorkItemDeliveryState = (DeliveryState) ((TransactionalState) matchingUpdateWorkItemDeliveryState).getOutcome();
603                     }
604 
605                     // This comparison is ugly. Using it for the lack of equals operation on Outcome classes
606                     if (remoteOutcome.getClass().getName().equals(matchingUpdateWorkItemDeliveryState.getClass().getName())) {
607                         TRACE_LOGGER.debug("Completing a pending updateState operation for delivery '{}' from '{}'", deliveryTagAsString, this.receivePath);
608                         this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, null);
609                     } else {
610 //                      if(matchingUpdateStateWorkItem.expectedOutcome instanceof Accepted)
611 //                      {
612                         TRACE_LOGGER.warn("Received delivery '{}' state '{}' doesn't match expected state '{}'", deliveryTagAsString, remoteState, matchingUpdateStateWorkItem.deliveryState);
613                         // Complete requests
614                         if (remoteOutcome instanceof Rejected) {
615                             Rejected rejected = (Rejected) remoteOutcome;
616                             ErrorCondition error = rejected.getError();
617                             Exception exception = ExceptionUtil.toException(error);
618 
619                             if (ExceptionUtil.isGeneralError(error.getCondition())) {
620                                 this.lastKnownLinkError = exception;
621                                 this.lastKnownErrorReportedAt = Instant.now();
622                             }
623 
624                             Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, matchingUpdateStateWorkItem.getTimeoutTracker().remaining());
625                             if (retryInterval == null) {
626                                 TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
627                                 this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
628                             } else {
629                                 matchingUpdateStateWorkItem.setLastKnownException(exception);
630                                 // Retry after retry interval
631                                 TRACE_LOGGER.debug("Pending updateState operation for delivery '{}' will be retried after '{}'", deliveryTagAsString, retryInterval);
632                                 try {
633                                     this.underlyingFactory.scheduleOnReactorThread((int) retryInterval.toMillis(), new DeliveryStateDispatchHandler(delivery, matchingUpdateStateWorkItem.getDeliveryState()));
634                                 } catch (IOException ioException) {
635                                     this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem,
636                                             new ServiceBusException(false, "Operation failed while scheduling a retry on Reactor, see cause for more details.", ioException));
637                                 }
638                             }
639                         } else if (remoteOutcome instanceof Released) {
640                             Exception exception = new OperationCancelledException(remoteOutcome.toString());
641                             TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
642                             this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
643                         } else {
644                             Exception exception = new ServiceBusException(false, remoteOutcome.toString());
645                             TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception);
646                             this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
647                         }
648 //                      }
649                     }
650                 }
651             }
652         }
653     }
654 
655     @Override
656     public void onError(Exception exception) {
657         this.creditToFlow.set(0);
658         this.cancelSASTokenRenewTimer();
659         if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) {
660             this.prefetchedMessages.clear();
661             this.currentPrefetechedMessagesCount.set(0);
662             this.tagsToDeliveriesMap.clear();
663         }
664 
665         if (this.getIsClosingOrClosed()) {
666             TRACE_LOGGER.info("Receive link to '{}', sessionId '{}' closed", this.receivePath, this.sessionId);
667             AsyncUtil.completeFuture(this.linkClose, null);
668             this.clearAllPendingWorkItems(exception);
669         } else {
670             this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
671             TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", this.receiveLink.getName(), this.receivePath, this.sessionId, exception);
672             this.lastKnownLinkError = exception;
673             if ((this.linkOpen != null && !this.linkOpen.getWork().isDone())
674                 || (this.receiveLinkReopenFuture != null && !receiveLinkReopenFuture.isDone())) {
675                 this.onOpenComplete(exception);
676             }
677 
678             if (exception != null
679                 && (!(exception instanceof ServiceBusException./../com/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) {
680                 this.clearAllPendingWorkItems(exception);
681                 
682                 if (this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) {
683                     // No point in retrying to establish a link.. SessionLock is lost
684                     TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
685                     this.isSessionLockLost = true;
686                     this.closeAsync();
687                 }
688             } else {
689                 // TODO: Why recreating link needs to wait for retry interval of pending receive?
690                 ReceiveWorkItem workItem = this.pendingReceives.peek();
691                 if (workItem != null && workItem.getTimeoutTracker() != null) {
692                     Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
693                             .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
694                     if (nextRetryInterval != null) {
695                         TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval);
696                         Timer.schedule(() -> CoreMessageReceiver.this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
697                     }
698                 }
699             }
700         }
701     }
702 
703     private void reduceCreditForCompletedReceiveRequest(int maxCreditCountOfReceiveRequest) {
704         this.creditNeededtoServePendingReceives.updateAndGet((c) -> {
705             int updatedCredit = c - maxCreditCountOfReceiveRequest;
706             return (updatedCredit > 0) ? updatedCredit : 0;
707         });
708     }
709 
710     private void addCredit(ReceiveWorkItem receiveWorkItem) {
711         // Timed out receive requests and batch receive requests completed with less than maxCount messages might have sent more credit
712         // than consumed by the receiver resulting in excess credit at the service endpoint.
713         int creditToFlowForWorkItem = this.creditNeededtoServePendingReceives.get() - (this.receiveLink.getCredit() + this.currentPrefetechedMessagesCount.get() + this.creditToFlow.get()) + this.prefetchCount;
714         if (creditToFlowForWorkItem > 0) {
715             int currentTotalCreditToSend = this.creditToFlow.addAndGet(creditToFlowForWorkItem);
716             if (currentTotalCreditToSend >= this.prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE) {
717                 try {
718                     this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
719                         @Override
720                         public void onEvent() {
721                             // Send credit accumulated so far to make it less chat-ty
722                             int accumulatedCredit = CoreMessageReceiver.this.creditToFlow.getAndSet(0);
723                             sendFlow(accumulatedCredit);
724                         }
725                     });
726                 } catch (IOException ioException) {
727                     this.pendingReceives.remove(receiveWorkItem);
728                     this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
729                     receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
730                     receiveWorkItem.cancelTimeoutTask(false);
731                 }
732             }
733         }
734     }
735 
736     private void sendFlow(int credits) {
737         if (!this.isBrowsableSession && credits > 0) {
738             this.receiveLink.flow(credits);
739             TRACE_LOGGER.debug("Sent flow to the service. receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}",
740                     this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), credits);
741         }
742     }
743 
744     private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
745         // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
746         Timer.schedule(
747             () -> {
748                 if (!linkOpen.getWork().isDone()) {
749                     CoreMessageReceiver.this.closeInternals(false);
750                     CoreMessageReceiver.this.setClosed();
751 
752                     Exception operationTimedout = new TimeoutException(
753                             String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()),
754                             CoreMessageReceiver.this.lastKnownLinkError);
755                     TRACE_LOGGER.warn(operationTimedout.getMessage());
756                     ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true);
757                 }
758             },
759             timeout.remaining(),
760             TimerType.OneTimeRun);
761     }
762 
763     private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
764         // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory
765         Timer.schedule(
766             () -> {
767                 if (!linkClose.isDone()) {
768                     Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", CoreMessageReceiver.this.receiveLink.getName(), ZonedDateTime.now()));
769                     TRACE_LOGGER.warn(operationTimedout.getMessage());
770 
771                     ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageReceiver.this, true);
772                 }
773             },
774             timeout.remaining(),
775             TimerType.OneTimeRun);
776     }
777 
778     @Override
779     public void onClose(ErrorCondition condition) {
780         if (condition == null) {
781             this.onError(new ServiceBusException(true,
782                     String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath)));
783         } else {
784             Exception completionException = ExceptionUtil.toException(condition);
785             this.onError(completionException);
786         }
787     }
788 
789     @Override
790     public ErrorContext getContext() {
791         final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();
792         final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
793                 ? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
794                         : ((this.receiveLink != null) ? this.receiveLink.getName() : null);
795 
796         ReceiverErrorContexteiverErrorContext.html#ReceiverErrorContext">ReceiverErrorContext errorContext = new ReceiverErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
797                 this.receivePath,
798                 referenceId,
799                 isLinkOpened ? this.prefetchCount : null,
800                 isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit() : null,
801                 this.currentPrefetechedMessagesCount.get());
802 
803         return errorContext;
804     }
805 
806     @Override
807     protected CompletableFuture<Void> onClose() {
808         this.closeInternals(true);
809         return this.linkClose;
810     }
811 
812     private void closeInternals(boolean waitForCloseCompletion) {
813         if (!this.getIsClosed()) {
814             if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) {
815                 try {
816                     this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
817                         
818                         @Override
819                         public void onEvent() {
820                             if (CoreMessageReceiver.this.receiveLink != null && CoreMessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) {
821                                 TRACE_LOGGER.info("Closing receive link to '{}'", CoreMessageReceiver.this.receivePath);
822                                 CoreMessageReceiver.this.receiveLink.close();
823                                 CoreMessageReceiver.this.underlyingFactory.deregisterForConnectionError(CoreMessageReceiver.this.receiveLink);
824                                 if (waitForCloseCompletion) {
825                                     CoreMessageReceiver.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageReceiver.this.operationTimeout));
826                                 } else {
827                                     AsyncUtil.completeFuture(CoreMessageReceiver.this.linkClose, null);
828                                 }
829                             }
830                         }
831                     });
832                 } catch (IOException e) {
833                     AsyncUtil.completeFutureExceptionally(this.linkClose, e);
834                 }
835             } else {
836                 AsyncUtil.completeFuture(this.linkClose, null);
837             }
838             
839             this.cancelSASTokenRenewTimer();
840             this.closeRequestResponseLink();
841             this.updateStateRequestsTimeoutChecker.cancel(false);
842             this.returnMessagesLoopRunner.cancel(false);
843         }
844     }
845 
846     /*
847     This is to be used for messages which are received on receiveLink.
848      */
849     public CompletableFuture<Void> completeMessageAsync(byte[] deliveryTag, TransactionContext transaction) {
850         Outcome outcome = Accepted.getInstance();
851         return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
852     }
853 
854     /*
855     This is to be used for messages which are received on RequestResponseLink
856      */
857     public CompletableFuture<Void> completeMessageAsync(UUID lockToken, TransactionContext transaction) {
858         return this.updateDispositionAsync(
859                 new UUID[]{lockToken},
860                 ClientConstants.DISPOSITION_STATUS_COMPLETED,
861                 null,
862                 null,
863                 null,
864                 transaction);
865     }
866 
867     public CompletableFuture<Void> abandonMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
868         Modified outcome = new Modified();
869         if (propertiesToModify != null) {
870             outcome.setMessageAnnotations(propertiesToModify);
871         }
872         return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
873     }
874 
875     public CompletableFuture<Void> abandonMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
876         return this.updateDispositionAsync(
877                 new UUID[]{lockToken},
878                 ClientConstants.DISPOSITION_STATUS_ABANDONED,
879                 null,
880                 null,
881                 propertiesToModify,
882                 transaction);
883     }
884 
885     public CompletableFuture<Void> deferMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
886         Modified outcome = new Modified();
887         outcome.setUndeliverableHere(true);
888         if (propertiesToModify != null) {
889             outcome.setMessageAnnotations(propertiesToModify);
890         }
891         return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
892     }
893 
894     public CompletableFuture<Void> deferMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
895         return this.updateDispositionAsync(
896                 new UUID[]{lockToken},
897                 ClientConstants.DISPOSITION_STATUS_DEFERED,
898                 null,
899                 null,
900                 propertiesToModify,
901                 transaction);
902     }
903 
904     public CompletableFuture<Void> deadLetterMessageAsync(
905             byte[] deliveryTag,
906             String deadLetterReason,
907             String deadLetterErrorDescription,
908             Map<String, Object> propertiesToModify,
909             TransactionContext transaction) {
910         Rejected outcome = new Rejected();
911         ErrorCondition error = new ErrorCondition(ClientConstants.DEADLETTERNAME, null);
912         Map<String, Object> errorInfo = new HashMap<>();
913         if (!StringUtil.isNullOrEmpty(deadLetterReason)) {
914             errorInfo.put(ClientConstants.DEADLETTER_REASON_HEADER, deadLetterReason);
915         }
916         if (!StringUtil.isNullOrEmpty(deadLetterErrorDescription)) {
917             errorInfo.put(ClientConstants.DEADLETTER_ERROR_DESCRIPTION_HEADER, deadLetterErrorDescription);
918         }
919         if (propertiesToModify != null) {
920             errorInfo.putAll(propertiesToModify);
921         }
922         error.setInfo(errorInfo);
923         outcome.setError(error);
924 
925         return this.updateMessageStateAsync(deliveryTag, outcome, transaction);
926     }
927 
928     public CompletableFuture<Void> deadLetterMessageAsync(
929             UUID lockToken,
930             String deadLetterReason,
931             String deadLetterErrorDescription,
932             Map<String, Object> propertiesToModify,
933             TransactionContext transaction) {
934         return this.updateDispositionAsync(
935                 new UUID[]{lockToken},
936                 ClientConstants.DISPOSITION_STATUS_SUSPENDED,
937                 deadLetterReason,
938                 deadLetterErrorDescription,
939                 propertiesToModify,
940                 transaction);
941     }
942 
943     private CompletableFuture<Void> updateMessageStateAsync(byte[] deliveryTag, Outcome outcome, TransactionContext transaction) {
944         this.throwIfInUnusableState();
945         CompletableFuture<Void> completeMessageFuture = new CompletableFuture<>();
946 
947         String deliveryTagAsString = StringUtil.convertBytesToString(deliveryTag);
948         TRACE_LOGGER.debug("Updating message state of delivery '{}' to '{}'", deliveryTagAsString, outcome);
949         Delivery delivery = CoreMessageReceiver.this.tagsToDeliveriesMap.get(deliveryTagAsString);
950         if (delivery == null) {
951             TRACE_LOGGER.error("Delivery not found for delivery tag '{}'. Either receive link to '{}' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.", deliveryTagAsString, this.receivePath);
952             completeMessageFuture.completeExceptionally(generateDeliveryNotFoundException());
953         } else {
954             DeliveryState state;
955             if (transaction != TransactionContext.NULL_TXN) {
956                 state = new TransactionalState();
957                 ((TransactionalState) state).setTxnId(new Binary(transaction.getTransactionId().array()));
958                 ((TransactionalState) state).setOutcome(outcome);
959             } else {
960                 state = (DeliveryState) outcome;
961             }
962 
963             final UpdateStateWorkItems/UpdateStateWorkItem.html#UpdateStateWorkItem">UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.operationTimeout);
964             CoreMessageReceiver.this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem);
965             
966             CoreMessageReceiver.this.ensureLinkIsOpen().thenRun(() -> {
967                 try {
968                     this.underlyingFactory.scheduleOnReactorThread(new DeliveryStateDispatchHandler(delivery, state));
969                 } catch (IOException ioException) {
970                     completeMessageFuture.completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
971                 }
972             });
973         }
974 
975         return completeMessageFuture;
976     }
977 
978     private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
979         // Send SAS token before opening a link as connection might have been closed and reopened
980         if (!(this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE)) {
981             if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) {
982                 TRACE_LOGGER.info("Recreating receive link to '{}'", this.receivePath);
983                 this.retryPolicy.incrementRetryCount(this.getClientId());
984                 this.receiveLinkReopenFuture = new CompletableFuture<>();
985                 // Variable just to be closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change
986                 final CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.receiveLinkReopenFuture;
987                 Timer.schedule(
988                     () -> {
989                         if (!linkReopenFutureThatCanBeCancelled.isDone()) {
990                             CoreMessageReceiver.this.cancelSASTokenRenewTimer();
991                             Exception operationTimedout = new TimeoutException(
992                                     String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()));
993 
994                             TRACE_LOGGER.warn(operationTimedout.getMessage());
995                             AsyncUtil.completeFutureExceptionally(linkReopenFutureThatCanBeCancelled, operationTimedout);
996                         }
997                     },
998                     CoreMessageReceiver.LINK_REOPEN_TIMEOUT,
999                     TimerType.OneTimeRun);
1000                 this.cancelSASTokenRenewTimer();
1001                 this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
1002                     if (sendTokenEx != null) {
1003                         Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
1004                         TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, cause);
1005                         this.receiveLinkReopenFuture.completeExceptionally(sendTokenEx);
1006                         this.clearAllPendingWorkItems(sendTokenEx);
1007                     } else {
1008                         try {
1009                             this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
1010                                 @Override
1011                                 public void onEvent() {
1012                                     CoreMessageReceiver.this.createReceiveLink();
1013                                 }
1014                             });
1015                         } catch (IOException ioEx) {
1016                             this.receiveLinkReopenFuture.completeExceptionally(ioEx);
1017                         }
1018                     }
1019                     return null;
1020                 }, MessagingFactory.INTERNAL_THREAD_POOL);
1021             }
1022 
1023             return this.receiveLinkReopenFuture;
1024         } else {
1025             return CompletableFuture.completedFuture(null);
1026         }
1027     }
1028 
1029     private void completePendingUpdateStateWorkItem(Delivery delivery, String deliveryTagAsString, UpdateStateWorkItem workItem, Exception exception) {
1030         boolean isSettled = delivery.remotelySettled();
1031         if (isSettled) {
1032             delivery.settle();
1033         }
1034 
1035         if (exception == null) {
1036             AsyncUtil.completeFuture(workItem.getWork(), null);
1037         } else {
1038             ExceptionUtil.completeExceptionally(workItem.getWork(), exception, this, true);
1039         }
1040 
1041         if (isSettled) {
1042             this.tagsToDeliveriesMap.remove(deliveryTagAsString);
1043             this.pendingUpdateStateRequests.remove(deliveryTagAsString);
1044         }
1045     }
1046 
1047     private void clearAllPendingWorkItems(Throwable exception) {
1048         TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", this.receivePath);
1049         final boolean isTransientException = exception == null
1050             || (exception instanceof ServiceBusExceptionom/microsoft/azure/servicebus/primitives/ServiceBusException.html#ServiceBusException">ServiceBusException && ((ServiceBusException) exception).getIsTransient());
1051 
1052         Iterator<ReceiveWorkItem> pendingRecivesIterator = this.pendingReceives.iterator();
1053         while (pendingRecivesIterator.hasNext()) {
1054             ReceiveWorkItem workItem = pendingRecivesIterator.next();
1055             pendingRecivesIterator.remove();
1056 
1057             CompletableFuture<Collection<MessageWithDeliveryTag>> future = workItem.getWork();
1058             workItem.cancelTimeoutTask(false);
1059             this.reduceCreditForCompletedReceiveRequest(workItem.getMaxMessageCount());
1060             if (isTransientException) {
1061                 AsyncUtil.completeFuture(future, null);
1062             } else {
1063                 ExceptionUtil.completeExceptionally(future, exception, this, true);
1064             }
1065         }
1066 
1067         for (Map.Entry<String, UpdateStateWorkItem> pendingUpdate : this.pendingUpdateStateRequests.entrySet()) {
1068             pendingUpdateStateRequests.remove(pendingUpdate.getKey());
1069             ExceptionUtil.completeExceptionally(pendingUpdate.getValue().getWork(), exception, this, true);
1070         }
1071     }
1072 
1073     private static IllegalArgumentException generateDeliveryNotFoundException() {
1074         return new IllegalArgumentException("Delivery not found on the receive link.");
1075     }
1076 
1077     private static ServiceBusException generateDispatacherSchedulingFailedException(String operation, Exception cause) {
1078         return new ServiceBusException(false, operation + " failed while dispatching to Reactor, see cause for more details.", cause);
1079     }
1080 
1081     public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lockTokens) {
1082         this.throwIfInUnusableState();
1083         if (TRACE_LOGGER.isDebugEnabled()) {
1084             TRACE_LOGGER.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", Arrays.toString(lockTokens), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1085         }
1086         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1087             HashMap requestBodyMap = new HashMap();
1088             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens);
1089             if (this.isSessionReceiver) {
1090                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1091             }
1092 
1093             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1094             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1095             return responseFuture.thenComposeAsync((responseMessage) -> {
1096                 CompletableFuture<Collection<Instant>> returningFuture = new CompletableFuture<>();
1097                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1098                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1099                     if (TRACE_LOGGER.isDebugEnabled()) {
1100                         TRACE_LOGGER.debug("Message locks for lock tokens '{}' renewed", Arrays.toString(lockTokens));
1101                     }
1102 
1103                     Date[] expirations = (Date[]) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATIONS);
1104                     returningFuture.complete(Arrays.stream(expirations).map((d) -> d.toInstant()).collect(Collectors.toList()));
1105                 } else {
1106                     // error response
1107                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1108                     TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", Arrays.toString(lockTokens), this.receivePath, failureException);
1109                     returningFuture.completeExceptionally(failureException);
1110                 }
1111                 return returningFuture;
1112             }, MessagingFactory.INTERNAL_THREAD_POOL);
1113         }, MessagingFactory.INTERNAL_THREAD_POOL);
1114     }
1115 
1116     public CompletableFuture<Collection<MessageWithLockToken>> receiveDeferredMessageBatchAsync(Long[] sequenceNumbers) {
1117         this.throwIfInUnusableState();
1118         if (TRACE_LOGGER.isDebugEnabled()) {
1119             TRACE_LOGGER.debug("Receiving messages for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1120         }
1121         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1122             HashMap requestBodyMap = new HashMap();
1123             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers);
1124             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1));
1125             if (this.isSessionReceiver) {
1126                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1127             }
1128 
1129             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1130             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1131             return responseFuture.thenComposeAsync((responseMessage) -> {
1132                 CompletableFuture<Collection<MessageWithLockToken>> returningFuture = new CompletableFuture<>();
1133                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1134                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1135                     if (TRACE_LOGGER.isDebugEnabled()) {
1136                         TRACE_LOGGER.debug("Received messges for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1137                     }
1138                     List<MessageWithLockToken> receivedMessages = new ArrayList<>();
1139                     Object responseBodyMap = ((AmqpValue) responseMessage.getBody()).getValue();
1140                     if (responseBodyMap != null && responseBodyMap instanceof Map) {
1141                         Object messages = ((Map) responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES);
1142                         if (messages != null && messages instanceof Iterable) {
1143                             for (Object message : (Iterable) messages) {
1144                                 if (message instanceof Map) {
1145                                     Message receivedMessage = Message.Factory.create();
1146                                     Binary messagePayLoad = (Binary) ((Map) message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE);
1147                                     receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
1148                                     UUID lockToken = ClientConstants.ZEROLOCKTOKEN;
1149                                     if (((Map) message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN)) {
1150                                         lockToken = (UUID) ((Map) message).get(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN);
1151                                     }
1152 
1153                                     receivedMessages.add(new MessageWithLockToken(receivedMessage, lockToken));
1154                                 }
1155                             }
1156                         }
1157                     }
1158                     returningFuture.complete(receivedMessages);
1159                 } else {
1160                     // error response
1161                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1162                     TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", Arrays.toString(sequenceNumbers), this.receivePath, failureException);
1163                     returningFuture.completeExceptionally(failureException);
1164                 }
1165                 return returningFuture;
1166             }, MessagingFactory.INTERNAL_THREAD_POOL);
1167         }, MessagingFactory.INTERNAL_THREAD_POOL);
1168     }
1169 
1170     public CompletableFuture<Void> updateDispositionAsync(
1171             UUID[] lockTokens,
1172             String dispositionStatus,
1173             String deadLetterReason,
1174             String deadLetterErrorDescription,
1175             Map<String, Object> propertiesToModify,
1176             TransactionContext transaction) {
1177         this.throwIfInUnusableState();
1178         if (TRACE_LOGGER.isDebugEnabled()) {
1179             TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1180         }
1181         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1182             HashMap requestBodyMap = new HashMap();
1183             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens);
1184             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DISPOSITION_STATUS, dispositionStatus);
1185 
1186             if (deadLetterReason != null) {
1187                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_REASON, deadLetterReason);
1188             }
1189 
1190             if (deadLetterErrorDescription != null) {
1191                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_DESCRIPTION, deadLetterErrorDescription);
1192             }
1193 
1194             if (propertiesToModify != null && propertiesToModify.size() > 0) {
1195                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, propertiesToModify);
1196             }
1197 
1198             if (this.isSessionReceiver) {
1199                 requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1200             }
1201 
1202             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1203             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, this.operationTimeout);
1204             return responseFuture.thenComposeAsync((responseMessage) -> {
1205                 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1206                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1207                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1208                     if (TRACE_LOGGER.isDebugEnabled()) {
1209                         TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}' succeeded.", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : "");
1210                     }
1211                     returningFuture.complete(null);
1212                 } else {
1213                     // error response
1214                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1215                     TRACE_LOGGER.error("Update disposition on entity '{}' failed", this.receivePath, failureException);
1216                     returningFuture.completeExceptionally(failureException);
1217                 }
1218                 return returningFuture;
1219             }, MessagingFactory.INTERNAL_THREAD_POOL);
1220         }, MessagingFactory.INTERNAL_THREAD_POOL);
1221     }
1222 
1223     public CompletableFuture<Void> renewSessionLocksAsync() {
1224         this.throwIfInUnusableState();
1225         TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", this.receivePath, this.getSessionId());
1226         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1227             HashMap requestBodyMap = new HashMap();
1228             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1229 
1230             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1231             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1232             return responseFuture.thenComposeAsync((responseMessage) -> {
1233                 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1234                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1235                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1236                     Date expiration = (Date) RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATION);
1237                     this.sessionLockedUntilUtc = expiration.toInstant();
1238                     TRACE_LOGGER.debug("Session lock on entity '{}' of sesion '{}' renewed until '{}'", this.receivePath, this.getSessionId(), this.sessionLockedUntilUtc);
1239                     returningFuture.complete(null);
1240                 } else {
1241                     // error response
1242                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1243                     TRACE_LOGGER.error("Renewing session lock on entity '{}' of sesion '{}' failed", this.receivePath, this.getSessionId(), failureException);
1244                     returningFuture.completeExceptionally(failureException);
1245                 }
1246                 return returningFuture;
1247             }, MessagingFactory.INTERNAL_THREAD_POOL);
1248         }, MessagingFactory.INTERNAL_THREAD_POOL);
1249     }
1250 
1251     public CompletableFuture<byte[]> getSessionStateAsync() {
1252         this.throwIfInUnusableState();
1253         TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath);
1254         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1255             HashMap requestBodyMap = new HashMap();
1256             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1257 
1258             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1259             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1260             return responseFuture.thenComposeAsync((responseMessage) -> {
1261                 CompletableFuture<byte[]> returningFuture = new CompletableFuture<>();
1262                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1263                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1264                     TRACE_LOGGER.debug("Got session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath);
1265                     byte[] receivedState = null;
1266                     Map bodyMap = RequestResponseUtils.getResponseBody(responseMessage);
1267                     if (bodyMap.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) {
1268                         Object sessionState = bodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSION_STATE);
1269                         if (sessionState != null) {
1270                             receivedState = ((Binary) sessionState).getArray();
1271                         }
1272                     }
1273 
1274                     returningFuture.complete(receivedState);
1275                 } else {
1276                     // error response
1277                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1278                     TRACE_LOGGER.error("Getting session state of sesion '{}' from entity '{}' failed", this.getSessionId(), this.receivePath, failureException);
1279                     returningFuture.completeExceptionally(failureException);
1280                 }
1281                 return returningFuture;
1282             }, MessagingFactory.INTERNAL_THREAD_POOL);
1283         }, MessagingFactory.INTERNAL_THREAD_POOL);
1284     }
1285 
1286     // NULL session state is allowed
1287     public CompletableFuture<Void> setSessionStateAsync(byte[] sessionState) {
1288         this.throwIfInUnusableState();
1289         TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", this.getSessionId(), this.receivePath);
1290         return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> {
1291             HashMap requestBodyMap = new HashMap();
1292             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId());
1293             requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSION_STATE, sessionState == null ? null : new Binary(sessionState));
1294 
1295             Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
1296             CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
1297             return responseFuture.thenComposeAsync((responseMessage) -> {
1298                 CompletableFuture<Void> returningFuture = new CompletableFuture<>();
1299                 int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
1300                 if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
1301                     TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}' succeeded", this.getSessionId(), this.receivePath);
1302                     returningFuture.complete(null);
1303                 } else {
1304                     // error response
1305                     Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
1306                     TRACE_LOGGER.error("Setting session state of sesion '{}' on entity '{}' failed", this.getSessionId(), this.receivePath, failureException);
1307                     returningFuture.completeExceptionally(failureException);
1308                 }
1309                 return returningFuture;
1310             }, MessagingFactory.INTERNAL_THREAD_POOL);
1311         }, MessagingFactory.INTERNAL_THREAD_POOL);
1312     }
1313 
1314     // A receiver can be used to peek messages from any session-id, useful for browsable sessions
1315     public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) {
1316         this.throwIfInUnusableState();
1317         return this.createRequestResponseLinkAsync().thenComposeAsync((v) ->
1318                 CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId, this.receiveLink.getName()),
1319             MessagingFactory.INTERNAL_THREAD_POOL);
1320     }
1321 
1322     private static class DeliveryStateDispatchHandler extends DispatchHandler {
1323         final Delivery delivery;
1324         final DeliveryState deliveryState;
1325 
1326         DeliveryStateDispatchHandler(Delivery delivery, DeliveryState deliveryState) {
1327             this.delivery = delivery;
1328             this.deliveryState = deliveryState;
1329         }
1330 
1331         @Override
1332         public void onEvent() {
1333             delivery.disposition(deliveryState);
1334         }
1335     }
1336 }