1
2
3
4 package com.microsoft.azure.servicebus.primitives;
5
6 import java.io.IOException;
7 import java.net.URI;
8 import java.nio.channels.UnresolvedAddressException;
9 import java.time.Duration;
10 import java.time.Instant;
11 import java.util.LinkedList;
12 import java.util.Locale;
13 import java.util.UUID;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledFuture;
19
20 import com.microsoft.azure.servicebus.TransactionContext;
21 import com.microsoft.azure.servicebus.Utils;
22 import com.microsoft.azure.servicebus.amqp.BaseLinkHandler;
23 import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
24 import com.microsoft.azure.servicebus.amqp.DispatchHandler;
25 import com.microsoft.azure.servicebus.amqp.IAmqpConnection;
26 import com.microsoft.azure.servicebus.amqp.ProtonUtil;
27 import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
28 import com.microsoft.azure.servicebus.amqp.ReactorHandler;
29 import org.apache.qpid.proton.amqp.Binary;
30 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
31 import org.apache.qpid.proton.engine.BaseHandler;
32 import org.apache.qpid.proton.engine.Connection;
33 import org.apache.qpid.proton.engine.EndpointState;
34 import org.apache.qpid.proton.engine.Event;
35 import org.apache.qpid.proton.engine.Handler;
36 import org.apache.qpid.proton.engine.HandlerException;
37 import org.apache.qpid.proton.engine.Link;
38 import org.apache.qpid.proton.reactor.Reactor;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.slf4j.Marker;
42 import org.slf4j.MarkerFactory;
43
44 import com.microsoft.azure.servicebus.ClientSettings;
45 import com.microsoft.azure.servicebus.security.SecurityToken;
46
47
48
49
50
51
52
53 public class MessagingFactory extends ClientEntity implements IAmqpConnection {
54 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessagingFactory.class);
55 public static final ExecutorService INTERNAL_THREAD_POOL = Executors.newCachedThreadPool();
56
57 private static final String REACTOR_THREAD_NAME_PREFIX = "ReactorThread";
58 private static final int MAX_CBS_LINK_CREATION_ATTEMPTS = 3;
59 private final String hostName;
60 private final CompletableFuture<Void> connetionCloseFuture;
61 private final ConnectionHandler connectionHandler;
62 private final ReactorHandler reactorHandler;
63 private final LinkedList<Link> registeredLinks;
64 private final Object reactorLock;
65 private final RequestResponseLinkCache managementLinksCache;
66
67 private Reactor reactor;
68 private ReactorDispatcher reactorScheduler;
69 private Connection connection;
70 private Controller controller;
71
72 private CompletableFuture<MessagingFactory> factoryOpenFuture;
73 private CompletableFuture<Void> cbsLinkCreationFuture;
74 private RequestResponseLink cbsLink;
75 private int cbsLinkCreationAttempts = 0;
76 private Throwable lastCBSLinkCreationException = null;
77 private URI namespaceEndpointUri;
78
79 private final ClientSettings clientSettings;
80
81 private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings) {
82 super("MessagingFactory".concat(StringUtil.getShortRandomString()));
83 this.clientSettings = clientSettings;
84 this.namespaceEndpointUri = namespaceEndpointUri;
85 this.hostName = namespaceEndpointUri.getHost();
86 this.registeredLinks = new LinkedList<Link>();
87 this.connetionCloseFuture = new CompletableFuture<Void>();
88 this.reactorLock = new Object();
89 this.connectionHandler = ConnectionHandler.create(clientSettings.getTransportType(), this);
90 this.factoryOpenFuture = new CompletableFuture<MessagingFactory>();
91 this.cbsLinkCreationFuture = new CompletableFuture<Void>();
92 this.managementLinksCache = new RequestResponseLinkCache(this);
93 this.reactorHandler = new ReactorHandler() {
94 @Override
95 public void onReactorInit(Event e) {
96 super.onReactorInit(e);
97
98 final Reactor r = e.getReactor();
99 TRACE_LOGGER.info("Creating connection to host '{}:{}'",
100 connectionHandler.getOutboundSocketHostName(),
101 connectionHandler.getOutboundSocketPort());
102 connection = r.connectionToHost(
103 connectionHandler.getOutboundSocketHostName(),
104 connectionHandler.getOutboundSocketPort(),
105 connectionHandler);
106 }
107 };
108 Timer.register(this.getClientId());
109 }
110
111
112
113
114
115
116
117
118 public TransactionContext startTransaction() throws ServiceBusException, InterruptedException {
119 return Utils.completeFuture(this.startTransactionAsync());
120 }
121
122
123
124
125
126
127 public CompletableFuture<TransactionContext> startTransactionAsync() {
128 return this.getController()
129 .thenCompose(controller -> controller.declareAsync()
130 .thenApply(binary -> new TransactionContext(binary.asByteBuffer(), this)));
131 }
132
133
134
135
136
137
138
139
140
141 public void endTransaction(TransactionContext transaction, boolean commit) throws ServiceBusException, InterruptedException {
142 Utils.completeFuture(this.endTransactionAsync(transaction, commit));
143 }
144
145
146
147
148
149
150
151
152 public CompletableFuture<Void> endTransactionAsync(TransactionContext transaction, boolean commit) {
153 if (transaction == null) {
154 CompletableFuture<Void> exceptionCompletion = new CompletableFuture<>();
155 exceptionCompletion.completeExceptionally(new ServiceBusException(false, "Transaction cannot not be null"));
156 return exceptionCompletion;
157 }
158
159 return this.getController()
160 .thenCompose(controller -> controller.dischargeAsync(new Binary(transaction.getTransactionId().array()), commit)
161 .thenRun(() -> transaction.notifyTransactionCompletion(commit)));
162 }
163
164 private CompletableFuture<Controller> getController() {
165 if (this.controller != null) {
166 return CompletableFuture.completedFuture(this.controller);
167 }
168
169 return createController();
170 }
171
172 private synchronized CompletableFuture<Controller> createController() {
173 if (this.controller != null) {
174 return CompletableFuture.completedFuture(this.controller);
175 }
176
177 Controllerimitives/Controller.html#Controller">Controller controller = new Controller(this.namespaceEndpointUri, this, this.clientSettings);
178 return controller.initializeAsync().thenApply(v -> {
179 this.controller = controller;
180 return controller;
181 });
182 }
183
184 @Override
185 public String getHostName() {
186 return this.hostName;
187 }
188
189 private Reactor getReactor() {
190 synchronized (this.reactorLock) {
191 return this.reactor;
192 }
193 }
194
195 private ReactorDispatcher getReactorScheduler() {
196 synchronized (this.reactorLock) {
197 return this.reactorScheduler;
198 }
199 }
200
201 private void startReactor(ReactorHandler reactorHandler) throws IOException {
202 TRACE_LOGGER.info("Creating and starting reactor");
203 Reactor newReactor = ProtonUtil.reactor(reactorHandler, this.connectionHandler.getMaxFrameSize());
204 synchronized (this.reactorLock) {
205 this.reactor = newReactor;
206 this.reactorScheduler = new ReactorDispatcher(newReactor);
207 }
208
209 String reactorThreadName = REACTOR_THREAD_NAME_PREFIX + UUID.randomUUID().toString();
210 Thread reactorThread = new Thread(new RunReactor(), reactorThreadName);
211 reactorThread.start();
212 TRACE_LOGGER.info("Started reactor");
213 }
214
215 Connection getConnection() {
216 if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
217 TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort());
218 this.connection = this.getReactor().connectionToHost(
219 this.connectionHandler.getOutboundSocketHostName(),
220 this.connectionHandler.getOutboundSocketPort(),
221 this.connectionHandler);
222 }
223
224 return this.connection;
225 }
226
227
228
229
230
231 public Duration getOperationTimeout() {
232 return this.clientSettings.getOperationTimeout();
233 }
234
235
236
237
238
239 public RetryPolicy getRetryPolicy() {
240 return this.clientSettings.getRetryPolicy();
241 }
242
243 public ClientSettings getClientSettings() {
244 return this.clientSettings;
245 }
246
247 public static CompletableFuture<MessagingFactory> createFromNamespaceNameAsyc(String sbNamespaceName, ClientSettings clientSettings) {
248 return createFromNamespaceEndpointURIAsyc(Util.convertNamespaceToEndPointURI(sbNamespaceName), clientSettings);
249 }
250
251 public static CompletableFuture<MessagingFactory> createFromNamespaceEndpointURIAsyc(URI namespaceEndpointURI, ClientSettings clientSettings) {
252 if (TRACE_LOGGER.isInfoEnabled()) {
253 TRACE_LOGGER.info("Creating messaging factory from namespace endpoint uri '{}'", namespaceEndpointURI.toString());
254 }
255
256 MessagingFactorysagingFactory.html#MessagingFactory">MessagingFactory messagingFactory = new MessagingFactory(namespaceEndpointURI, clientSettings);
257 try {
258 messagingFactory.startReactor(messagingFactory.reactorHandler);
259 } catch (IOException e) {
260 Marker fatalMarker = MarkerFactory.getMarker(ClientConstants.FATAL_MARKER);
261 TRACE_LOGGER.error(fatalMarker, "Starting reactor failed", e);
262 messagingFactory.factoryOpenFuture.completeExceptionally(e);
263 }
264 return messagingFactory.factoryOpenFuture;
265 }
266
267 public static MessagingFactory createFromNamespaceName(String sbNamespaceName, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
268 return completeFuture(createFromNamespaceNameAsyc(sbNamespaceName, clientSettings));
269 }
270
271 public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
272 return completeFuture(createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings));
273 }
274
275
276
277
278
279
280
281 public static CompletableFuture<MessagingFactory> createFromConnectionStringBuilderAsync(final ConnectionStringBuilder builder) {
282 if (TRACE_LOGGER.isInfoEnabled()) {
283 TRACE_LOGGER.info("Creating messaging factory from connection string '{}'", builder.toLoggableString());
284 }
285
286 return createFromNamespaceEndpointURIAsyc(builder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(builder));
287 }
288
289
290
291
292
293
294
295 public static CompletableFuture<MessagingFactory> createFromConnectionStringAsync(final String connectionString) {
296 ConnectionStringBuilderonnectionStringBuilder.html#ConnectionStringBuilder">ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
297 return createFromConnectionStringBuilderAsync(builder);
298 }
299
300
301
302
303
304
305
306
307 public static MessagingFactory createFromConnectionStringBuilder(final ConnectionStringBuilder builder) throws InterruptedException, ExecutionException {
308 return createFromConnectionStringBuilderAsync(builder).get();
309 }
310
311
312
313
314
315
316
317
318 public static MessagingFactory createFromConnectionString(final String connectionString) throws InterruptedException, ExecutionException {
319 return createFromConnectionStringAsync(connectionString).get();
320 }
321
322
323
324
325 @Override
326 public void onConnectionOpen() {
327 if (!factoryOpenFuture.isDone()) {
328 TRACE_LOGGER.info("MessagingFactory opened.");
329 AsyncUtil.completeFuture(this.factoryOpenFuture, this);
330 }
331
332
333 TRACE_LOGGER.info("Connection opened to host.");
334 if (this.cbsLink == null) {
335 this.createCBSLinkAsync();
336 }
337 }
338
339
340
341
342 @Override
343 public void onConnectionError(ErrorCondition error) {
344 if (error != null && error.getCondition() != null) {
345 TRACE_LOGGER.error("Connection error. '{}'", error);
346 }
347
348 if (!this.factoryOpenFuture.isDone()) {
349 AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(error));
350 this.setClosed();
351 } else {
352 this.closeConnection(error, null);
353 }
354
355 if (this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone()) {
356 TRACE_LOGGER.info("Connection to host closed.");
357 AsyncUtil.completeFuture(this.connetionCloseFuture, null);
358 Timer.unregister(this.getClientId());
359 }
360 }
361
362 private void onReactorError(Exception cause) {
363 if (!this.factoryOpenFuture.isDone()) {
364 TRACE_LOGGER.error("Reactor error occured", cause);
365 AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, cause);
366 this.setClosed();
367 } else {
368 if (this.getIsClosingOrClosed()) {
369 return;
370 }
371
372 TRACE_LOGGER.warn("Reactor error occured", cause);
373
374 try {
375 this.startReactor(this.reactorHandler);
376 } catch (IOException e) {
377 Marker fatalMarker = MarkerFactory.getMarker(ClientConstants.FATAL_MARKER);
378 TRACE_LOGGER.error(fatalMarker, "Re-starting reactor failed with exception.", e);
379 this.onReactorError(cause);
380 }
381
382 this.closeConnection(null, cause);
383 }
384 }
385
386
387 private void closeConnection(ErrorCondition error, Exception cause) {
388
389 Connection currentConnection = this.connection;
390 if (currentConnection != null) {
391 Link[] links = this.registeredLinks.toArray(new Link[0]);
392 this.registeredLinks.clear();
393
394 TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", links.length);
395 for (Link link : links) {
396 link.close();
397 }
398
399 TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", links.length);
400
401 if (currentConnection.getLocalState() != EndpointState.CLOSED) {
402 TRACE_LOGGER.info("Closing connection to host");
403 currentConnection.close();
404 }
405
406 for (Link link : links) {
407 Handler handler = BaseHandler.getHandler(link);
408 if (handler != null && handler instanceof BaseLinkHandler) {
409 BaseLinkHandler/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.html#BaseLinkHandler">BaseLinkHandler linkHandler = (BaseLinkHandler) handler;
410 if (error != null) {
411 linkHandler.processOnClose(link, error);
412 } else {
413 linkHandler.processOnClose(link, cause);
414 }
415 }
416 }
417 }
418 }
419
420 @Override
421 protected CompletableFuture<Void> onClose() {
422 if (!this.getIsClosed()) {
423 TRACE_LOGGER.info("Closing messaging factory");
424 CompletableFuture<Void> cbsLinkCloseFuture;
425 if (this.cbsLink == null) {
426 cbsLinkCloseFuture = CompletableFuture.completedFuture(null);
427 } else {
428 TRACE_LOGGER.info("Closing CBS link");
429 cbsLinkCloseFuture = this.cbsLink.closeAsync();
430 }
431
432 cbsLinkCloseFuture.thenRun(() -> this.managementLinksCache.freeAsync()).thenRun(() -> {
433 if (this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) {
434 this.cbsLinkCreationFuture.completeExceptionally(new Exception("Connection closed."));
435 }
436
437 if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) {
438 try {
439 this.scheduleOnReactorThread(new DispatchHandler() {
440 @Override
441 public void onEvent() {
442 if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) {
443 TRACE_LOGGER.info("Closing connection to host");
444 MessagingFactory.this.connection.close();
445 }
446 }
447 });
448 } catch (IOException e) {
449 this.connetionCloseFuture.completeExceptionally(e);
450 }
451
452 Timer.schedule(() -> {
453 if (!MessagingFactory.this.connetionCloseFuture.isDone()) {
454 String errorMessage = "Closing MessagingFactory timed out.";
455 TRACE_LOGGER.warn(errorMessage);
456 AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException(errorMessage));
457 }
458 },
459 this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
460 } else {
461 this.connetionCloseFuture.complete(null);
462 Timer.unregister(this.getClientId());
463 }
464 });
465
466 return this.connetionCloseFuture;
467 } else {
468 return CompletableFuture.completedFuture(null);
469 }
470 }
471
472 private class RunReactor implements Runnable {
473 private final Reactor rctr;
474
475 RunReactor() {
476 this.rctr = MessagingFactory.this.getReactor();
477 }
478
479 public void run() {
480 TRACE_LOGGER.info("starting reactor instance.");
481 try {
482 this.rctr.setTimeout(3141);
483 this.rctr.start();
484 boolean continueProcessing = true;
485 while (!Thread.interrupted() && continueProcessing) {
486
487 if (MessagingFactory.this.getIsClosed()) {
488 TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
489 break;
490 }
491 continueProcessing = this.rctr.process();
492 }
493 TRACE_LOGGER.info("Stopping reactor");
494 this.rctr.stop();
495 } catch (HandlerException handlerException) {
496 Throwable cause = handlerException.getCause();
497 if (cause == null) {
498 cause = handlerException;
499 }
500
501 TRACE_LOGGER.warn("UnHandled exception while processing events in reactor:", handlerException);
502
503 String message = !StringUtil.isNullOrEmpty(cause.getMessage())
504 ? cause.getMessage()
505 : !StringUtil.isNullOrEmpty(handlerException.getMessage())
506 ? handlerException.getMessage()
507 : "Reactor encountered unrecoverable error";
508 ServiceBusExceptionerviceBusException.html#ServiceBusException">ServiceBusException sbException = new ServiceBusException(
509 true,
510 String.format(Locale.US, "%s, %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
511 cause);
512
513 if (cause instanceof UnresolvedAddressException) {
514 sbException = new CommunicationException(
515 String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
516 cause);
517 }
518
519 MessagingFactory.this.onReactorError(sbException);
520 } finally {
521 this.rctr.free();
522 }
523 }
524 }
525
526
527
528
529 @Override
530 public void registerForConnectionError(Link link) {
531 if (link != null) {
532 this.registeredLinks.add(link);
533 }
534 }
535
536
537
538
539 @Override
540 public void deregisterForConnectionError(Link link) {
541 if (link != null) {
542 this.registeredLinks.remove(link);
543 }
544 }
545
546 void scheduleOnReactorThread(final DispatchHandler handler) throws IOException {
547 this.getReactorScheduler().invoke(handler);
548 }
549
550 void scheduleOnReactorThread(final int delay, final DispatchHandler handler) throws IOException {
551 this.getReactorScheduler().invoke(delay, handler);
552 }
553
554 CompletableFuture<Void> sendSecurityToken(String sasTokenAudienceUri) {
555 TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceUri);
556 CompletableFuture<SecurityToken> tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceUri);
557 return tokenFuture.thenComposeAsync((t) -> {
558 SecurityToken generatedSecurityToken = t;
559 CompletableFuture<Void> sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken), MessagingFactory.INTERNAL_THREAD_POOL);
560
561 return sendTokenFuture.thenAccept((v) -> TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceUri));
562
563 }, MessagingFactory.INTERNAL_THREAD_POOL);
564 }
565
566 CompletableFuture<ScheduledFuture<?>> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer) {
567 CompletableFuture<ScheduledFuture<?>> result = new CompletableFuture<ScheduledFuture<?>>();
568 TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceURI);
569 CompletableFuture<Instant> sendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
570 sendTokenFuture.handleAsync((validUntil, sendTokenEx) -> {
571 if (sendTokenEx == null) {
572 TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
573 ScheduledFuture<?> renewalFuture = MessagingFactory.scheduleRenewTimer(validUntil, validityRenewer);
574 result.complete(renewalFuture);
575 } else {
576 Throwable sendFailureCause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
577 TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendFailureCause);
578 if (retryOnFailure) {
579
580 TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
581 ScheduledFuture<?> renewalFuture = Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
582 result.complete(renewalFuture);
583 } else {
584 if (sendFailureCause instanceof TimeoutException) {
585
586 TRACE_LOGGER.debug("Resending token for {}", sasTokenAudienceURI);
587 CompletableFuture<Instant> resendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
588 resendTokenFuture.handleAsync((resendValidUntil, resendTokenEx) -> {
589 if (resendTokenEx == null) {
590 TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
591 ScheduledFuture<?> renewalFuture = MessagingFactory.scheduleRenewTimer(resendValidUntil, validityRenewer);
592 result.complete(renewalFuture);
593 } else {
594 Throwable resendFailureCause = ExceptionUtil.extractAsyncCompletionCause(resendTokenEx);
595 TRACE_LOGGER.warn("Resending CBS Token for {} failed.", sasTokenAudienceURI, resendFailureCause);
596 result.completeExceptionally(resendFailureCause);
597 }
598 return null;
599 }, MessagingFactory.INTERNAL_THREAD_POOL);
600 } else {
601 result.completeExceptionally(sendFailureCause);
602 }
603 }
604 }
605 return null;
606 }, MessagingFactory.INTERNAL_THREAD_POOL);
607
608 return result;
609 }
610
611 private CompletableFuture<Instant> generateAndSendSecurityToken(String sasTokenAudienceURI) {
612 CompletableFuture<SecurityToken> tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
613 return tokenFuture.thenComposeAsync((t) -> {
614 SecurityToken generatedSecurityToken = t;
615 return this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil()), MessagingFactory.INTERNAL_THREAD_POOL);
616 }, MessagingFactory.INTERNAL_THREAD_POOL);
617 }
618
619 private static ScheduledFuture<?> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer) {
620 if (currentTokenValidUntil == Instant.MAX) {
621
622 return null;
623 } else {
624
625 int renewInterval = Util.getTokenRenewIntervalInSeconds((int) Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
626 return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
627 }
628 }
629
630 CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType) {
631 this.throwIfClosed(null);
632 return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, null, entityType);
633 }
634
635 CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, String transferDestinationPath, MessagingEntityType entityType) {
636 this.throwIfClosed(null);
637 return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, transferDestinationPath, entityType);
638 }
639
640 void releaseRequestResponseLink(String entityPath) {
641 if (!this.getIsClosed()) {
642 this.managementLinksCache.releaseRequestResponseLink(entityPath, null);
643 }
644 }
645
646 void releaseRequestResponseLink(String entityPath, String transferDestinationPath) {
647 if (!this.getIsClosed()) {
648 this.managementLinksCache.releaseRequestResponseLink(entityPath, transferDestinationPath);
649 }
650 }
651
652 private void createCBSLinkAsync() {
653 if (this.getIsClosingOrClosed()) {
654 return;
655 }
656
657 if (++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS) {
658 Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException;
659 this.cbsLinkCreationFuture.completeExceptionally(completionEx);
660 } else {
661 String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
662 TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
663 RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null).handleAsync((cbsLink, ex) -> {
664 if (ex == null) {
665 TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath);
666 if (this.getIsClosingOrClosed()) {
667
668 cbsLink.closeAsync();
669 } else {
670 this.cbsLink = cbsLink;
671 this.cbsLinkCreationFuture.complete(null);
672 }
673 } else {
674 this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex);
675 TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts);
676 this.createCBSLinkAsync();
677 }
678 return null;
679 }, MessagingFactory.INTERNAL_THREAD_POOL);
680
681 }
682 }
683
684 private static <T> T completeFuture(CompletableFuture<T> future) throws InterruptedException, ServiceBusException {
685 try {
686 return future.get();
687 } catch (InterruptedException ie) {
688
689 throw ie;
690 } catch (ExecutionException ee) {
691 Throwable cause = ee.getCause();
692 if (cause instanceof ServiceBusException) {
693 throw (ServiceBusException) cause;
694 } else {
695 throw new ServiceBusException(true, cause);
696 }
697 }
698 }
699 }