diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java index 0f47c089e2..dec602b801 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java @@ -15,29 +15,31 @@ */ package org.onosproject.store.flow.impl; -import com.google.common.collect.ImmutableList; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; -import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.RoleInfo; -import org.onosproject.event.EventDeliveryService; -import org.onosproject.event.ListenerRegistry; -import org.onosproject.mastership.MastershipEvent; -import org.onosproject.mastership.MastershipListener; -import org.onosproject.mastership.MastershipService; +import org.onosproject.cluster.Leadership; +import org.onosproject.core.VersionService; +import org.onosproject.event.AbstractListenerManager; +import org.onosproject.event.Change; import org.onosproject.net.DeviceId; import org.onosproject.store.flow.ReplicaInfo; import org.onosproject.store.flow.ReplicaInfoEvent; import org.onosproject.store.flow.ReplicaInfoEventListener; import org.onosproject.store.flow.ReplicaInfoService; +import org.onosproject.store.service.CoordinationService; +import org.onosproject.store.service.LeaderElector; import org.slf4j.Logger; -import java.util.Collections; -import java.util.List; import static com.google.common.base.Preconditions.checkNotNull; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED; @@ -48,38 +50,66 @@ import static org.slf4j.LoggerFactory.getLogger; */ @Component(immediate = true) @Service -public class ReplicaInfoManager implements ReplicaInfoService { +public class ReplicaInfoManager + extends AbstractListenerManager + implements ReplicaInfoService { + + private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+"); private final Logger log = getLogger(getClass()); - private final MastershipListener mastershipListener = new InternalMastershipListener(); + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected CoordinationService coordinationService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected EventDeliveryService eventDispatcher; + protected VersionService versionService; - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; + private final Consumer> leadershipChangeListener = change -> { + Leadership oldLeadership = change.oldValue(); + Leadership newLeadership = change.newValue(); - protected final ListenerRegistry - listenerRegistry = new ListenerRegistry<>(); + String topic = newLeadership.topic(); + if (!isDeviceMastershipTopic(topic)) { + return; + } + + DeviceId deviceId = extractDeviceIdFromTopic(topic); + ReplicaInfo replicaInfo = buildFromLeadership(newLeadership); + + boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader()); + boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates()); + + if (leaderChanged) { + post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo)); + } + if (candidatesChanged) { + post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo)); + } + }; + + private LeaderElector leaderElector; @Activate public void activate() { eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry); - mastershipService.addListener(mastershipListener); + leaderElector = coordinationService.leaderElectorBuilder() + .withName("onos-leadership-elections") + .build() + .asLeaderElector(); + leaderElector.addChangeListener(leadershipChangeListener); log.info("Started"); } @Deactivate public void deactivate() { eventDispatcher.removeSink(ReplicaInfoEvent.class); - mastershipService.removeListener(mastershipListener); + leaderElector.removeChangeListener(leadershipChangeListener); log.info("Stopped"); } @Override public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) { - return buildFromRoleInfo(mastershipService.getNodesFor(deviceId)); + return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId))); } @Override @@ -92,32 +122,27 @@ public class ReplicaInfoManager implements ReplicaInfoService { listenerRegistry.removeListener(checkNotNull(listener)); } - private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) { - List backups = roles.backups() == null ? - Collections.emptyList() : ImmutableList.copyOf(roles.backups()); - return new ReplicaInfo(roles.master(), backups); + String createDeviceMastershipTopic(DeviceId deviceId) { + return String.format("device:%s|%s", deviceId.toString(), versionService.version()); } - final class InternalMastershipListener implements MastershipListener { - - @Override - public void event(MastershipEvent event) { - final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo()); - switch (event.type()) { - case MASTER_CHANGED: - eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED, - event.subject(), - replicaInfo)); - break; - case BACKUPS_CHANGED: - eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED, - event.subject(), - replicaInfo)); - break; - default: - break; - } + DeviceId extractDeviceIdFromTopic(String topic) { + Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); + if (m.matches()) { + return DeviceId.deviceId(m.group(1)); + } else { + throw new IllegalArgumentException("Invalid device mastership topic: " + topic); } } + boolean isDeviceMastershipTopic(String topic) { + Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); + return m.matches(); + } + + static ReplicaInfo buildFromLeadership(Leadership leadership) { + return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream() + .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId())) + .collect(Collectors.toList())); + } } diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java index dd822df130..b342fa99ec 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java @@ -15,153 +15,197 @@ */ package org.onosproject.store.flow.impl; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.function.Consumer; + +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.onosproject.cluster.Leader; +import org.onosproject.cluster.Leadership; import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.RoleInfo; import org.onosproject.common.event.impl.TestEventDispatcher; -import org.onosproject.event.ListenerRegistry; -import org.onosproject.mastership.MastershipEvent; -import org.onosproject.mastership.MastershipEvent.Type; -import org.onosproject.mastership.MastershipListener; -import org.onosproject.mastership.MastershipService; -import org.onosproject.mastership.MastershipServiceAdapter; +import org.onosproject.core.Version; +import org.onosproject.event.Change; import org.onosproject.net.DeviceId; -import org.onosproject.store.flow.ReplicaInfo; import org.onosproject.store.flow.ReplicaInfoEvent; -import org.onosproject.store.flow.ReplicaInfoEventListener; -import org.onosproject.store.flow.ReplicaInfoService; - -import java.util.Collections; -import java.util.LinkedList; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.onosproject.store.service.AsyncLeaderElector; +import org.onosproject.store.service.CoordinationService; +import org.onosproject.store.service.LeaderElector; +import org.onosproject.store.service.LeaderElectorBuilder; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ReplicaInfoManagerTest { - private static final DeviceId DID1 = DeviceId.deviceId("of:1"); private static final DeviceId DID2 = DeviceId.deviceId("of:2"); private static final NodeId NID1 = new NodeId("foo"); + private static final NodeId NID2 = new NodeId("bar"); - private ReplicaInfoManager mgr; - private ReplicaInfoService service; - - private ListenerRegistry - mastershipListenerRegistry; - private TestEventDispatcher eventDispatcher; - + private TestLeaderElector leaderElector; + private ReplicaInfoManager manager; @Before public void setUp() throws Exception { - mastershipListenerRegistry = new ListenerRegistry<>(); + leaderElector = new TestLeaderElector(); + manager = new TestReplicaInfoManager(); + manager.versionService = () -> Version.version("1.0.0"); + CoordinationService coordinationService = mock(CoordinationService.class); + AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class); + expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes(); + expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() { + @Override + public AsyncLeaderElector build() { + return leaderElector; + } + }).anyTimes(); + replay(coordinationService, leaderElector); + manager.coordinationService = coordinationService; - mgr = new ReplicaInfoManager(); - service = mgr; - - eventDispatcher = new TestEventDispatcher(); - mgr.eventDispatcher = eventDispatcher; - mgr.mastershipService = new TestMastershipService(); - - // register dummy mastership event source - mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry); - - mgr.activate(); + manager.activate(); } @After public void tearDown() throws Exception { - mgr.deactivate(); + manager.deactivate(); } @Test - public void testGetReplicaInfoFor() { - ReplicaInfo info1 = service.getReplicaInfoFor(DID1); - assertEquals(Optional.of(NID1), info1.master()); - // backups are always empty for now - assertEquals(Collections.emptyList(), info1.backups()); - - ReplicaInfo info2 = service.getReplicaInfoFor(DID2); - assertEquals("There's no master", Optional.empty(), info2.master()); - // backups are always empty for now - assertEquals(Collections.emptyList(), info2.backups()); + public void testMastershipTopics() throws Exception { + assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1)); + assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0")); + assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0")); + assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0")); + assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz")); + assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0")); + assertFalse(manager.isDeviceMastershipTopic("foobarbaz")); } @Test - public void testReplicaInfoEvent() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - service.addListener(new MasterNodeCheck(latch, DID1, NID1)); + public void testReplicaEvents() throws Exception { + Queue events = new ArrayBlockingQueue<>(2); + manager.addListener(events::add); - // fake MastershipEvent - eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1, - new RoleInfo(NID1, new LinkedList<>()))); + Leadership oldLeadership = new Leadership( + manager.createDeviceMastershipTopic(DID1), + new Leader(NID1, 1, 1), + Lists.newArrayList(NID1)); + Leadership newLeadership = new Leadership( + manager.createDeviceMastershipTopic(DID1), + new Leader(NID2, 2, 1), + Lists.newArrayList(NID2, NID1)); - assertTrue(latch.await(1, TimeUnit.SECONDS)); + leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership); + leaderElector.post(new Change<>(oldLeadership, newLeadership)); + + ReplicaInfoEvent event = events.remove(); + assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type()); + assertEquals(NID2, event.replicaInfo().master().get()); + assertEquals(1, event.replicaInfo().backups().size()); + + event = events.remove(); + assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type()); + assertEquals(NID2, event.replicaInfo().master().get()); + assertEquals(1, event.replicaInfo().backups().size()); + + assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get()); + assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size()); + + oldLeadership = new Leadership( + manager.createDeviceMastershipTopic(DID1), + new Leader(NID1, 1, 1), + Lists.newArrayList(NID1)); + newLeadership = new Leadership( + manager.createDeviceMastershipTopic(DID1), + new Leader(NID1, 1, 1), + Lists.newArrayList(NID1, NID2)); + + leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership); + leaderElector.post(new Change<>(oldLeadership, newLeadership)); + + event = events.remove(); + assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type()); + assertEquals(NID1, event.replicaInfo().master().get()); + assertEquals(1, event.replicaInfo().backups().size()); + + assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get()); + assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size()); } - - private final class MasterNodeCheck implements ReplicaInfoEventListener { - private final CountDownLatch latch; - private Optional expectedMaster; - private DeviceId expectedDevice; - - - MasterNodeCheck(CountDownLatch latch, DeviceId did, - NodeId nid) { - this.latch = latch; - this.expectedMaster = Optional.ofNullable(nid); - this.expectedDevice = did; - } - - @Override - public void event(ReplicaInfoEvent event) { - assertEquals(expectedDevice, event.subject()); - assertEquals(expectedMaster, event.replicaInfo().master()); - // backups are always empty for now - assertEquals(Collections.emptyList(), event.replicaInfo().backups()); - latch.countDown(); + private class TestReplicaInfoManager extends ReplicaInfoManager { + TestReplicaInfoManager() { + eventDispatcher = new TestEventDispatcher(); } } + private class TestLeaderElector implements LeaderElector { + private final Map leaderships = Maps.newConcurrentMap(); + private final Set>> listeners = Sets.newConcurrentHashSet(); - private final class TestMastershipService - extends MastershipServiceAdapter - implements MastershipService { - - private Map masters; - - TestMastershipService() { - masters = Maps.newHashMap(); - masters.put(DID1, NID1); - // DID2 has no master + @Override + public String name() { + return null; } @Override - public NodeId getMasterFor(DeviceId deviceId) { - return masters.get(deviceId); + public Leadership run(String topic, NodeId nodeId) { + return null; } @Override - public RoleInfo getNodesFor(DeviceId deviceId) { - return new RoleInfo(masters.get(deviceId), Collections.emptyList()); + public void withdraw(String topic) { + } @Override - public void addListener(MastershipListener listener) { - mastershipListenerRegistry.addListener(listener); + public boolean anoint(String topic, NodeId nodeId) { + return false; } @Override - public void removeListener(MastershipListener listener) { - mastershipListenerRegistry.removeListener(listener); + public boolean promote(String topic, NodeId nodeId) { + return false; + } + + @Override + public void evict(NodeId nodeId) { + + } + + @Override + public Leadership getLeadership(String topic) { + return leaderships.get(topic); + } + + @Override + public Map getLeaderships() { + return leaderships; + } + + @Override + public void addChangeListener(Consumer> consumer) { + listeners.add(consumer); + } + + @Override + public void removeChangeListener(Consumer> consumer) { + listeners.remove(consumer); + } + + void post(Change change) { + listeners.forEach(l -> l.accept(change)); } } - }