diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java index 07b5b2b6bd..bd5ba3a8c4 100644 --- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java +++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java @@ -85,7 +85,7 @@ public class LeaderCommand extends AbstractShellCommand { } private void displayCandidates(Map leaderBoard, - Map> candidates) { + Map candidates) { print("--------------------------------------------------------------"); print(FMT_C, "Topic", "Leader", "Candidates"); print("--------------------------------------------------------------"); @@ -94,13 +94,13 @@ public class LeaderCommand extends AbstractShellCommand { .stream() .sorted(leadershipComparator) .forEach(l -> { - List list = candidates.get(l.topic()); + List list = candidates.get(l.topic()).candidates(); print(FMT_C, l.topic(), l.leader(), - list.remove(0).toString()); + list.get(0).toString()); // formatting hacks to get it into a table - list.forEach(n -> print(FMT_C, " ", " ", n)); + list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n)); print(FMT_C, " ", " ", " "); }); print("--------------------------------------------------------------"); @@ -139,7 +139,7 @@ public class LeaderCommand extends AbstractShellCommand { print("%s", json(leaderBoard)); } else { if (showCandidates) { - Map> candidates = leaderService.getCandidates(); + Map candidates = leaderService.getCandidates(); displayCandidates(leaderBoard, candidates); } else { displayLeaders(leaderBoard); diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java index a0c5e700fc..175de2de1b 100644 --- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java +++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java @@ -17,6 +17,7 @@ package org.onosproject.cluster; import java.util.Objects; import java.util.List; +import java.util.Optional; import org.joda.time.DateTime; @@ -33,19 +34,21 @@ import com.google.common.collect.ImmutableList; * rest in decreasing preference order. *
  • The epoch is the logical age of a Leadership construct, and should be * used for comparing two Leaderships, but only of the same topic.
  • + *
  • The leader may be null if its accuracy can't be guaranteed. This applies + * to CANDIDATES_CHANGED events and candidate board contents.
  • * */ public class Leadership { private final String topic; - private final NodeId leader; + private final Optional leader; private final List candidates; private final long epoch; private final long electedTime; public Leadership(String topic, NodeId leader, long epoch, long electedTime) { this.topic = topic; - this.leader = leader; + this.leader = Optional.of(leader); this.candidates = ImmutableList.of(leader); this.epoch = epoch; this.electedTime = electedTime; @@ -54,7 +57,16 @@ public class Leadership { public Leadership(String topic, NodeId leader, List candidates, long epoch, long electedTime) { this.topic = topic; - this.leader = leader; + this.leader = Optional.of(leader); + this.candidates = ImmutableList.copyOf(candidates); + this.epoch = epoch; + this.electedTime = electedTime; + } + + public Leadership(String topic, List candidates, + long epoch, long electedTime) { + this.topic = topic; + this.leader = Optional.empty(); this.candidates = ImmutableList.copyOf(candidates); this.epoch = epoch; this.electedTime = electedTime; @@ -74,8 +86,9 @@ public class Leadership { * * @return leader node. */ + // This will return Optional in the future. public NodeId leader() { - return leader; + return leader.orElse(null); } /** diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java index c4f59be7fb..3456e22a6c 100644 --- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java +++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java @@ -43,14 +43,14 @@ public class LeadershipEvent extends AbstractEvent> getCandidates(); + Map getCandidates(); /** * Returns the candidates for a given topic. diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java index 02742d46b1..3c503bbfd9 100644 --- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java +++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java @@ -65,7 +65,7 @@ public class LeadershipServiceAdapter implements LeadershipService { } @Override - public Map> getCandidates() { + public Map getCandidates() { return null; } diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java index 10bf6a46bc..1d918c872f 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java @@ -576,7 +576,7 @@ public class HazelcastLeadershipService implements LeadershipService { } @Override - public Map> getCandidates() { + public Map getCandidates() { return null; } diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java index cf3700b22e..303129d226 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java @@ -12,6 +12,7 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ControllerNode; @@ -88,6 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService { private AbstractListenerRegistry listenerRegistry; private final Map leaderBoard = Maps.newConcurrentMap(); + private final Map candidateBoard = Maps.newConcurrentMap(); private NodeId localNodeId; private Set activeTopics = Sets.newConcurrentHashSet(); @@ -164,16 +166,14 @@ public class DistributedLeadershipManager implements LeadershipService { } @Override - public Map> getCandidates() { - Map> candidates = Maps.newHashMap(); - candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value())); - return ImmutableMap.copyOf(candidates); + public Map getCandidates() { + return ImmutableMap.copyOf(candidateBoard); } @Override public List getCandidates(String path) { - Versioned> candidates = candidateMap.get(path); - return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value()); + Leadership current = candidateBoard.get(path); + return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates()); } @Override @@ -207,13 +207,21 @@ public class DistributedLeadershipManager implements LeadershipService { List candidateList = Lists.newArrayList(candidates.value()); if (!candidateList.contains(localNodeId)) { candidateList.add(localNodeId); - if (!candidateMap.replace(path, candidates.version(), candidateList)) { + if (candidateMap.replace(path, candidates.version(), candidateList)) { + Versioned> newCandidates = candidateMap.get(path); + notifyCandidateAdded( + path, candidateList, newCandidates.version(), newCandidates.creationTime()); + } else { rerunForLeadership(path); return; } } } else { - if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) { + List candidateList = ImmutableList.of(localNodeId); + if ((candidateMap.putIfAbsent(path, candidateList) == null)) { + Versioned> newCandidates = candidateMap.get(path); + notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime()); + } else { rerunForLeadership(path); return; } @@ -247,10 +255,19 @@ public class DistributedLeadershipManager implements LeadershipService { if (!candidateList.remove(localNodeId)) { return; } - boolean success = candidateList.isEmpty() - ? candidateMap.remove(path, candidates.version()) - : candidateMap.replace(path, candidates.version(), candidateList); - if (!success) { + boolean success = false; + if (candidateList.isEmpty()) { + if (candidateMap.remove(path, candidates.version())) { + success = true; + } + } else { + if (candidateMap.replace(path, candidates.version(), candidateList)) { + success = true; + } + } + if (success) { + notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime()); + } else { log.warn("Failed to withdraw from candidates list. Will retry"); retryWithdraw(path); } @@ -321,21 +338,63 @@ public class DistributedLeadershipManager implements LeadershipService { } } + private void notifyCandidateAdded( + String path, List candidates, long epoch, long electedTime) { + Leadership newInfo = new Leadership(path, candidates, epoch, electedTime); + final MutableBoolean updated = new MutableBoolean(false); + candidateBoard.compute(path, (k, current) -> { + if (current == null || current.epoch() < newInfo.epoch()) { + log.info("updating candidateboard with {}", newInfo); + updated.setTrue(); + return newInfo; + } + return current; + }); + // maybe rethink types of candidates events + if (updated.booleanValue()) { + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo); + notifyPeers(event); + } + } + + private void notifyCandidateRemoved( + String path, List candidates, long epoch, long electedTime) { + Leadership newInfo = new Leadership(path, candidates, epoch, electedTime); + final MutableBoolean updated = new MutableBoolean(false); + candidateBoard.compute(path, (k, current) -> { + if (current != null && current.epoch() == newInfo.epoch()) { + log.info("updating candidateboard with {}", newInfo); + updated.setTrue(); + if (candidates.isEmpty()) { + return null; + } else { + return newInfo; + } + } + return current; + }); + // maybe rethink types of candidates events + if (updated.booleanValue()) { + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo); + notifyPeers(event); + } + } + private void notifyNewLeader(String path, NodeId leader, List candidates, long epoch, long electedTime) { Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime); - boolean updatedLeader = false; + final MutableBoolean updatedLeader = new MutableBoolean(false); log.debug("candidates for new Leadership {}", candidates); - synchronized (leaderBoard) { - Leadership currentLeader = leaderBoard.get(path); + leaderBoard.compute(path, (k, currentLeader) -> { if (currentLeader == null || currentLeader.epoch() < epoch) { log.debug("updating leaderboard with new {}", newLeadership); - leaderBoard.put(path, newLeadership); - updatedLeader = true; + updatedLeader.setTrue(); + return newLeadership; } - } + return currentLeader; + }); - if (updatedLeader) { + if (updatedLeader.booleanValue()) { LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership); notifyPeers(event); } @@ -352,21 +411,18 @@ public class DistributedLeadershipManager implements LeadershipService { Versioned> candidates = candidateMap.get(path); Leadership oldLeadership = new Leadership( path, leader, candidates.value(), epoch, electedTime); - boolean updatedLeader = false; - synchronized (leaderBoard) { - Leadership currentLeader = leaderBoard.get(path); + final MutableBoolean updatedLeader = new MutableBoolean(false); + leaderBoard.compute(path, (k, currentLeader) -> { if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) { - leaderBoard.remove(path); - updatedLeader = true; + updatedLeader.setTrue(); + return null; } - } + return currentLeader; + }); - if (updatedLeader) { + if (updatedLeader.booleanValue()) { LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership); - eventDispatcher.post(event); - clusterCommunicator.broadcast(event, - LEADERSHIP_EVENT_MESSAGE_SUBJECT, - SERIALIZER::encode); + notifyPeers(event); } } @@ -385,31 +441,37 @@ public class DistributedLeadershipManager implements LeadershipService { LeadershipEvent.Type eventType = leadershipEvent.type(); String topic = leadershipUpdate.topic(); - boolean updateAccepted = false; - - synchronized (leaderBoard) { - Leadership currentLeadership = leaderBoard.get(topic); - if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) { + MutableBoolean updateAccepted = new MutableBoolean(false); + if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) { + leaderBoard.compute(topic, (k, currentLeadership) -> { if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) { - leaderBoard.put(topic, leadershipUpdate); - updateAccepted = true; + updateAccepted.setTrue(); + return leadershipUpdate; } - } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { - if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) { - leaderBoard.remove(topic); - updateAccepted = true; + return currentLeadership; + }); + } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { + leaderBoard.compute(topic, (k, currentLeadership) -> { + if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) { + updateAccepted.setTrue(); + return null; } - } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) { - if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) { - leaderBoard.replace(topic, leadershipUpdate); - updateAccepted = true; + return currentLeadership; + }); + } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) { + candidateBoard.compute(topic, (k, currentInfo) -> { + if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) { + updateAccepted.setTrue(); + return leadershipUpdate; } - } else { - throw new IllegalStateException("Unknown event type."); - } - if (updateAccepted) { - eventDispatcher.post(leadershipEvent); - } + return currentInfo; + }); + } else { + throw new IllegalStateException("Unknown event type."); + } + + if (updateAccepted.booleanValue()) { + eventDispatcher.post(leadershipEvent); } } } @@ -470,6 +532,12 @@ public class DistributedLeadershipManager implements LeadershipService { SERIALIZER::encode); } }); + candidateBoard.forEach((path, leadership) -> { + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership); + clusterCommunicator.broadcast(event, + LEADERSHIP_EVENT_MESSAGE_SUBJECT, + SERIALIZER::encode); + }); } catch (Exception e) { log.debug("Failed to send leadership updates", e); } diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java index 0cf0625ec7..97e9f2463a 100644 --- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java +++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java @@ -111,7 +111,7 @@ public class SimpleLeadershipManager implements LeadershipService { } @Override - public Map> getCandidates() { + public Map getCandidates() { return null; }