View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.servicebus.primitives;
5   
6   import java.io.IOException;
7   import java.net.URI;
8   import java.nio.channels.UnresolvedAddressException;
9   import java.time.Duration;
10  import java.time.Instant;
11  import java.util.LinkedList;
12  import java.util.Locale;
13  import java.util.UUID;
14  import java.util.concurrent.CompletableFuture;
15  import java.util.concurrent.ExecutionException;
16  import java.util.concurrent.ExecutorService;
17  import java.util.concurrent.Executors;
18  import java.util.concurrent.ScheduledFuture;
19  
20  import com.microsoft.azure.servicebus.TransactionContext;
21  import com.microsoft.azure.servicebus.Utils;
22  import com.microsoft.azure.servicebus.amqp.BaseLinkHandler;
23  import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
24  import com.microsoft.azure.servicebus.amqp.DispatchHandler;
25  import com.microsoft.azure.servicebus.amqp.IAmqpConnection;
26  import com.microsoft.azure.servicebus.amqp.ProtonUtil;
27  import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
28  import com.microsoft.azure.servicebus.amqp.ReactorHandler;
29  import org.apache.qpid.proton.amqp.Binary;
30  import org.apache.qpid.proton.amqp.transport.ErrorCondition;
31  import org.apache.qpid.proton.engine.BaseHandler;
32  import org.apache.qpid.proton.engine.Connection;
33  import org.apache.qpid.proton.engine.EndpointState;
34  import org.apache.qpid.proton.engine.Event;
35  import org.apache.qpid.proton.engine.Handler;
36  import org.apache.qpid.proton.engine.HandlerException;
37  import org.apache.qpid.proton.engine.Link;
38  import org.apache.qpid.proton.reactor.Reactor;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  import org.slf4j.Marker;
42  import org.slf4j.MarkerFactory;
43  
44  import com.microsoft.azure.servicebus.ClientSettings;
45  import com.microsoft.azure.servicebus.security.SecurityToken;
46  
47  /**
48   * Abstracts all AMQP related details and encapsulates an AMQP connection and manages its life cycle. Each instance of
49   * this class represent one AMQP connection to the namespace. If an application creates multiple senders, receivers
50   * or clients using the same MessagingFactory instance, all those senders, receivers or clients will share the same connection to the namespace.
51   * @since 1.0
52   */
53  public class MessagingFactory extends ClientEntity implements IAmqpConnection {
54      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessagingFactory.class);
55      public static final ExecutorService INTERNAL_THREAD_POOL = Executors.newCachedThreadPool();
56  
57      private static final String REACTOR_THREAD_NAME_PREFIX = "ReactorThread";
58      private static final int MAX_CBS_LINK_CREATION_ATTEMPTS = 3;
59      private final String hostName;
60      private final CompletableFuture<Void> connetionCloseFuture;
61      private final ConnectionHandler connectionHandler;
62      private final ReactorHandler reactorHandler;
63      private final LinkedList<Link> registeredLinks;
64      private final Object reactorLock;
65      private final RequestResponseLinkCache managementLinksCache;
66  
67      private Reactor reactor;
68      private ReactorDispatcher reactorScheduler;
69      private Connection connection;
70      private Controller controller;
71  
72      private CompletableFuture<MessagingFactory> factoryOpenFuture;
73      private CompletableFuture<Void> cbsLinkCreationFuture;
74      private RequestResponseLink cbsLink;
75      private int cbsLinkCreationAttempts = 0;
76      private Throwable lastCBSLinkCreationException = null;
77      private URI namespaceEndpointUri;
78  
79      private final ClientSettings clientSettings;
80  
81      private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings) {
82          super("MessagingFactory".concat(StringUtil.getShortRandomString()));
83          this.clientSettings = clientSettings;
84          this.namespaceEndpointUri = namespaceEndpointUri;
85          this.hostName = namespaceEndpointUri.getHost();
86          this.registeredLinks = new LinkedList<Link>();
87          this.connetionCloseFuture = new CompletableFuture<Void>();
88          this.reactorLock = new Object();
89          this.connectionHandler =   ConnectionHandler.create(clientSettings.getTransportType(), this);
90          this.factoryOpenFuture = new CompletableFuture<MessagingFactory>();
91          this.cbsLinkCreationFuture = new CompletableFuture<Void>();
92          this.managementLinksCache = new RequestResponseLinkCache(this);
93          this.reactorHandler = new ReactorHandler() {
94              @Override
95              public void onReactorInit(Event e) {
96                  super.onReactorInit(e);
97  
98                  final Reactor r = e.getReactor();
99                  TRACE_LOGGER.info("Creating connection to host '{}:{}'",
100                         connectionHandler.getOutboundSocketHostName(),
101                         connectionHandler.getOutboundSocketPort());
102                 connection = r.connectionToHost(
103                         connectionHandler.getOutboundSocketHostName(),
104                         connectionHandler.getOutboundSocketPort(),
105                         connectionHandler);
106             }
107         };
108         Timer.register(this.getClientId());
109     }
110 
111     /**
112      * Starts a new service side transaction. The {@link TransactionContext} should be passed to all operations that
113      * needs to be in this transaction.
114      * @return a new transaction
115      * @throws ServiceBusException if transaction fails to start
116      * @throws InterruptedException if the current thread was interrupted while waiting
117      */
118     public TransactionContext startTransaction() throws ServiceBusException, InterruptedException {
119         return Utils.completeFuture(this.startTransactionAsync());
120     }
121 
122     /**
123      * Starts a new service side transaction. The {@link TransactionContext} should be passed to all operations that
124      * needs to be in this transaction.
125      * @return A <code>CompletableFuture</code> which returns a new transaction
126      */
127     public CompletableFuture<TransactionContext> startTransactionAsync() {
128         return this.getController()
129                 .thenCompose(controller -> controller.declareAsync()
130                         .thenApply(binary -> new TransactionContext(binary.asByteBuffer(), this)));
131     }
132 
133     /**
134      * Ends a transaction that was initiated using {@link MessagingFactory#startTransactionAsync()}.
135      * @param transaction The transaction object.
136      * @param commit A boolean value of <code>true</code> indicates transaction to be committed. A value of
137      *                  <code>false</code> indicates a transaction rollback.
138      * @throws ServiceBusException if transaction fails to end
139      * @throws InterruptedException if the current thread was interrupted while waiting
140      */
141     public void endTransaction(TransactionContext transaction, boolean commit) throws ServiceBusException, InterruptedException {
142         Utils.completeFuture(this.endTransactionAsync(transaction, commit));
143     }
144 
145     /**
146      * Ends a transaction that was initiated using {@link MessagingFactory#startTransactionAsync()}.
147      * @param transaction The transaction object.
148      * @param commit A boolean value of <code>true</code> indicates transaction to be committed. A value of
149      *                  <code>false</code> indicates a transaction rollback.
150      * @return A <code>CompletableFuture</code>
151      */
152     public CompletableFuture<Void> endTransactionAsync(TransactionContext transaction, boolean commit) {
153         if (transaction == null) {
154             CompletableFuture<Void> exceptionCompletion = new CompletableFuture<>();
155             exceptionCompletion.completeExceptionally(new ServiceBusException(false, "Transaction cannot not be null"));
156             return exceptionCompletion;
157         }
158 
159         return this.getController()
160                 .thenCompose(controller -> controller.dischargeAsync(new Binary(transaction.getTransactionId().array()), commit)
161                 .thenRun(() -> transaction.notifyTransactionCompletion(commit)));
162     }
163 
164     private CompletableFuture<Controller> getController() {
165         if (this.controller != null) {
166             return CompletableFuture.completedFuture(this.controller);
167         }
168 
169         return createController();
170     }
171 
172     private synchronized CompletableFuture<Controller> createController() {
173         if (this.controller != null) {
174             return CompletableFuture.completedFuture(this.controller);
175         }
176 
177         Controllerimitives/Controller.html#Controller">Controller controller = new Controller(this.namespaceEndpointUri, this, this.clientSettings);
178         return controller.initializeAsync().thenApply(v -> {
179             this.controller = controller;
180             return controller;
181         });
182     }
183 
184     @Override
185     public String getHostName() {
186         return this.hostName;
187     }
188 
189     private Reactor getReactor() {
190         synchronized (this.reactorLock) {
191             return this.reactor;
192         }
193     }
194 
195     private ReactorDispatcher getReactorScheduler() {
196         synchronized (this.reactorLock) {
197             return this.reactorScheduler;
198         }
199     }
200 
201     private void startReactor(ReactorHandler reactorHandler) throws IOException {
202         TRACE_LOGGER.info("Creating and starting reactor");
203         Reactor newReactor = ProtonUtil.reactor(reactorHandler, this.connectionHandler.getMaxFrameSize());
204         synchronized (this.reactorLock) {
205             this.reactor = newReactor;
206             this.reactorScheduler = new ReactorDispatcher(newReactor);
207         }
208 
209         String reactorThreadName = REACTOR_THREAD_NAME_PREFIX + UUID.randomUUID().toString();
210         Thread reactorThread = new Thread(new RunReactor(), reactorThreadName);
211         reactorThread.start();
212         TRACE_LOGGER.info("Started reactor");
213     }
214 
215     Connection getConnection() {
216         if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
217             TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort());
218             this.connection = this.getReactor().connectionToHost(
219                     this.connectionHandler.getOutboundSocketHostName(),
220                     this.connectionHandler.getOutboundSocketPort(),
221                     this.connectionHandler);
222         }
223 
224         return this.connection;
225     }
226 
227     /**
228      * Gets the operation timeout from the connections string.
229      * @return operation timeout specified in the connection string
230      */
231     public Duration getOperationTimeout() {
232         return this.clientSettings.getOperationTimeout();
233     }
234 
235     /**
236      * Gets the retry policy from the connection string.
237      * @return retry policy specified in the connection string
238      */
239     public RetryPolicy getRetryPolicy() {
240         return this.clientSettings.getRetryPolicy();
241     }
242 
243     public ClientSettings getClientSettings() {
244         return this.clientSettings;
245     }
246 
247     public static CompletableFuture<MessagingFactory> createFromNamespaceNameAsyc(String sbNamespaceName, ClientSettings clientSettings) {
248         return createFromNamespaceEndpointURIAsyc(Util.convertNamespaceToEndPointURI(sbNamespaceName), clientSettings);
249     }
250 
251     public static CompletableFuture<MessagingFactory> createFromNamespaceEndpointURIAsyc(URI namespaceEndpointURI, ClientSettings clientSettings) {
252         if (TRACE_LOGGER.isInfoEnabled()) {
253             TRACE_LOGGER.info("Creating messaging factory from namespace endpoint uri '{}'", namespaceEndpointURI.toString());
254         }
255         
256         MessagingFactorysagingFactory.html#MessagingFactory">MessagingFactory messagingFactory = new MessagingFactory(namespaceEndpointURI, clientSettings);
257         try {
258             messagingFactory.startReactor(messagingFactory.reactorHandler);
259         } catch (IOException e) {
260             Marker fatalMarker = MarkerFactory.getMarker(ClientConstants.FATAL_MARKER);
261             TRACE_LOGGER.error(fatalMarker, "Starting reactor failed", e);
262             messagingFactory.factoryOpenFuture.completeExceptionally(e);
263         }
264         return messagingFactory.factoryOpenFuture;
265     }
266 
267     public static MessagingFactory createFromNamespaceName(String sbNamespaceName, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
268         return completeFuture(createFromNamespaceNameAsyc(sbNamespaceName, clientSettings));
269     }
270     
271     public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
272         return completeFuture(createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings));
273     }
274 
275     /**
276      * Creates an instance of MessagingFactory from the given connection string builder. This is a non-blocking method.
277      * @param builder connection string builder to the  bus namespace or entity
278      * @return a <code>CompletableFuture</code> which completes when a connection is established to the namespace or when a connection couldn't be established.
279      * @see java.util.concurrent.CompletableFuture
280      */
281     public static CompletableFuture<MessagingFactory> createFromConnectionStringBuilderAsync(final ConnectionStringBuilder builder) {
282         if (TRACE_LOGGER.isInfoEnabled()) {
283             TRACE_LOGGER.info("Creating messaging factory from connection string '{}'", builder.toLoggableString());
284         }
285 
286         return createFromNamespaceEndpointURIAsyc(builder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(builder));
287     }
288 
289     /**
290      * Creates an instance of MessagingFactory from the given connection string. This is a non-blocking method.
291      * @param connectionString connection string to the  bus namespace or entity
292      * @return a <code>CompletableFuture</code> which completes when a connection is established to the namespace or when a connection couldn't be established.
293      * @see java.util.concurrent.CompletableFuture
294      */
295     public static CompletableFuture<MessagingFactory> createFromConnectionStringAsync(final String connectionString) {
296         ConnectionStringBuilderonnectionStringBuilder.html#ConnectionStringBuilder">ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
297         return createFromConnectionStringBuilderAsync(builder);
298     }
299 
300     /**
301      * Creates an instance of MessagingFactory from the given connection string builder. This method blocks for a connection to the namespace to be established.
302      * @param builder connection string builder to the  bus namespace or entity
303      * @return an instance of MessagingFactory
304      * @throws InterruptedException if blocking thread is interrupted
305      * @throws ExecutionException if a connection couldn't be established to the namespace. Cause of the failure can be found by calling {@link Exception#getCause()}
306      */
307     public static MessagingFactory createFromConnectionStringBuilder(final ConnectionStringBuilder builder) throws InterruptedException, ExecutionException {
308         return createFromConnectionStringBuilderAsync(builder).get();
309     }
310 
311     /**
312      * Creates an instance of MessagingFactory from the given connection string. This method blocks for a connection to the namespace to be established.
313      * @param connectionString connection string to the  bus namespace or entity
314      * @return an instance of MessagingFactory
315      * @throws InterruptedException if blocking thread is interrupted
316      * @throws ExecutionException if a connection couldn't be established to the namespace. Cause of the failure can be found by calling {@link Exception#getCause()}
317      */
318     public static MessagingFactory createFromConnectionString(final String connectionString) throws InterruptedException, ExecutionException {
319         return createFromConnectionStringAsync(connectionString).get();
320     }
321 
322     /**
323      * Internal method.&nbsp;Clients should not use this method.
324      */
325     @Override
326     public void onConnectionOpen() {
327         if (!factoryOpenFuture.isDone()) {
328             TRACE_LOGGER.info("MessagingFactory opened.");
329             AsyncUtil.completeFuture(this.factoryOpenFuture, this);
330         }
331 
332         // Connection opened. Initiate new cbs link creation
333         TRACE_LOGGER.info("Connection opened to host.");
334         if (this.cbsLink == null) {
335             this.createCBSLinkAsync();
336         }
337     }
338 
339     /**
340      * Internal method.&nbsp;Clients should not use this method.
341      */
342     @Override
343     public void onConnectionError(ErrorCondition error) {
344         if (error != null && error.getCondition() != null) {
345             TRACE_LOGGER.error("Connection error. '{}'", error);
346         }
347 
348         if (!this.factoryOpenFuture.isDone()) {
349             AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(error));
350             this.setClosed();
351         } else {
352             this.closeConnection(error, null);
353         }
354 
355         if (this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone()) {
356             TRACE_LOGGER.info("Connection to host closed.");
357             AsyncUtil.completeFuture(this.connetionCloseFuture, null);
358             Timer.unregister(this.getClientId());
359         }
360     }
361 
362     private void onReactorError(Exception cause) {
363         if (!this.factoryOpenFuture.isDone()) {
364             TRACE_LOGGER.error("Reactor error occured", cause);
365             AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, cause);
366             this.setClosed();
367         } else {
368             if (this.getIsClosingOrClosed()) {
369                 return;
370             }
371 
372             TRACE_LOGGER.warn("Reactor error occured", cause);
373 
374             try {
375                 this.startReactor(this.reactorHandler);
376             } catch (IOException e) {
377                 Marker fatalMarker = MarkerFactory.getMarker(ClientConstants.FATAL_MARKER);
378                 TRACE_LOGGER.error(fatalMarker, "Re-starting reactor failed with exception.", e);
379                 this.onReactorError(cause);
380             }
381 
382             this.closeConnection(null, cause);
383         }
384     }
385 
386     // One of the parameters must be null
387     private void closeConnection(ErrorCondition error, Exception cause) {
388         // Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method
389         Connection currentConnection = this.connection;
390         if (currentConnection != null) {
391             Link[] links = this.registeredLinks.toArray(new Link[0]);
392             this.registeredLinks.clear();
393 
394             TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", links.length);
395             for (Link link : links) {
396                 link.close();
397             }
398 
399             TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", links.length);
400 
401             if (currentConnection.getLocalState() != EndpointState.CLOSED) {
402                 TRACE_LOGGER.info("Closing connection to host");
403                 currentConnection.close();
404             }
405 
406             for (Link link : links) {
407                 Handler handler = BaseHandler.getHandler(link);
408                 if (handler != null && handler instanceof BaseLinkHandler) {
409                     BaseLinkHandler/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.html#BaseLinkHandler">BaseLinkHandler linkHandler = (BaseLinkHandler) handler;
410                     if (error != null) {
411                         linkHandler.processOnClose(link, error);
412                     } else {
413                         linkHandler.processOnClose(link, cause);
414                     }
415                 }
416             }
417         }
418     }
419 
420     @Override
421     protected CompletableFuture<Void> onClose() {
422         if (!this.getIsClosed()) {
423             TRACE_LOGGER.info("Closing messaging factory");
424             CompletableFuture<Void> cbsLinkCloseFuture;
425             if (this.cbsLink == null) {
426                 cbsLinkCloseFuture = CompletableFuture.completedFuture(null);
427             } else {
428                 TRACE_LOGGER.info("Closing CBS link");
429                 cbsLinkCloseFuture = this.cbsLink.closeAsync();
430             }
431 
432             cbsLinkCloseFuture.thenRun(() -> this.managementLinksCache.freeAsync()).thenRun(() -> {
433                 if (this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) {
434                     this.cbsLinkCreationFuture.completeExceptionally(new Exception("Connection closed."));
435                 }
436 
437                 if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) {
438                     try {
439                         this.scheduleOnReactorThread(new DispatchHandler() {
440                             @Override
441                             public void onEvent() {
442                                 if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) {
443                                     TRACE_LOGGER.info("Closing connection to host");
444                                     MessagingFactory.this.connection.close();
445                                 }
446                             }
447                         });
448                     } catch (IOException e) {
449                         this.connetionCloseFuture.completeExceptionally(e);
450                     }
451 
452                     Timer.schedule(() -> {
453                         if (!MessagingFactory.this.connetionCloseFuture.isDone()) {
454                             String errorMessage = "Closing MessagingFactory timed out.";
455                             TRACE_LOGGER.warn(errorMessage);
456                             AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException(errorMessage));
457                         }
458                     },
459                         this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
460                 } else {
461                     this.connetionCloseFuture.complete(null);
462                     Timer.unregister(this.getClientId());
463                 }
464             });
465 
466             return this.connetionCloseFuture;
467         } else {
468             return CompletableFuture.completedFuture(null);
469         }
470     }
471 
472     private class RunReactor implements Runnable {
473         private final Reactor rctr;
474 
475         RunReactor() {
476             this.rctr = MessagingFactory.this.getReactor();
477         }
478 
479         public void run() {
480             TRACE_LOGGER.info("starting reactor instance.");
481             try {
482                 this.rctr.setTimeout(3141);
483                 this.rctr.start();
484                 boolean continueProcessing = true;
485                 while (!Thread.interrupted() && continueProcessing) {
486                     // If factory is closed, stop reactor too
487                     if (MessagingFactory.this.getIsClosed()) {
488                         TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
489                         break;
490                     }
491                     continueProcessing = this.rctr.process();
492                 }
493                 TRACE_LOGGER.info("Stopping reactor");
494                 this.rctr.stop();
495             } catch (HandlerException handlerException) {
496                 Throwable cause = handlerException.getCause();
497                 if (cause == null) {
498                     cause = handlerException;
499                 }
500 
501                 TRACE_LOGGER.warn("UnHandled exception while processing events in reactor:", handlerException);
502 
503                 String message = !StringUtil.isNullOrEmpty(cause.getMessage())
504                         ? cause.getMessage()
505                         : !StringUtil.isNullOrEmpty(handlerException.getMessage())
506                             ? handlerException.getMessage()
507                             : "Reactor encountered unrecoverable error";
508                 ServiceBusExceptionerviceBusException.html#ServiceBusException">ServiceBusException sbException = new ServiceBusException(
509                         true,
510                         String.format(Locale.US, "%s, %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
511                         cause);
512 
513                 if (cause instanceof UnresolvedAddressException) {
514                     sbException = new CommunicationException(
515                             String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
516                             cause);
517                 }
518 
519                 MessagingFactory.this.onReactorError(sbException);
520             } finally {
521                 this.rctr.free();
522             }
523         }
524     }
525 
526     /**
527      * Internal method.&nbsp;Clients should not use this method.
528      */
529     @Override
530     public void registerForConnectionError(Link link) {
531         if (link != null) {
532             this.registeredLinks.add(link);
533         }
534     }
535 
536     /**
537      * Internal method.&nbsp;Clients should not use this method.
538      */
539     @Override
540     public void deregisterForConnectionError(Link link) {
541         if (link != null) {
542             this.registeredLinks.remove(link);
543         }
544     }
545 
546     void scheduleOnReactorThread(final DispatchHandler handler) throws IOException {
547         this.getReactorScheduler().invoke(handler);
548     }
549 
550     void scheduleOnReactorThread(final int delay, final DispatchHandler handler) throws IOException {
551         this.getReactorScheduler().invoke(delay, handler);
552     }
553 
554     CompletableFuture<Void> sendSecurityToken(String sasTokenAudienceUri) {
555         TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceUri);
556         CompletableFuture<SecurityToken> tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceUri);
557         return tokenFuture.thenComposeAsync((t) -> {
558             SecurityToken generatedSecurityToken = t;
559             CompletableFuture<Void> sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken), MessagingFactory.INTERNAL_THREAD_POOL);
560 
561             return sendTokenFuture.thenAccept((v) -> TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceUri));
562 
563         }, MessagingFactory.INTERNAL_THREAD_POOL);
564     }
565 
566     CompletableFuture<ScheduledFuture<?>> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer) {
567         CompletableFuture<ScheduledFuture<?>> result = new CompletableFuture<ScheduledFuture<?>>();
568         TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceURI);
569         CompletableFuture<Instant> sendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
570         sendTokenFuture.handleAsync((validUntil, sendTokenEx) -> {
571             if (sendTokenEx == null) {
572                 TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
573                 ScheduledFuture<?> renewalFuture = MessagingFactory.scheduleRenewTimer(validUntil, validityRenewer);
574                 result.complete(renewalFuture);
575             } else {
576                 Throwable sendFailureCause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
577                 TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendFailureCause);
578                 if (retryOnFailure) {
579                     // Just schedule another attempt
580                     TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
581                     ScheduledFuture<?> renewalFuture = Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
582                     result.complete(renewalFuture);
583                 } else {
584                     if (sendFailureCause instanceof TimeoutException) {
585                         // Retry immediately on timeout. This is a special case as CBSLink may be disconnected right after the token is sent, but before it reaches the service
586                         TRACE_LOGGER.debug("Resending token for {}", sasTokenAudienceURI);
587                         CompletableFuture<Instant> resendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
588                         resendTokenFuture.handleAsync((resendValidUntil, resendTokenEx) -> {
589                             if (resendTokenEx == null) {
590                                 TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
591                                 ScheduledFuture<?> renewalFuture = MessagingFactory.scheduleRenewTimer(resendValidUntil, validityRenewer);
592                                 result.complete(renewalFuture);
593                             } else {
594                                 Throwable resendFailureCause = ExceptionUtil.extractAsyncCompletionCause(resendTokenEx);
595                                 TRACE_LOGGER.warn("Resending CBS Token for {} failed.", sasTokenAudienceURI, resendFailureCause);
596                                 result.completeExceptionally(resendFailureCause);
597                             }
598                             return null;
599                         }, MessagingFactory.INTERNAL_THREAD_POOL);
600                     } else {
601                         result.completeExceptionally(sendFailureCause);
602                     }
603                 }
604             }
605             return null;
606         }, MessagingFactory.INTERNAL_THREAD_POOL);
607 
608         return result;
609     }
610 
611     private CompletableFuture<Instant> generateAndSendSecurityToken(String sasTokenAudienceURI) {
612         CompletableFuture<SecurityToken> tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
613         return tokenFuture.thenComposeAsync((t) -> {
614             SecurityToken generatedSecurityToken = t;
615             return this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil()), MessagingFactory.INTERNAL_THREAD_POOL);
616         }, MessagingFactory.INTERNAL_THREAD_POOL);
617     }
618 
619     private static ScheduledFuture<?> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer) {
620         if (currentTokenValidUntil == Instant.MAX) {
621             // User provided token or will never expire
622             return null;
623         } else {
624             // It will eventually expire. Renew it
625             int renewInterval = Util.getTokenRenewIntervalInSeconds((int) Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
626             return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
627         }
628     }
629 
630     CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType) {
631         this.throwIfClosed(null);
632         return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, null, entityType);
633     }
634 
635     CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, String transferDestinationPath, MessagingEntityType entityType) {
636         this.throwIfClosed(null);
637         return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, transferDestinationPath, entityType);
638     }
639 
640     void releaseRequestResponseLink(String entityPath) {
641         if (!this.getIsClosed()) {
642             this.managementLinksCache.releaseRequestResponseLink(entityPath, null);
643         }
644     }
645 
646     void releaseRequestResponseLink(String entityPath, String transferDestinationPath) {
647         if (!this.getIsClosed()) {
648             this.managementLinksCache.releaseRequestResponseLink(entityPath, transferDestinationPath);
649         }
650     }
651 
652     private void createCBSLinkAsync() {
653         if (this.getIsClosingOrClosed()) {
654             return;
655         }
656 
657         if (++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS) {
658             Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException;
659             this.cbsLinkCreationFuture.completeExceptionally(completionEx);
660         } else {
661             String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
662             TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
663             RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null).handleAsync((cbsLink, ex) -> {
664                 if (ex == null) {
665                     TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath);
666                     if (this.getIsClosingOrClosed()) {
667                         // Factory is closed before CBSLink could be created. Close the created CBS link too
668                         cbsLink.closeAsync();
669                     } else {
670                         this.cbsLink = cbsLink;
671                         this.cbsLinkCreationFuture.complete(null);
672                     }                    
673                 } else {
674                     this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex);
675                     TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts);
676                     this.createCBSLinkAsync();
677                 }
678                 return null;
679             }, MessagingFactory.INTERNAL_THREAD_POOL);
680 
681         }
682     }
683 
684     private static <T> T completeFuture(CompletableFuture<T> future) throws InterruptedException, ServiceBusException {
685         try {
686             return future.get();
687         } catch (InterruptedException ie) {
688             // Rare instance
689             throw ie;
690         } catch (ExecutionException ee) {
691             Throwable cause = ee.getCause();
692             if (cause instanceof ServiceBusException) {
693                 throw (ServiceBusException) cause;
694             } else {
695                 throw new ServiceBusException(true, cause);
696             }
697         }
698     }
699 }