diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java index 85834a70f0..763decad40 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java @@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -89,7 +90,7 @@ public class DistributedLeadershipManager implements LeadershipService { private ScheduledExecutorService electionRunner; private ScheduledExecutorService lockExecutor; private ScheduledExecutorService staleLeadershipPurgeExecutor; - private ScheduledExecutorService leadershipStatusBroadcaster; + private ScheduledExecutorService leadershipRefresher; private ConsistentMap leaderMap; private ConsistentMap> candidateMap; @@ -106,7 +107,7 @@ public class DistributedLeadershipManager implements LeadershipService { // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS) private static final int WAIT_BEFORE_RETRY_MILLIS = 150; private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; - private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2; + private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2; private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); @@ -135,8 +136,8 @@ public class DistributedLeadershipManager implements LeadershipService { 4, groupedThreads("onos/store/leadership", "election-thread-%d")); staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( groupedThreads("onos/store/leadership", "stale-leadership-evictor")); - leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/leadership", "peer-updater")); + leadershipRefresher = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/store/leadership", "refresh-thread")); clusterCommunicator.addSubscriber( LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::decode, @@ -148,8 +149,8 @@ public class DistributedLeadershipManager implements LeadershipService { electionRunner.scheduleWithFixedDelay( this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); - leadershipStatusBroadcaster.scheduleWithFixedDelay( - this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS); + leadershipRefresher.scheduleWithFixedDelay( + this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS); listenerRegistry = new ListenerRegistry<>(); eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); @@ -173,7 +174,7 @@ public class DistributedLeadershipManager implements LeadershipService { messageHandlingExecutor.shutdown(); lockExecutor.shutdown(); staleLeadershipPurgeExecutor.shutdown(); - leadershipStatusBroadcaster.shutdown(); + leadershipRefresher.shutdown(); log.info("Stopped"); } @@ -458,6 +459,7 @@ public class DistributedLeadershipManager implements LeadershipService { leaderBoard.compute(topic, (k, currentLeadership) -> { if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { updateAccepted.set(true); + // FIXME: Removing entries from leaderboard is not safe and should be visited. return null; } return currentLeadership; @@ -579,18 +581,47 @@ public class DistributedLeadershipManager implements LeadershipService { } } - private void sendLeadershipStatus() { + private void refreshLeaderBoard() { try { - leaderBoard.forEach((path, leadership) -> { - if (leadership.leader().equals(localNodeId)) { - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership); - clusterCommunicator.broadcast(event, - LEADERSHIP_EVENT_MESSAGE_SUBJECT, - SERIALIZER::encode); + Map newLeaderBoard = Maps.newHashMap(); + leaderMap.entrySet().forEach(entry -> { + String path = entry.getKey(); + Versioned leader = entry.getValue(); + Leadership leadership = new Leadership(path, + leader.value(), + leader.version(), + leader.creationTime()); + newLeaderBoard.put(path, leadership); + }); + + // first take snapshot of current leader board. + Map currentLeaderBoard = ImmutableMap.copyOf(leaderBoard); + + MapDifference diff = Maps.difference(currentLeaderBoard, newLeaderBoard); + + // evict stale leaders + diff.entriesOnlyOnLeft().forEach((path, leadership) -> { + log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership)); + }); + + // add missing leaders + diff.entriesOnlyOnRight().forEach((path, leadership) -> { + log.debug("Adding {} to leaderboard. It is now the active leader.", leadership); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership)); + }); + + // add updated leaders + diff.entriesDiffering().forEach((path, difference) -> { + Leadership current = difference.leftValue(); + Leadership updated = difference.rightValue(); + if (current.epoch() < updated.epoch()) { + log.debug("Updated {} in leaderboard.", updated); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated)); } }); } catch (Exception e) { - log.debug("Failed to send leadership updates", e); + log.debug("Failed to refresh leader board", e); } }