From f2e09cb477f2f591a887d06b873c9b86e81d3ede Mon Sep 17 00:00:00 2001 From: Thomas Vachuska Date: Mon, 6 Nov 2017 15:17:06 -0800 Subject: [PATCH] Adding ability to handle orphaned devices when balancing mastership. Change-Id: I01dd7a3074475d79504d516fbd3fd32ef18770ce --- .../cluster/impl/MastershipManager.java | 183 +++++++++++------- .../cluster/impl/MastershipManagerTest.java | 33 ++++ 2 files changed, 149 insertions(+), 67 deletions(-) diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java index 1a4fc5f020..c744d614d7 100644 --- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java +++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java @@ -57,6 +57,7 @@ import org.slf4j.Logger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -67,6 +68,7 @@ import java.util.concurrent.CompletableFuture; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; +import static java.util.concurrent.CompletableFuture.allOf; import static org.onlab.metrics.MetricsUtil.startTimer; import static org.onlab.metrics.MetricsUtil.stopTimer; import static org.onosproject.net.MastershipRole.MASTER; @@ -76,13 +78,15 @@ import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE; import static org.slf4j.LoggerFactory.getLogger; - +/** + * Component providing the node-device mastership service. 
+ */ @Component(immediate = true) @Service public class MastershipManager - extends AbstractListenerManager - implements MastershipService, MastershipAdminService, MastershipTermService, - MetricsHelper { + extends AbstractListenerManager + implements MastershipService, MastershipAdminService, MastershipTermService, + MetricsHelper { private static final String NODE_ID_NULL = "Node ID cannot be null"; private static final String DEVICE_ID_NULL = "Device ID cannot be null"; @@ -116,7 +120,7 @@ public class MastershipManager static final boolean DEFAULT_USE_REGION_FOR_BALANCE_ROLES = false; @Property(name = "useRegionForBalanceRoles", boolValue = DEFAULT_USE_REGION_FOR_BALANCE_ROLES, - label = "Use Regions for balancing roles") + label = "Use Regions for balancing roles") protected boolean useRegionForBalanceRoles; private static final boolean DEFAULT_REBALANCE_ROLES_ON_UPGRADE = true; @@ -181,7 +185,7 @@ public class MastershipManager } return eventFuture.thenAccept(this::post) - .thenApply(v -> null); + .thenApply(v -> null); } @Override @@ -196,8 +200,8 @@ public class MastershipManager public CompletableFuture relinquishMastership(DeviceId deviceId) { checkPermission(CLUSTER_WRITE); return store.relinquishRole(localNodeId, deviceId) - .thenAccept(this::post) - .thenApply(v -> null); + .thenAccept(this::post) + .thenApply(v -> null); } @Override @@ -249,15 +253,20 @@ public class MastershipManager public void balanceRoles() { List nodes = newArrayList(clusterService.getNodes()); Map> controllerDevices = new HashMap<>(); + Set orphanedDevices = Sets.newHashSet(); int deviceCount = 0; - // Create buckets reflecting current ownership. + // Create buckets reflecting current ownership; do this irrespective of + // whether the node is active. 
for (ControllerNode node : nodes) { + Set devicesOf = new HashSet<>(getDevicesOf(node.id())); if (clusterService.getState(node.id()).isActive()) { - Set devicesOf = new HashSet<>(getDevicesOf(node.id())); + log.info("Node {} has {} devices.", node.id(), devicesOf.size()); deviceCount += devicesOf.size(); controllerDevices.put(node, devicesOf); - log.info("Node {} has {} devices.", node.id(), devicesOf.size()); + } else if (!devicesOf.isEmpty()) { + log.warn("Inactive node {} has {} orphaned devices.", node.id(), devicesOf.size()); + orphanedDevices.addAll(devicesOf); } } @@ -265,11 +274,16 @@ public class MastershipManager return; } - // Now re-balance the buckets until they are roughly even. - List> balanceBucketsFutures = balanceControllerNodes(controllerDevices, deviceCount); + List> balanceBucketsFutures = Lists.newLinkedList(); - CompletableFuture balanceRolesFuture = CompletableFuture.allOf( - balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()])); + // First re-balance the buckets until they are roughly even. + balanceControllerNodes(controllerDevices, deviceCount, balanceBucketsFutures); + + // Then attempt to distribute any orphaned devices among the buckets. + distributeOrphanedDevices(controllerDevices, orphanedDevices, balanceBucketsFutures); + + CompletableFuture balanceRolesFuture = + allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()])); Futures.getUnchecked(balanceRolesFuture); } @@ -278,32 +292,51 @@ public class MastershipManager * Balances the nodes specified in controllerDevices. 
* * @param controllerDevices controller nodes to devices map - * @param deviceCount number of devices mastered by controller nodes - * @return list of setRole futures for "moved" devices + * @param deviceCount number of devices mastered by controller nodes + * @param futures list of setRole futures for "moved" devices */ - private List> balanceControllerNodes( - Map> controllerDevices, int deviceCount) { + private void balanceControllerNodes(Map> controllerDevices, + int deviceCount, + List> futures) { // Now re-balance the buckets until they are roughly even. - List> balanceBucketsFutures = Lists.newLinkedList(); int rounds = controllerDevices.keySet().size(); for (int i = 0; i < rounds; i++) { // Iterate over the buckets and find the smallest and the largest. ControllerNode smallest = findBucket(true, controllerDevices); ControllerNode largest = findBucket(false, controllerDevices); - balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount)); + futures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount)); + } + } + + /** + * Uses the set of orphaned devices to even out the load among the controllers. + * + * @param controllerDevices controller nodes to devices map + * @param orphanedDevices set of orphaned devices without an active master + * @param futures list of completable futures to track the progress of the balancing operation + */ + private void distributeOrphanedDevices(Map> controllerDevices, + Set orphanedDevices, + List> futures) { + // Now re-distribute the orphaned devices into buckets until they are roughly even. + while (!orphanedDevices.isEmpty() && !controllerDevices.isEmpty()) { + // Iterate over the buckets and find the smallest bucket. + ControllerNode smallest = findBucket(true, controllerDevices); + changeMastership(smallest, controllerDevices.get(smallest), + orphanedDevices, 1, futures); } - return balanceBucketsFutures; } /** * Finds node with the minimum/maximum devices from a list of nodes. 
* - * @param min true: minimum, false: maximum + * @param min true: minimum, false: maximum * @param controllerDevices controller nodes to devices map * @return controller node with minimum/maximum devices */ + private ControllerNode findBucket(boolean min, - Map> controllerDevices) { + Map> controllerDevices) { int xSize = min ? Integer.MAX_VALUE : -1; ControllerNode xNode = null; for (ControllerNode node : controllerDevices.keySet()) { @@ -319,15 +352,15 @@ public class MastershipManager /** * Balance the node buckets by moving devices from largest to smallest node. * - * @param smallest node that is master of the smallest number of devices - * @param largest node that is master of the largest number of devices + * @param smallest node that is master of the smallest number of devices + * @param largest node that is master of the largest number of devices * @param controllerDevices controller nodes to devices map - * @param deviceCount number of devices mastered by controller nodes + * @param deviceCount number of devices mastered by controller nodes * @return list of setRole futures for "moved" devices */ private CompletableFuture balanceBuckets(ControllerNode smallest, ControllerNode largest, - Map> controllerDevices, - int deviceCount) { + Map> controllerDevices, + int deviceCount) { Collection minBucket = controllerDevices.get(smallest); Collection maxBucket = controllerDevices.get(largest); int bucketCount = controllerDevices.keySet().size(); @@ -340,20 +373,36 @@ public class MastershipManager if (delta > 0) { log.info("Attempting to move {} nodes from {} to {}...", delta, largest.id(), smallest.id()); - - int i = 0; - Iterator it = maxBucket.iterator(); - while (it.hasNext() && i < delta) { - DeviceId deviceId = it.next(); - log.info("Setting {} as the master for {}", smallest.id(), deviceId); - setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER)); - controllerDevices.get(smallest).add(deviceId); - it.remove(); - i++; - } + 
changeMastership(smallest, minBucket, maxBucket, delta, setRoleFutures); } - return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()])); + return allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()])); + } + + /** + * Changes mastership for the specified number of devices in the given source + * bucket to the specified node and adds those devices to the given target + * bucket. Also adds the futures for tracking the role reassignment progress. + * + * @param toNode target controller node + * @param toBucket target bucket + * @param fromBucket source bucket + * @param count number of devices + * @param futures futures for tracking operation progress + */ + private void changeMastership(ControllerNode toNode, Collection toBucket, + Collection fromBucket, int count, + List> futures) { + int i = 0; + Iterator it = fromBucket.iterator(); + while (it.hasNext() && i < count) { + DeviceId deviceId = it.next(); + log.info("Setting {} as the master for {}", toNode.id(), deviceId); + futures.add(setRole(toNode.id(), deviceId, MASTER)); + toBucket.add(deviceId); + it.remove(); + i++; + } } /** @@ -368,7 +417,7 @@ public class MastershipManager return false; // no balancing was done using regions. 
} - // handle nodes belonging to regions + // Handle nodes belonging to regions Set nodesInRegions = Sets.newHashSet(); for (Region region : regions) { Map> activeRegionControllers = @@ -376,20 +425,20 @@ public class MastershipManager nodesInRegions.addAll(activeRegionControllers.keySet()); } - // handle nodes not belonging to any region + // Handle nodes not belonging to any region Set nodesNotInRegions = Sets.difference(allControllerDevices.keySet(), nodesInRegions); if (!nodesNotInRegions.isEmpty()) { int deviceCount = 0; Map> controllerDevicesNotInRegions = new HashMap<>(); - for (ControllerNode controllerNode: nodesNotInRegions) { + for (ControllerNode controllerNode : nodesNotInRegions) { controllerDevicesNotInRegions.put(controllerNode, allControllerDevices.get(controllerNode)); deviceCount += allControllerDevices.get(controllerNode).size(); } // Now re-balance the buckets until they are roughly even. - List> balanceBucketsFutures = - balanceControllerNodes(controllerDevicesNotInRegions, deviceCount); + List> balanceBucketsFutures = Lists.newArrayList(); + balanceControllerNodes(controllerDevicesNotInRegions, deviceCount, balanceBucketsFutures); - CompletableFuture balanceRolesFuture = CompletableFuture.allOf( + CompletableFuture balanceRolesFuture = allOf( balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()])); Futures.getUnchecked(balanceRolesFuture); @@ -400,14 +449,15 @@ public class MastershipManager /** * Balances the nodes in specified region. 
* - * @param region region in which nodes are to be balanced + * @param region region in which nodes are to be balanced * @param allControllerDevices controller nodes to devices map * @return controller nodes that were balanced */ - private Map> balanceRolesInRegion(Region region, - Map> allControllerDevices) { + private Map> + balanceRolesInRegion(Region region, + Map> allControllerDevices) { - // retrieve all devices associated with specified region + // Retrieve all devices associated with specified region Set devicesInRegion = regionService.getRegionDevices(region.id()); log.info("Region {} has {} devices.", region.id(), devicesInRegion.size()); if (devicesInRegion.isEmpty()) { @@ -421,24 +471,22 @@ public class MastershipManager return new HashMap<>(); // for now just leave devices alone } - // get the region's preferred set of masters + // Get the region's preferred set of masters Set devicesInMasters = Sets.newHashSet(); Map> regionalControllerDevices = getRegionsPreferredMasters(region, devicesInMasters, allControllerDevices); // Now re-balance the buckets until they are roughly even. 
- List> balanceBucketsFutures = - balanceControllerNodes(regionalControllerDevices, devicesInMasters.size()); + List> balanceBucketsFutures = Lists.newArrayList(); + balanceControllerNodes(regionalControllerDevices, devicesInMasters.size(), balanceBucketsFutures); - // handle devices that are not currently mastered by the master node set + // Handle devices that are not currently mastered by the master node set Set devicesNotMasteredWithControllers = Sets.difference(devicesInRegion, devicesInMasters); if (!devicesNotMasteredWithControllers.isEmpty()) { // active controllers in master node set are already balanced, just // assign device mastership in sequence List sorted = new ArrayList<>(regionalControllerDevices.keySet()); - Collections.sort(sorted, (o1, o2) -> - ((Integer) (regionalControllerDevices.get(o1)).size()) - .compareTo((Integer) (regionalControllerDevices.get(o2)).size())); + Collections.sort(sorted, Comparator.comparingInt(o -> (regionalControllerDevices.get(o)).size())); int deviceIndex = 0; for (DeviceId deviceId : devicesNotMasteredWithControllers) { ControllerNode cnode = sorted.get(deviceIndex % sorted.size()); @@ -448,12 +496,12 @@ public class MastershipManager } } - CompletableFuture balanceRolesFuture = CompletableFuture.allOf( - balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()])); + CompletableFuture balanceRolesFuture = + allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()])); Futures.getUnchecked(balanceRolesFuture); - // update the map before returning + // Update the map before returning regionalControllerDevices.forEach((controllerNode, deviceIds) -> { regionalControllerDevices.put(controllerNode, new HashSet<>(getDevicesOf(controllerNode.id()))); }); @@ -465,17 +513,18 @@ public class MastershipManager * Get region's preferred set of master nodes - the first master node set that has at * least one active node. 
* - * @param region region for which preferred set of master nodes is requested - * @param devicesInMasters device set to track devices in preferred set of master nodes + * @param region region for which preferred set of master nodes is requested + * @param devicesInMasters device set to track devices in preferred set of master nodes * @param allControllerDevices controller nodes to devices map * @return region's preferred master nodes (and devices that use them as masters) */ - private Map> getRegionsPreferredMasters(Region region, - Set devicesInMasters, - Map> allControllerDevices) { + private Map> + getRegionsPreferredMasters(Region region, + Set devicesInMasters, + Map> allControllerDevices) { Map> regionalControllerDevices = new HashMap<>(); int listIndex = 0; - for (Set masterSet: region.masters()) { + for (Set masterSet : region.masters()) { log.info("Region {} masters set {} has {} nodes.", region.id(), listIndex, masterSet.size()); if (masterSet.isEmpty()) { // nothing on this level diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java index 0f75445b36..50fc37794f 100644 --- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java +++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java @@ -78,6 +78,9 @@ public class MastershipManagerTest { private static final DeviceId DID1 = DeviceId.deviceId("foo:d1"); private static final DeviceId DID2 = DeviceId.deviceId("foo:d2"); private static final DeviceId DID3 = DeviceId.deviceId("foo:d3"); + private static final DeviceId DID4 = DeviceId.deviceId("foo:d4"); + private static final DeviceId DID5 = DeviceId.deviceId("foo:d5"); + private static final DeviceId DID6 = DeviceId.deviceId("foo:d6"); private static final NodeId NID1 = NodeId.nodeId("n1"); private static final NodeId NID2 = NodeId.nodeId("n2"); private static final NodeId NID3 = 
NodeId.nodeId("n3"); @@ -219,6 +222,36 @@ public class MastershipManagerTest { assertEquals("inconsistent terms: ", 3, ts.getMastershipTerm(DEV_MASTER).termNumber()); } + @Test + public void balanceWithOrphans() { + // Setup cluster of three nodes + testClusterService.put(CNODE1, ControllerNode.State.ACTIVE); + testClusterService.put(CNODE2, ControllerNode.State.INACTIVE); + testClusterService.put(CNODE3, ControllerNode.State.ACTIVE); + + // Pre-assign some devices to each of the nodes + // Leave some devices as orphans assigned to a downed node + assignRoles(NID1, ImmutableSet.of(DID1, DID2, DID3, DID4)); + assignRoles(NID2, ImmutableSet.of(DID5)); + assignRoles(NID3, ImmutableSet.of(DID6)); + + // Trigger load balancing + mgr.balanceRoles(); + + // Make sure we have a balanced load + // Make sure that we no longer have any orphans + assertEquals("incorrect balance for node 1", 3, mgr.getDevicesOf(NID1).size()); + assertEquals("incorrect balance for node 2", 0, mgr.getDevicesOf(NID2).size()); + assertEquals("incorrect balance for node 3", 3, mgr.getDevicesOf(NID3).size()); + } + + private void assignRoles(NodeId nid, Set deviceIds) { + Set all = ImmutableSet.of(DID1, DID2, DID3, DID4, DID5, DID6); + for (DeviceId did : all) { + mgr.setRole(nid, did, deviceIds.contains(did) ? MASTER : STANDBY); + } + } + @Test public void balanceWithRegion1() { //set up region - 2 sets of masters with 1 node in each