1
2
3
4 package com.microsoft.azure.eventprocessorhost;
5
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import java.util.concurrent.CompletableFuture;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.function.Consumer;
12
13
14 class PumpManager extends Closable implements Consumer<String> {
15 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PumpManager.class);
16 protected final HostContext hostContext;
17 protected ConcurrentHashMap<String, PartitionPump> pumpStates;
18
19 PumpManager(HostContext hostContext, Closable parent) {
20 super(parent);
21
22 this.hostContext = hostContext;
23
24 this.pumpStates = new ConcurrentHashMap<String, PartitionPump>();
25 }
26
27 public void addPump(CompleteLease lease) {
28 if (getIsClosingOrClosed()) {
29 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "Shutting down, not creating new pump"));
30 return;
31 }
32
33 PartitionPump capturedPump = this.pumpStates.get(lease.getPartitionId());
34 if (capturedPump != null) {
35
36
37 TRACE_LOGGER.error(this.hostContext.withHostAndPartition(lease, "throwing away zombie pump"));
38
39
40 this.pumpStates.remove(lease.getPartitionId());
41
42 capturedPump.shutdown(CloseReason.Shutdown);
43 }
44
45 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "creating new pump"));
46 PartitionPump newPartitionPump = createNewPump(lease);
47 this.pumpStates.put(lease.getPartitionId(), newPartitionPump);
48 newPartitionPump.startPump();
49 }
50
51
52 @Override
53 public void accept(String partitionId) {
54
55 this.pumpStates.remove(partitionId);
56 removingPumpTestHook(partitionId);
57 }
58
59
60 protected PartitionPump createNewPump(CompleteLease lease) {
61 return new PartitionPump(this.hostContext, lease, this, this);
62 }
63
64 public CompletableFuture<Void> removePump(String partitionId, final CloseReason reason) {
65 CompletableFuture<Void> retval = CompletableFuture.completedFuture(null);
66 PartitionPump capturedPump = this.pumpStates.get(partitionId);
67 if (capturedPump != null) {
68 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId,
69 "closing pump for reason " + reason.toString()));
70 retval = capturedPump.shutdown(reason);
71 } else {
72
73 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId,
74 "no pump found to remove for partition " + partitionId));
75 }
76 return retval;
77 }
78
79 public CompletableFuture<Void> removeAllPumps(CloseReason reason) {
80 setClosing();
81
82 CompletableFuture<?>[] futures = new CompletableFuture<?>[this.pumpStates.size()];
83 int i = 0;
84 for (String partitionId : this.pumpStates.keySet()) {
85 futures[i++] = removePump(partitionId, reason);
86 }
87
88 return CompletableFuture.allOf(futures).whenCompleteAsync((empty, e) -> {
89 setClosed();
90 }, this.hostContext.getExecutor());
91 }
92
93 protected void removingPumpTestHook(String partitionId) {
94
95 }
96 }