View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.eventprocessorhost;
5   
6   import org.slf4j.Logger;
7   import org.slf4j.LoggerFactory;
8   
9   import java.util.concurrent.CompletableFuture;
10  import java.util.concurrent.ConcurrentHashMap;
11  import java.util.function.Consumer;
12  
13  
14  class PumpManager extends Closable implements Consumer<String> {
15      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PumpManager.class);
16      protected final HostContext hostContext;
17      protected ConcurrentHashMap<String, PartitionPump> pumpStates; // protected for testability
18  
19      PumpManager(HostContext hostContext, Closable parent) {
20          super(parent);
21  
22          this.hostContext = hostContext;
23  
24          this.pumpStates = new ConcurrentHashMap<String, PartitionPump>();
25      }
26  
27      public void addPump(CompleteLease lease) {
28          if (getIsClosingOrClosed()) {
29              TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "Shutting down, not creating new pump"));
30              return;
31          }
32  
33          PartitionPump capturedPump = this.pumpStates.get(lease.getPartitionId()); // CONCURRENTHASHTABLE
34          if (capturedPump != null) {
35              // There already is a pump. This should never happen and it's not harmless if it does. If we get here,
36              // it implies that the existing pump is a zombie which is not renewing its lease.
37              TRACE_LOGGER.error(this.hostContext.withHostAndPartition(lease, "throwing away zombie pump"));
38              // Shutdown should remove the pump from the hashmap, but we don't know what state this pump is in so
39              // remove it manually. ConcurrentHashMap specifies that removing an item that doesn't exist is a safe no-op.
40              this.pumpStates.remove(lease.getPartitionId());
41              // Call shutdown to try to clean up, but do not wait.
42              capturedPump.shutdown(CloseReason.Shutdown);
43          }
44  
45          TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "creating new pump"));
46          PartitionPump newPartitionPump = createNewPump(lease);
47          this.pumpStates.put(lease.getPartitionId(), newPartitionPump);
48          newPartitionPump.startPump();
49      }
50  
51      // Callback used by pumps during pump shutdown. 
52      @Override
53      public void accept(String partitionId) {
54          // These are fast, non-blocking actions.
55          this.pumpStates.remove(partitionId);
56          removingPumpTestHook(partitionId);
57      }
58  
59      // Separated out so that tests can override and substitute their own pump class.
60      protected PartitionPump createNewPump(CompleteLease lease) {
61          return new PartitionPump(this.hostContext, lease, this, this);
62      }
63  
64      public CompletableFuture<Void> removePump(String partitionId, final CloseReason reason) {
65          CompletableFuture<Void> retval = CompletableFuture.completedFuture(null);
66          PartitionPump capturedPump = this.pumpStates.get(partitionId); // CONCURRENTHASHTABLE
67          if (capturedPump != null) {
68              TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId,
69                      "closing pump for reason " + reason.toString()));
70              retval = capturedPump.shutdown(reason);
71          } else {
72              // Shouldn't get here but not really harmful, so just trace.
73              TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId,
74                      "no pump found to remove for partition " + partitionId));
75          }
76          return retval;
77      }
78  
79      public CompletableFuture<Void> removeAllPumps(CloseReason reason) {
80          setClosing();
81  
82          CompletableFuture<?>[] futures = new CompletableFuture<?>[this.pumpStates.size()];
83          int i = 0;
84          for (String partitionId : this.pumpStates.keySet()) {
85              futures[i++] = removePump(partitionId, reason);
86          }
87  
88          return CompletableFuture.allOf(futures).whenCompleteAsync((empty, e) -> {
89              setClosed();
90          }, this.hostContext.getExecutor());
91      }
92  
93      protected void removingPumpTestHook(String partitionId) {
94          // For test use. MUST BE FAST, NON-BLOCKING.
95      }
96  }