// PartitionBasedLoadBalancer.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
/**
* This class is responsible for balancing the load of processing events from all partitions of an Event Hub by
* distributing the number of partitions uniformly among all the active {@link EventProcessorClient EventProcessors}.
* <p>
* This load balancer will retrieve partition ownership details from the {@link CheckpointStore} to find the number of
* active {@link EventProcessorClient EventProcessorClients}. It uses the last modified time to decide if an
* EventProcessor is active. If a partition ownership entry has not been updated for a specified duration of time, the
* owner of that partition is considered inactive and the partition is available for other EventProcessors to own.
* </p>
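* <p>
* As a simplified sketch of the inactivity check (the {@code ownership} variable below is illustrative, not part
* of the API), an ownership record is considered stale when:
* </p>
* <pre>{@code
* long elapsedMillis = System.currentTimeMillis() - ownership.getLastModifiedTime();
* boolean ownerIsInactive = elapsedMillis > TimeUnit.SECONDS.toMillis(inactiveTimeLimitInSeconds);
* }</pre>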
*/
final class PartitionBasedLoadBalancer {
private static final Random RANDOM = new Random();
private final ClientLogger logger = new ClientLogger(PartitionBasedLoadBalancer.class);
private final String eventHubName;
private final String consumerGroupName;
private final CheckpointStore checkpointStore;
private final EventHubAsyncClient eventHubAsyncClient;
private final String ownerId;
private final long inactiveTimeLimitInSeconds;
private final PartitionPumpManager partitionPumpManager;
private final String fullyQualifiedNamespace;
private final Consumer<ErrorContext> processError;
private final PartitionContext partitionAgnosticContext;
private final AtomicBoolean isLoadBalancerRunning = new AtomicBoolean();
private final LoadBalancingStrategy loadBalancingStrategy;
private final AtomicBoolean morePartitionsToClaim = new AtomicBoolean();
private final AtomicReference<List<String>> partitionsCache = new AtomicReference<>(new ArrayList<>());
/**
* Creates an instance of PartitionBasedLoadBalancer for the given Event Hub name and consumer group.
* @param checkpointStore The checkpoint store this load balancer will use to read/update ownership details.
* @param eventHubAsyncClient The asynchronous Event Hub client used to consume events.
* @param fullyQualifiedNamespace The fully qualified namespace of the Event Hub this load balancer is associated
* with.
* @param eventHubName The Event Hub name the {@link EventProcessorClient} is associated with.
* @param consumerGroupName The consumer group name the {@link EventProcessorClient} is associated with.
* @param ownerId The identifier of the {@link EventProcessorClient} that owns this load balancer.
* @param inactiveTimeLimitInSeconds The time in seconds to wait for an update on an ownership record before
* assuming the owner of the partition is inactive.
* @param partitionPumpManager The partition pump manager that keeps track of all EventHubConsumers and partitions
* that this {@link EventProcessorClient} is processing.
* @param processError The callback that will be called when an error occurs while running the load balancer.
* @param loadBalancingStrategy The load balancing strategy to use.
*/
PartitionBasedLoadBalancer(final CheckpointStore checkpointStore,
final EventHubAsyncClient eventHubAsyncClient, final String fullyQualifiedNamespace,
final String eventHubName, final String consumerGroupName, final String ownerId,
final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager,
final Consumer<ErrorContext> processError, LoadBalancingStrategy loadBalancingStrategy) {
this.checkpointStore = checkpointStore;
this.eventHubAsyncClient = eventHubAsyncClient;
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
this.eventHubName = eventHubName;
this.consumerGroupName = consumerGroupName;
this.ownerId = ownerId;
this.inactiveTimeLimitInSeconds = inactiveTimeLimitInSeconds;
this.partitionPumpManager = partitionPumpManager;
this.processError = processError;
this.partitionAgnosticContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
consumerGroupName, "NONE");
this.loadBalancingStrategy = loadBalancingStrategy;
}
/**
* This is the main method responsible for load balancing. This method is expected to be invoked by the {@link
* EventProcessorClient} periodically. Each load-balancing cycle results in this {@link EventProcessorClient}
* owning <b>at most one</b> new partition; with the {@link LoadBalancingStrategy#GREEDY GREEDY} strategy, cycles
* repeat within a single call until this processor has claimed its share of partitions.
* <p>
* The load is considered balanced when the number of partitions owned by any two active EventProcessors differs
* by at most 1. Given that each invocation of this method results in the ownership claim of at most one partition,
* this algorithm converges gradually towards a steady state.
* </p>
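* <p>
* For example, with 7 partitions and 3 active EventProcessors, the minimum number of partitions per processor is
* 7 / 3 = 2 and 7 % 3 = 1 processor may own one extra partition, so an ownership distribution of {3, 2, 2} is
* balanced.
* </p>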
* When a new partition is claimed, this method is also responsible for starting a partition pump that creates an
* {@link EventHubConsumerAsyncClient} for processing events from that partition.
*/
void loadBalance() {
if (!isLoadBalancerRunning.compareAndSet(false, true)) {
logger.info("Load balancer already running");
return;
}
logger.atInfo()
.addKeyValue(OWNER_ID_KEY, this.ownerId)
.log("Starting load balancer.");
/*
* Retrieve current partition ownership details from the datastore.
*/
final Mono<Map<String, PartitionOwnership>> partitionOwnershipMono = checkpointStore
.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroupName)
.timeout(Duration.ofMinutes(1))
.collectMap(PartitionOwnership::getPartitionId, Function.identity());
/*
* Retrieve the list of partition ids from the Event Hub.
*/
Mono<List<String>> partitionsMono;
if (CoreUtils.isNullOrEmpty(partitionsCache.get())) {
// Call Event Hubs service to get the partition ids if the cache is empty
logger.atInfo()
.addKeyValue(ENTITY_PATH_KEY, eventHubName)
.log("Getting partitions from Event Hubs service.");
partitionsMono = eventHubAsyncClient
.getPartitionIds()
.timeout(Duration.ofMinutes(1))
.collectList();
} else {
partitionsMono = Mono.just(partitionsCache.get());
// we have the partitions, the client can be closed now
closeClient();
}
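// With the GREEDY strategy, the claim cycle below repeats until this processor has no more partitions left to
// claim in this run; with the BALANCED strategy, it runs once and claims at most one partition per invocation.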
Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
.then()
.repeat(() -> LoadBalancingStrategy.GREEDY == loadBalancingStrategy && morePartitionsToClaim.get())
.subscribe(ignored -> { },
ex -> {
logger.warning(Messages.LOAD_BALANCING_FAILED, ex);
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
isLoadBalancerRunning.set(false);
morePartitionsToClaim.set(false);
},
() -> logger.info("Load balancing completed successfully"));
}
/*
* This method works with the given partition ownership details and Event Hub partitions to evaluate whether the
* current Event Processor should take on the responsibility of processing more partitions.
*/
private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, List<String>> tuple) {
return Mono.fromRunnable(() -> {
logger.info("Starting next iteration of load balancer");
Map<String, PartitionOwnership> partitionOwnershipMap = tuple.getT1();
List<String> partitionIds = tuple.getT2();
if (CoreUtils.isNullOrEmpty(partitionIds)) {
// This may be due to an error when getting Event Hub metadata.
throw logger.logExceptionAsError(Exceptions.propagate(
new IllegalStateException("There are no partitions in Event Hub " + eventHubName)));
}
partitionsCache.set(partitionIds);
int numberOfPartitions = partitionIds.size();
logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
numberOfPartitions);
if (!isValid(partitionOwnershipMap)) {
// User data is corrupt.
throw logger.logExceptionAsError(Exceptions.propagate(
new IllegalStateException("Invalid partitionOwnership data from CheckpointStore")));
}
/*
* Remove all partition ownership records that have not been modified for the configured period of time. This
* means that the previous EventProcessor that owned the partition is probably down and the partition is now
* eligible to be claimed by other EventProcessors.
*/
Map<String, PartitionOwnership> activePartitionOwnershipMap = removeInactivePartitionOwnerships(
partitionOwnershipMap);
logger.info("Number of active ownership records {}", activePartitionOwnershipMap.size());
/*
* Create a map of owner id and a list of partitions it owns
*/
Map<String, List<PartitionOwnership>> ownerPartitionMap = activePartitionOwnershipMap.values()
.stream()
.collect(
Collectors.groupingBy(PartitionOwnership::getOwnerId, mapping(Function.identity(), toList())));
// add the current event processor to the map if it doesn't exist
ownerPartitionMap.putIfAbsent(this.ownerId, new ArrayList<>());
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("Current partition distribution {}", format(ownerPartitionMap));
}
if (CoreUtils.isNullOrEmpty(activePartitionOwnershipMap)) {
/*
* If the active partition ownership map is empty, either this is the first event processor to run or all
* EventProcessors for this Event Hub and consumer group combination are down. All partitions in this
* Event Hub are available to claim. Choose a random partition to claim ownership of.
*/
claimOwnership(partitionOwnershipMap, ownerPartitionMap,
partitionIds.get(RANDOM.nextInt(numberOfPartitions)));
return;
}
/*
* Find the minimum number of partitions every event processor should own when the load is
* evenly distributed.
*/
int numberOfActiveEventProcessors = ownerPartitionMap.size();
logger.info("Number of active event processors {}", ownerPartitionMap.size());
int minPartitionsPerEventProcessor = numberOfPartitions / numberOfActiveEventProcessors;
/*
* If the number of partitions in the Event Hub is not evenly divisible by the number of active event
* processors, a few Event Processors may own 1 partition more than the minimum when the load is balanced.
* Calculate the number of event processors that can own an additional partition.
*/
int numberOfEventProcessorsWithAdditionalPartition = numberOfPartitions % numberOfActiveEventProcessors;
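// e.g., 10 partitions and 3 active event processors: minPartitionsPerEventProcessor = 3, and 10 % 3 = 1 means
// exactly one processor owns 4 partitions when the load is balanced.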
logger.info("Expected min partitions per event processor = {}, expected number of event "
+ "processors with additional partition = {}", minPartitionsPerEventProcessor,
numberOfEventProcessorsWithAdditionalPartition);
if (isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition,
ownerPartitionMap)) {
// If the partitions are evenly distributed among all active event processors, no change required.
logger.info("Load is balanced with this event processor owning {} partitions",
ownerPartitionMap.get(ownerId).size());
renewOwnership(partitionOwnershipMap);
return;
}
if (!shouldOwnMorePartitions(minPartitionsPerEventProcessor, ownerPartitionMap)) {
// This event processor already has enough partitions and shouldn't own more.
logger.info("This event processor owns {} partitions and shouldn't own more",
ownerPartitionMap.get(ownerId).size());
renewOwnership(partitionOwnershipMap);
return;
}
// If we have reached this stage, this event processor has to claim/steal ownership of at least 1
// more partition
logger.info(
"Load is unbalanced and this event processor owns {} partitions and should own more partitions",
ownerPartitionMap.get(ownerId).size());
/*
* If some partitions are unclaimed, this could be because an event processor is down and
* its partitions are now available for others to own, or because event processors are just
* starting up and gradually claiming partitions, or because new partitions were added to the Event Hub.
* Find any partition that is not actively owned and claim it.
*
* OR
*
* Find a partition to steal from another event processor. Pick the event processor that owns the
* highest number of partitions.
*/
String partitionToClaim = partitionIds.parallelStream()
.filter(partitionId -> !activePartitionOwnershipMap.containsKey(partitionId))
.findAny()
.orElseGet(() -> {
logger.info("No unclaimed partitions, stealing from another event processor");
return findPartitionToSteal(ownerPartitionMap);
});
claimOwnership(partitionOwnershipMap, ownerPartitionMap, partitionToClaim);
});
}
/*
* Closes the client used by the load balancer to get the partitions.
*/
private void closeClient() {
try {
// this is an idempotent operation, calling close on an already closed client is just a no-op.
this.eventHubAsyncClient.close();
} catch (Exception ex) {
logger.warning("Failed to close the client", ex);
}
}
/*
* This method renews the ownership of currently owned partitions. Renewal updates the last modified time on each
* ownership record so that other event processors do not consider this processor inactive.
*/
private void renewOwnership(Map<String, PartitionOwnership> partitionOwnershipMap) {
morePartitionsToClaim.set(false);
// renew ownership of already owned partitions
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
.filter(
partitionId -> partitionOwnershipMap.containsKey(partitionId) && partitionOwnershipMap.get(partitionId)
.getOwnerId().equals(this.ownerId))
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe(partitionPumpManager::verifyPartitionConnection,
ex -> {
logger.error("Error renewing partition ownership", ex);
isLoadBalancerRunning.set(false);
},
() -> isLoadBalancerRunning.set(false));
}
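/*
* Formats the owner id to partition list map into a compact string for logging, e.g. "owner-1=[0,1];owner-2=[2]".
*/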
private String format(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
return ownerPartitionMap.entrySet()
.stream()
.map(entry -> {
StringBuilder sb = new StringBuilder();
sb.append(entry.getKey()).append("=[");
sb.append(entry.getValue().stream().map(PartitionOwnership::getPartitionId).collect(Collectors.joining(",")));
sb.append("]");
return sb.toString();
}).collect(Collectors.joining(";"));
}
/*
* Check if partition ownership data is valid before proceeding with load balancing.
*/
private boolean isValid(final Map<String, PartitionOwnership> partitionOwnershipMap) {
return partitionOwnershipMap.values()
.stream()
.noneMatch(partitionOwnership -> {
return partitionOwnership.getEventHubName() == null
|| !partitionOwnership.getEventHubName().equals(this.eventHubName)
|| partitionOwnership.getConsumerGroup() == null
|| !partitionOwnership.getConsumerGroup().equals(this.consumerGroupName)
|| partitionOwnership.getPartitionId() == null
|| partitionOwnership.getLastModifiedTime() == null
|| partitionOwnership.getETag() == null;
});
}
/*
* Find the event processor that owns the maximum number of partitions and steal a random partition
* from it.
*/
private String findPartitionToSteal(final Map<String, List<PartitionOwnership>> ownerPartitionMap) {
Map.Entry<String, List<PartitionOwnership>> ownerWithMaxPartitions = ownerPartitionMap.entrySet()
.stream()
.max(Comparator.comparingInt(entry -> entry.getValue().size()))
.get();
int numberOfPartitions = ownerWithMaxPartitions.getValue().size();
logger.atInfo()
.addKeyValue(OWNER_ID_KEY, ownerWithMaxPartitions.getKey())
.log("Owner owns {} partitions, stealing a partition from it.", numberOfPartitions);
return ownerWithMaxPartitions.getValue().get(RANDOM.nextInt(numberOfPartitions)).getPartitionId();
}
/*
* When the load is balanced, all active event processors own at least {@code minPartitionsPerEventProcessor}
* partitions, and exactly {@code numberOfEventProcessorsWithAdditionalPartition} event processors own 1
* additional partition.
*/
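// e.g., minPartitionsPerEventProcessor = 2 with 1 processor allowed an extra partition (7 partitions, 3
// processors): {3, 2, 2} is balanced; {4, 2, 1} and {3, 3, 1} are not.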
private boolean isLoadBalanced(final int minPartitionsPerEventProcessor,
final int numberOfEventProcessorsWithAdditionalPartition,
final Map<String, List<PartitionOwnership>> ownerPartitionMap) {
int count = 0;
for (List<PartitionOwnership> partitionOwnership : ownerPartitionMap.values()) {
int numberOfPartitions = partitionOwnership.size();
if (numberOfPartitions < minPartitionsPerEventProcessor
|| numberOfPartitions > minPartitionsPerEventProcessor + 1) {
return false;
}
if (numberOfPartitions == minPartitionsPerEventProcessor + 1) {
count++;
}
}
return count == numberOfEventProcessorsWithAdditionalPartition;
}
/*
* This method is called after determining that the load is not balanced. This method will evaluate
* if the current event processor should own more partitions. Specifically, this method returns true if the
* current event processor owns fewer than the minimum number of partitions, or if it owns the minimum number
* and no other event processor owns fewer partitions than this event processor.
*/
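// e.g., with minPartitionsPerEventProcessor = 2: a processor owning 1 partition should claim more; a processor
// owning exactly 2 should claim more only if no other processor owns fewer, which lets it pick up one of the
// "additional" partitions when the counts are tied.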
private boolean shouldOwnMorePartitions(final int minPartitionsPerEventProcessor,
final Map<String, List<PartitionOwnership>> ownerPartitionMap) {
int numberOfPartitionsOwned = ownerPartitionMap.get(this.ownerId).size();
int leastPartitionsOwnedByAnyEventProcessor =
ownerPartitionMap.values().stream().min(Comparator.comparingInt(List::size)).get().size();
return numberOfPartitionsOwned < minPartitionsPerEventProcessor
|| numberOfPartitionsOwned == leastPartitionsOwnedByAnyEventProcessor;
}
/*
* This method will create a new map of partition id and PartitionOwnership containing only those partitions
* that are actively owned. All entries in the original map returned by CheckpointStore that haven't been
* modified for a duration of time greater than the allowed inactivity time limit are assumed to be owned by
* dead event processors. These will not be included in the map returned by this method.
*/
private Map<String, PartitionOwnership> removeInactivePartitionOwnerships(
final Map<String, PartitionOwnership> partitionOwnershipMap) {
return partitionOwnershipMap
.entrySet()
.stream()
.filter(entry -> {
return (System.currentTimeMillis() - entry.getValue().getLastModifiedTime() < TimeUnit.SECONDS
.toMillis(inactiveTimeLimitInSeconds))
&& !CoreUtils.isNullOrEmpty(entry.getValue().getOwnerId());
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
private void claimOwnership(final Map<String, PartitionOwnership> partitionOwnershipMap, Map<String,
List<PartitionOwnership>> ownerPartitionsMap, final String partitionIdToClaim) {
logger.atInfo()
.addKeyValue(PARTITION_ID_KEY, partitionIdToClaim)
.log("Attempting to claim ownership of partition.");
PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap,
partitionIdToClaim);
List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
partitionsToClaim.add(ownershipRequest);
partitionsToClaim.addAll(partitionPumpManager.getPartitionPumps()
.keySet()
.stream()
.filter(
partitionId -> partitionOwnershipMap.containsKey(partitionId) && partitionOwnershipMap.get(partitionId)
.getOwnerId().equals(this.ownerId))
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()));
morePartitionsToClaim.set(true);
checkpointStore
.claimOwnership(partitionsToClaim)
.doOnNext(partitionOwnership -> logger.atInfo()
.addKeyValue(PARTITION_ID_KEY, partitionOwnership.getPartitionId())
.log("Successfully claimed ownership."))
.doOnError(ex -> logger
.atWarning()
.addKeyValue(PARTITION_ID_KEY, ownershipRequest.getPartitionId())
.log(Messages.FAILED_TO_CLAIM_OWNERSHIP, ex))
.collectList()
.zipWhen(ownershipList -> checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName,
consumerGroupName)
.collectMap(checkpoint -> checkpoint.getPartitionId(), Function.identity()))
.subscribe(ownedPartitionCheckpointsTuple -> {
ownedPartitionCheckpointsTuple.getT1()
.forEach(po -> partitionPumpManager.startPartitionPump(po,
ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
},
ex -> {
logger.warning("Error while listing checkpoints", ex);
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
if (loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
isLoadBalancerRunning.set(false);
}
throw logger.logExceptionAsError(new IllegalStateException("Error while listing checkpoints", ex));
},
() -> {
if (loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
isLoadBalancerRunning.set(false);
}
});
}
private PartitionOwnership createPartitionOwnershipRequest(
final Map<String, PartitionOwnership> partitionOwnershipMap,
final String partitionIdToClaim) {
PartitionOwnership previousPartitionOwnership = partitionOwnershipMap.get(partitionIdToClaim);
PartitionOwnership partitionOwnershipRequest = new PartitionOwnership()
.setFullyQualifiedNamespace(this.fullyQualifiedNamespace)
.setOwnerId(this.ownerId)
.setPartitionId(partitionIdToClaim)
.setConsumerGroup(this.consumerGroupName)
.setEventHubName(this.eventHubName)
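// A null eTag means there is no prior ownership record for this partition. When present, the eTag enables
// optimistic concurrency in the CheckpointStore: the claim fails if another processor has updated the record
// since it was read.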
.setETag(previousPartitionOwnership == null ? null : previousPartitionOwnership.getETag());
return partitionOwnershipRequest;
}
}