PartitionBasedLoadBalancer.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

/**
 * This class is responsible for balancing the load of processing events from all partitions of an Event Hub by
 * distributing the number of partitions uniformly among all the active {@link EventProcessorClient EventProcessors}.
 * <p>
 * This load balancer will retrieve partition ownership details from the {@link CheckpointStore} to find the number of
 * active {@link EventProcessorClient EventProcessorClients}. It uses the last modified time to decide if an
 * EventProcessor is active. If a partition ownership entry has not been updated for a specified duration of time, the
 * owner of that partition is considered inactive and the partition is available for other EventProcessors to own.
 * </p>
 */
final class PartitionBasedLoadBalancer {

    private static final Random RANDOM = new Random();
    private final ClientLogger logger = new ClientLogger(PartitionBasedLoadBalancer.class);

    private final String eventHubName;
    private final String consumerGroupName;
    private final CheckpointStore checkpointStore;
    private final EventHubAsyncClient eventHubAsyncClient;
    private final String ownerId;
    private final long inactiveTimeLimitInSeconds;
    private final PartitionPumpManager partitionPumpManager;
    private final String fullyQualifiedNamespace;
    private final Consumer<ErrorContext> processError;
    private final PartitionContext partitionAgnosticContext;
    private final AtomicBoolean isLoadBalancerRunning = new AtomicBoolean();
    private final LoadBalancingStrategy loadBalancingStrategy;
    private final AtomicBoolean morePartitionsToClaim = new AtomicBoolean();
    private final AtomicReference<List<String>> partitionsCache = new AtomicReference<>(new ArrayList<>());

    /**
     * Creates an instance of PartitionBasedLoadBalancer for the given Event Hub name and consumer group.
     *
     * @param checkpointStore The checkpoint store this load balancer will use to read/update ownership details.
     * @param eventHubAsyncClient The asynchronous Event Hub client used to consume events.
     * @param fullyQualifiedNamespace The fully qualified namespace of the Event Hub.
     * @param eventHubName The Event Hub name the {@link EventProcessorClient} is associated with.
     * @param consumerGroupName The consumer group name the {@link EventProcessorClient} is associated with.
     * @param ownerId The identifier of the {@link EventProcessorClient} that owns this load balancer.
     * @param inactiveTimeLimitInSeconds The time in seconds to wait for an update on an ownership record before
     * assuming the owner of the partition is inactive.
     * @param partitionPumpManager The partition pump manager that keeps track of all EventHubConsumers and partitions
     * that this {@link EventProcessorClient} is processing.
     * @param processError The callback that will be called when an error occurs while running the load balancer.
     * @param loadBalancingStrategy The load balancing strategy to use.
     */
    PartitionBasedLoadBalancer(final CheckpointStore checkpointStore,
        final EventHubAsyncClient eventHubAsyncClient, final String fullyQualifiedNamespace,
        final String eventHubName, final String consumerGroupName, final String ownerId,
        final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager,
        final Consumer<ErrorContext> processError, final LoadBalancingStrategy loadBalancingStrategy) {
        this.checkpointStore = checkpointStore;
        this.eventHubAsyncClient = eventHubAsyncClient;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.ownerId = ownerId;
        this.inactiveTimeLimitInSeconds = inactiveTimeLimitInSeconds;
        this.partitionPumpManager = partitionPumpManager;
        this.processError = processError;
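        // A partition-agnostic context (partition id "NONE") used when reporting load balancer errors that are not
        // tied to any specific partition.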
        this.partitionAgnosticContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
            consumerGroupName, "NONE");
        this.loadBalancingStrategy = loadBalancingStrategy;
    }

    /**
     * This is the main method responsible for load balancing. This method is expected to be invoked by the {@link
     * EventProcessorClient} periodically. Every call to this method will result in this {@link EventProcessorClient}
     * owning <b>at most one</b> new partition.
     * <p>
     * The load is considered balanced when the number of partitions owned by any two active EventProcessors differs
     * by at most 1. Given that each invocation of this method results in an ownership claim of at most one partition,
     * this algorithm converges gradually towards a steady state.
     * </p>
     * When a new partition is claimed, this method is also responsible for starting a partition pump that creates an
     * {@link EventHubConsumerAsyncClient} for processing events from that partition.
     */
    void loadBalance() {

        if (!isLoadBalancerRunning.compareAndSet(false, true)) {
            logger.info("Load balancer already running");
            return;
        }

        logger.atInfo()
            .addKeyValue(OWNER_ID_KEY, this.ownerId)
            .log("Starting load balancer.");

        /*
         * Retrieve current partition ownership details from the datastore.
         */
        final Mono<Map<String, PartitionOwnership>> partitionOwnershipMono = checkpointStore
            .listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroupName)
            .timeout(Duration.ofMinutes(1))
            .collectMap(PartitionOwnership::getPartitionId, Function.identity());

        /*
         * Retrieve the list of partition ids from the Event Hub.
         */
        Mono<List<String>> partitionsMono;
        if (CoreUtils.isNullOrEmpty(partitionsCache.get())) {
            // Call the Event Hubs service to get the partition ids if the cache is empty.
            logger.atInfo()
                .addKeyValue(ENTITY_PATH_KEY, eventHubName)
                .log("Getting partitions from Event Hubs service.");

            partitionsMono = eventHubAsyncClient
                .getPartitionIds()
                .timeout(Duration.ofMinutes(1))
                .collectList();
        } else {
            partitionsMono = Mono.just(partitionsCache.get());
            // We have the partitions; the client can be closed now.
            closeClient();
        }
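
        // The two results are combined and passed to the load-balancing step below. With the GREEDY strategy the
        // step repeats until this processor has no more partitions to claim in this cycle; with the BALANCED
        // strategy it runs once per invocation.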

        Mono.zip(partitionOwnershipMono, partitionsMono)
            .flatMap(this::loadBalance)
            .then()
            .repeat(() -> LoadBalancingStrategy.GREEDY == loadBalancingStrategy && morePartitionsToClaim.get())
            .subscribe(ignored -> { },
                ex -> {
                    logger.warning(Messages.LOAD_BALANCING_FAILED, ex);
                    ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
                    processError.accept(errorContext);
                    isLoadBalancerRunning.set(false);
                    morePartitionsToClaim.set(false);
                },
                () -> logger.info("Load balancing completed successfully"));
    }

    /*
     * This method works with the given partition ownership details and Event Hub partitions to evaluate whether the
     * current event processor should take on the responsibility of processing more partitions.
     */
    private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, List<String>> tuple) {
        return Mono.fromRunnable(() -> {
            logger.info("Starting next iteration of load balancer");
            Map<String, PartitionOwnership> partitionOwnershipMap = tuple.getT1();

            List<String> partitionIds = tuple.getT2();

            if (CoreUtils.isNullOrEmpty(partitionIds)) {
                // This may be due to an error when getting Event Hub metadata.
                throw logger.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("There are no partitions in Event Hub " + eventHubName)));
            }
            partitionsCache.set(partitionIds);
            int numberOfPartitions = partitionIds.size();
            logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
                numberOfPartitions);

            if (!isValid(partitionOwnershipMap)) {
                // User data is corrupt.
                throw logger.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("Invalid partitionOwnership data from CheckpointStore")));
            }

            /*
             * Remove ownership of all partitions that have not been modified for a configured period of time. This
             * means that the previous EventProcessor that owned the partition is probably down and the partition is
             * now eligible to be claimed by other EventProcessors.
             */
            Map<String, PartitionOwnership> activePartitionOwnershipMap = removeInactivePartitionOwnerships(
                partitionOwnershipMap);
            logger.info("Number of active ownership records {}", activePartitionOwnershipMap.size());

            /*
             * Create a map of owner id and the list of partitions it owns.
             */
            Map<String, List<PartitionOwnership>> ownerPartitionMap = activePartitionOwnershipMap.values()
                .stream()
                .collect(
                    Collectors.groupingBy(PartitionOwnership::getOwnerId, mapping(Function.identity(), toList())));

            // Add the current event processor to the map if it doesn't exist.
            ownerPartitionMap.putIfAbsent(this.ownerId, new ArrayList<>());

            if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
                logger.verbose("Current partition distribution {}", format(ownerPartitionMap));
            }

            if (CoreUtils.isNullOrEmpty(activePartitionOwnershipMap)) {
                /*
                 * If the active partition ownership map is empty, either this is the first time an event processor is
                 * running or all event processors are down for this Event Hub and consumer group combination. All
                 * partitions in this Event Hub are available to claim. Choose a random partition to claim ownership
                 * of.
                 */
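                // Claiming a random partition (rather than always the first one) likely reduces contention when
                // several event processors start up at the same time, since they are less likely to race for the
                // same partition.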
                claimOwnership(partitionOwnershipMap, ownerPartitionMap,
                    partitionIds.get(RANDOM.nextInt(numberOfPartitions)));
                return;
            }

            /*
             * Find the minimum number of partitions every event processor should own when the load is
             * evenly distributed.
             */
            int numberOfActiveEventProcessors = ownerPartitionMap.size();
            logger.info("Number of active event processors {}", numberOfActiveEventProcessors);

            int minPartitionsPerEventProcessor = numberOfPartitions / numberOfActiveEventProcessors;

            /*
             * If the number of partitions in the Event Hub is not evenly divisible by the number of active event
             * processors, a few event processors may own 1 more partition than the minimum when the load is balanced.
             * Calculate the number of event processors that can own an additional partition.
             */
            int numberOfEventProcessorsWithAdditionalPartition = numberOfPartitions % numberOfActiveEventProcessors;

            logger.info("Expected min partitions per event processor = {}, expected number of event "
                    + "processors with additional partition = {}", minPartitionsPerEventProcessor,
                numberOfEventProcessorsWithAdditionalPartition);
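
            // Worked example (illustrative figures): with 16 partitions and 3 active event processors,
            // minPartitionsPerEventProcessor = 16 / 3 = 5 and numberOfEventProcessorsWithAdditionalPartition
            // = 16 % 3 = 1, so the steady state is a 6/5/5 distribution.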

            if (isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition,
                ownerPartitionMap)) {
                // If the partitions are evenly distributed among all active event processors, no change is required.
                logger.info("Load is balanced with this event processor owning {} partitions",
                    ownerPartitionMap.get(ownerId).size());
                renewOwnership(partitionOwnershipMap);
                return;
            }

            if (!shouldOwnMorePartitions(minPartitionsPerEventProcessor, ownerPartitionMap)) {
                // This event processor already has enough partitions and shouldn't own more.
                logger.info("This event processor owns {} partitions and shouldn't own more",
                    ownerPartitionMap.get(ownerId).size());
                renewOwnership(partitionOwnershipMap);
                return;
            }

            // If we have reached this stage, this event processor has to claim/steal ownership of at least 1
            // more partition.
            logger.info(
                "Load is unbalanced and this event processor owns {} partitions and should own more partitions",
                ownerPartitionMap.get(ownerId).size());
            /*
             * If some partitions are unclaimed, this could be because an event processor is down and its
             * partitions are now available for others to own, or because event processors are just starting up
             * and gradually claiming partitions to own, or because new partitions were added to the Event Hub.
             * Find any partition that is not actively owned and claim it.
             *
             * OR
             *
             * Find a partition to steal from another event processor. Pick the event processor that owns the
             * highest number of partitions.
             */
            String partitionToClaim = partitionIds.parallelStream()
                .filter(partitionId -> !activePartitionOwnershipMap.containsKey(partitionId))
                .findAny()
                .orElseGet(() -> {
                    logger.info("No unclaimed partitions, stealing from another event processor");
                    return findPartitionToSteal(ownerPartitionMap);
                });

            claimOwnership(partitionOwnershipMap, ownerPartitionMap, partitionToClaim);
        });
    }

    /*
     * Closes the client used by the load balancer to get the partitions.
     */
    private void closeClient() {
        try {
            // This is an idempotent operation; calling close on an already closed client is just a no-op.
            this.eventHubAsyncClient.close();
        } catch (Exception ex) {
            logger.warning("Failed to close the client", ex);
        }
    }

    /*
     * This method renews the ownership of currently owned partitions.
     */
    private void renewOwnership(Map<String, PartitionOwnership> partitionOwnershipMap) {
        morePartitionsToClaim.set(false);
        // Renew ownership of already owned partitions.
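        // Re-claiming refreshes each record's lastModifiedTime in the checkpoint store, so other event processors
        // do not mistake this still-active owner for an inactive one.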
        checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
            .stream()
            .filter(
                partitionId -> partitionOwnershipMap.containsKey(partitionId) && partitionOwnershipMap.get(partitionId)
                    .getOwnerId().equals(this.ownerId))
            .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
            .collect(Collectors.toList()))
            .subscribe(partitionPumpManager::verifyPartitionConnection,
                ex -> {
                    logger.error("Error renewing partition ownership", ex);
                    isLoadBalancerRunning.set(false);
                },
                () -> isLoadBalancerRunning.set(false));
    }

    private String format(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        return ownerPartitionMap.entrySet()
            .stream()
            .map(entry -> {
                StringBuilder sb = new StringBuilder();
                sb.append(entry.getKey()).append("=[");
                sb.append(entry.getValue().stream().map(PartitionOwnership::getPartitionId)
                    .collect(Collectors.joining(",")));
                sb.append("]");
                return sb.toString();
            }).collect(Collectors.joining(";"));
    }

    /*
     * Check if partition ownership data is valid before proceeding with load balancing.
     */
    private boolean isValid(final Map<String, PartitionOwnership> partitionOwnershipMap) {
        return partitionOwnershipMap.values()
            .stream()
            .noneMatch(partitionOwnership -> {
                return partitionOwnership.getEventHubName() == null
                    || !partitionOwnership.getEventHubName().equals(this.eventHubName)
                    || partitionOwnership.getConsumerGroup() == null
                    || !partitionOwnership.getConsumerGroup().equals(this.consumerGroupName)
                    || partitionOwnership.getPartitionId() == null
                    || partitionOwnership.getLastModifiedTime() == null
                    || partitionOwnership.getETag() == null;
            });
    }

    /*
     * Find the event processor that owns the maximum number of partitions and steal a random partition
     * from it.
     */
    private String findPartitionToSteal(final Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        Map.Entry<String, List<PartitionOwnership>> ownerWithMaxPartitions = ownerPartitionMap.entrySet()
            .stream()
            .max(Comparator.comparingInt(entry -> entry.getValue().size()))
            .get();
        int numberOfPartitions = ownerWithMaxPartitions.getValue().size();

        logger.atInfo()
            .addKeyValue(OWNER_ID_KEY, ownerWithMaxPartitions.getKey())
            .log("Owner owns {} partitions, stealing a partition from it.", numberOfPartitions);

        return ownerWithMaxPartitions.getValue().get(RANDOM.nextInt(numberOfPartitions)).getPartitionId();
    }

    /*
     * When the load is balanced, all active event processors own at least {@code minPartitionsPerEventProcessor}
     * partitions and exactly {@code numberOfEventProcessorsWithAdditionalPartition} event processors own 1
     * additional partition.
     */
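    // For example (illustrative figures): with 8 partitions and 3 processors, minPartitionsPerEventProcessor is 2
    // and numberOfEventProcessorsWithAdditionalPartition is 2, so a 3/3/2 split is balanced while a 4/2/2 split
    // is not.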
    private boolean isLoadBalanced(final int minPartitionsPerEventProcessor,
        final int numberOfEventProcessorsWithAdditionalPartition,
        final Map<String, List<PartitionOwnership>> ownerPartitionMap) {

        int count = 0;
        for (List<PartitionOwnership> partitionOwnership : ownerPartitionMap.values()) {
            int numberOfPartitions = partitionOwnership.size();
            if (numberOfPartitions < minPartitionsPerEventProcessor
                || numberOfPartitions > minPartitionsPerEventProcessor + 1) {
                return false;
            }

            if (numberOfPartitions == minPartitionsPerEventProcessor + 1) {
                count++;
            }
        }
        return count == numberOfEventProcessorsWithAdditionalPartition;
    }

    /*
     * This method is called after determining that the load is not balanced. It evaluates whether the current
     * event processor should own more partitions. Specifically, this method returns true if the current event
     * processor owns fewer than the minimum number of partitions, or if it owns the minimum number and no other
     * event processor owns fewer partitions than this event processor.
     */
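    // For example (illustrative figures): with minPartitionsPerEventProcessor = 2, a processor owning 1 partition
    // should claim more. A processor already owning 2 should claim more only if no other processor owns fewer
    // than 2, i.e. it is tied for the smallest share and may take one of the remainder partitions.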
    private boolean shouldOwnMorePartitions(final int minPartitionsPerEventProcessor,
        final Map<String, List<PartitionOwnership>> ownerPartitionMap) {

        int numberOfPartitionsOwned = ownerPartitionMap.get(this.ownerId).size();

        int leastPartitionsOwnedByAnyEventProcessor =
            ownerPartitionMap.values().stream().min(Comparator.comparingInt(List::size)).get().size();

        return numberOfPartitionsOwned < minPartitionsPerEventProcessor
            || numberOfPartitionsOwned == leastPartitionsOwnedByAnyEventProcessor;
    }

    /*
     * This method creates a new map of partition id and PartitionOwnership containing only those partitions
     * that are actively owned. All entries in the original map returned by CheckpointStore that haven't been
     * modified for a duration greater than the allowed inactivity time limit are assumed to be owned by
     * dead event processors. These will not be included in the map returned by this method.
     */
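    // For example, if inactiveTimeLimitInSeconds were 60 (an illustrative value, not necessarily the configured
    // one), any record whose lastModifiedTime is more than 60 seconds old, or whose owner id is empty, is dropped
    // from the returned map.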
    private Map<String, PartitionOwnership> removeInactivePartitionOwnerships(
        final Map<String, PartitionOwnership> partitionOwnershipMap) {
        return partitionOwnershipMap
            .entrySet()
            .stream()
            .filter(entry -> {
                return (System.currentTimeMillis() - entry.getValue().getLastModifiedTime() < TimeUnit.SECONDS
                    .toMillis(inactiveTimeLimitInSeconds))
                    && !CoreUtils.isNullOrEmpty(entry.getValue().getOwnerId());
            }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
    }

    private void claimOwnership(final Map<String, PartitionOwnership> partitionOwnershipMap,
        final Map<String, List<PartitionOwnership>> ownerPartitionsMap, final String partitionIdToClaim) {
        logger.atInfo()
            .addKeyValue(PARTITION_ID_KEY, partitionIdToClaim)
            .log("Attempting to claim ownership of partition.");

        PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap,
            partitionIdToClaim);

        List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
        partitionsToClaim.add(ownershipRequest);
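        // Also re-claim the partitions this processor already owns in the same call, so their ownership records
        // stay fresh while the new partition is being claimed.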
        partitionsToClaim.addAll(partitionPumpManager.getPartitionPumps()
            .keySet()
            .stream()
            .filter(
                partitionId -> partitionOwnershipMap.containsKey(partitionId) && partitionOwnershipMap.get(partitionId)
                    .getOwnerId().equals(this.ownerId))
            .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
            .collect(Collectors.toList()));

        morePartitionsToClaim.set(true);
        checkpointStore
            .claimOwnership(partitionsToClaim)
            .doOnNext(partitionOwnership -> logger.atInfo()
                .addKeyValue(PARTITION_ID_KEY, partitionOwnership.getPartitionId())
                .log("Successfully claimed ownership."))
            .doOnError(ex -> logger
                .atWarning()
                .addKeyValue(PARTITION_ID_KEY, ownershipRequest.getPartitionId())
                .log(Messages.FAILED_TO_CLAIM_OWNERSHIP, ex))
            .collectList()
            .zipWhen(ownershipList -> checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName,
                consumerGroupName)
                .collectMap(checkpoint -> checkpoint.getPartitionId(), Function.identity()))
            .subscribe(ownedPartitionCheckpointsTuple -> {
                ownedPartitionCheckpointsTuple.getT1()
                    .forEach(po -> partitionPumpManager.startPartitionPump(po,
                        ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
            },
                ex -> {
                    logger.warning("Error while listing checkpoints", ex);
                    ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
                    processError.accept(errorContext);
                    if (loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                        isLoadBalancerRunning.set(false);
                    }
                    throw logger.logExceptionAsError(new IllegalStateException("Error while listing checkpoints", ex));
                },
                () -> {
                    if (loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                        isLoadBalancerRunning.set(false);
                    }
                });
    }

    private PartitionOwnership createPartitionOwnershipRequest(
        final Map<String, PartitionOwnership> partitionOwnershipMap,
        final String partitionIdToClaim) {
        PartitionOwnership previousPartitionOwnership = partitionOwnershipMap.get(partitionIdToClaim);
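        // Carrying over the previous eTag (when one exists) makes the claim conditional: the checkpoint store can
        // reject it if another event processor has updated this ownership record in the meantime.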
        PartitionOwnership partitionOwnershipRequest = new PartitionOwnership()
            .setFullyQualifiedNamespace(this.fullyQualifiedNamespace)
            .setOwnerId(this.ownerId)
            .setPartitionId(partitionIdToClaim)
            .setConsumerGroup(this.consumerGroupName)
            .setEventHubName(this.eventHubName)
            .setETag(previousPartitionOwnership == null ? null : previousPartitionOwnership.getETag());
        return partitionOwnershipRequest;
    }
}