diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java index 746142eb53..c3a39771a5 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java @@ -202,6 +202,7 @@ public class DistributedLeadershipStore leaderElector = storageService.leaderElectorBuilder() .withName("onos-leadership-elections") .withElectionTimeout(electionTimeoutMillis) + .withRelaxedReadConsistency() .build() .asLeaderElector(); } diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java index dec602b801..79c9147c78 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java @@ -15,31 +15,29 @@ */ package org.onosproject.store.flow.impl; -import java.util.Objects; -import java.util.function.Consumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - +import com.google.common.collect.ImmutableList; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; -import org.onosproject.cluster.Leadership; -import org.onosproject.core.VersionService; -import org.onosproject.event.AbstractListenerManager; -import org.onosproject.event.Change; +import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.RoleInfo; +import org.onosproject.event.EventDeliveryService; +import org.onosproject.event.ListenerRegistry; +import org.onosproject.mastership.MastershipEvent; +import org.onosproject.mastership.MastershipListener; +import org.onosproject.mastership.MastershipService; import org.onosproject.net.DeviceId; import org.onosproject.store.flow.ReplicaInfo; import org.onosproject.store.flow.ReplicaInfoEvent; import org.onosproject.store.flow.ReplicaInfoEventListener; import org.onosproject.store.flow.ReplicaInfoService; -import org.onosproject.store.service.CoordinationService; -import org.onosproject.store.service.LeaderElector; import org.slf4j.Logger; +import java.util.Collections; +import java.util.List; import static com.google.common.base.Preconditions.checkNotNull; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED; @@ -50,66 +48,38 @@ import static org.slf4j.LoggerFactory.getLogger; */ @Component(immediate = true) @Service -public class ReplicaInfoManager - extends AbstractListenerManager - implements ReplicaInfoService { - - private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+"); +public class ReplicaInfoManager implements ReplicaInfoService { private final Logger log = getLogger(getClass()); - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected CoordinationService coordinationService; + private final MastershipListener mastershipListener = new InternalMastershipListener(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected VersionService versionService; + protected EventDeliveryService eventDispatcher; - private final Consumer> leadershipChangeListener = change -> { - Leadership oldLeadership = change.oldValue(); - Leadership newLeadership = change.newValue(); + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; - String topic = newLeadership.topic(); - if (!isDeviceMastershipTopic(topic)) { - return; - } - - DeviceId deviceId = extractDeviceIdFromTopic(topic); - ReplicaInfo replicaInfo = buildFromLeadership(newLeadership); - - boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader()); - boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates()); - - if (leaderChanged) { - post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo)); - } - if (candidatesChanged) { - post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo)); - } - }; - - private LeaderElector leaderElector; + protected final ListenerRegistry + listenerRegistry = new ListenerRegistry<>(); @Activate public void activate() { eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry); - leaderElector = coordinationService.leaderElectorBuilder() - .withName("onos-leadership-elections") - .build() - .asLeaderElector(); - leaderElector.addChangeListener(leadershipChangeListener); + mastershipService.addListener(mastershipListener); log.info("Started"); } @Deactivate public void deactivate() { eventDispatcher.removeSink(ReplicaInfoEvent.class); - leaderElector.removeChangeListener(leadershipChangeListener); + mastershipService.removeListener(mastershipListener); log.info("Stopped"); } @Override public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) { - return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId))); + return buildFromRoleInfo(mastershipService.getNodesFor(deviceId)); } @Override @@ -122,27 +92,32 @@ public class ReplicaInfoManager listenerRegistry.removeListener(checkNotNull(listener)); } - String createDeviceMastershipTopic(DeviceId deviceId) { - return String.format("device:%s|%s", deviceId.toString(), versionService.version()); + private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) { + List backups = roles.backups() == null ? + Collections.emptyList() : ImmutableList.copyOf(roles.backups()); + return new ReplicaInfo(roles.master(), backups); } - DeviceId extractDeviceIdFromTopic(String topic) { - Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); - if (m.matches()) { - return DeviceId.deviceId(m.group(1)); - } else { - throw new IllegalArgumentException("Invalid device mastership topic: " + topic); + final class InternalMastershipListener implements MastershipListener { + + @Override + public void event(MastershipEvent event) { + final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo()); + switch (event.type()) { + case MASTER_CHANGED: + eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED, + event.subject(), + replicaInfo)); + break; + case BACKUPS_CHANGED: + eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED, + event.subject(), + replicaInfo)); + break; + default: + break; + } } } - boolean isDeviceMastershipTopic(String topic) { - Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); - return m.matches(); - } - - static ReplicaInfo buildFromLeadership(Leadership leadership) { - return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream() - .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId())) - .collect(Collectors.toList())); - } } diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java index b342fa99ec..9709643354 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java @@ -15,197 +15,153 @@ */ package org.onosproject.store.flow.impl; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.function.Consumer; - -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.onosproject.cluster.Leader; -import org.onosproject.cluster.Leadership; import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.RoleInfo; import org.onosproject.common.event.impl.TestEventDispatcher; -import org.onosproject.core.Version; -import org.onosproject.event.Change; +import org.onosproject.event.ListenerRegistry; +import org.onosproject.mastership.MastershipEvent; +import org.onosproject.mastership.MastershipEvent.Type; +import org.onosproject.mastership.MastershipListener; +import org.onosproject.mastership.MastershipService; +import org.onosproject.mastership.MastershipServiceAdapter; import org.onosproject.net.DeviceId; +import org.onosproject.store.flow.ReplicaInfo; import org.onosproject.store.flow.ReplicaInfoEvent; -import org.onosproject.store.service.AsyncLeaderElector; -import org.onosproject.store.service.CoordinationService; -import org.onosproject.store.service.LeaderElector; -import org.onosproject.store.service.LeaderElectorBuilder; +import org.onosproject.store.flow.ReplicaInfoEventListener; +import org.onosproject.store.flow.ReplicaInfoService; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ReplicaInfoManagerTest { + private static final DeviceId DID1 = DeviceId.deviceId("of:1"); private static final DeviceId DID2 = DeviceId.deviceId("of:2"); private static final NodeId NID1 = new NodeId("foo"); - private static final NodeId NID2 = new NodeId("bar"); - private TestLeaderElector leaderElector; - private ReplicaInfoManager manager; + private ReplicaInfoManager mgr; + private ReplicaInfoService service; + + private ListenerRegistry + mastershipListenerRegistry; + private TestEventDispatcher eventDispatcher; + @Before public void setUp() throws Exception { - leaderElector = new TestLeaderElector(); - manager = new TestReplicaInfoManager(); - manager.versionService = () -> Version.version("1.0.0"); - CoordinationService coordinationService = mock(CoordinationService.class); - AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class); - expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes(); - expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() { - @Override - public AsyncLeaderElector build() { - return leaderElector; - } - }).anyTimes(); - replay(coordinationService, leaderElector); - manager.coordinationService = coordinationService; + mastershipListenerRegistry = new ListenerRegistry<>(); - manager.activate(); + mgr = new ReplicaInfoManager(); + service = mgr; + + eventDispatcher = new TestEventDispatcher(); + mgr.eventDispatcher = eventDispatcher; + mgr.mastershipService = new TestMastershipService(); + + // register dummy mastership event source + mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry); + + mgr.activate(); } @After public void tearDown() throws Exception { - manager.deactivate(); + mgr.deactivate(); } @Test - public void testMastershipTopics() throws Exception { - assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1)); - assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0")); - assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0")); - assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0")); - assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz")); - assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0")); - assertFalse(manager.isDeviceMastershipTopic("foobarbaz")); + public void testGetReplicaInfoFor() { + ReplicaInfo info1 = service.getReplicaInfoFor(DID1); + assertEquals(Optional.of(NID1), info1.master()); + // backups are always empty for now + assertEquals(Collections.emptyList(), info1.backups()); + + ReplicaInfo info2 = service.getReplicaInfoFor(DID2); + assertEquals("There's no master", Optional.empty(), info2.master()); + // backups are always empty for now + assertEquals(Collections.emptyList(), info2.backups()); } @Test - public void testReplicaEvents() throws Exception { - Queue events = new ArrayBlockingQueue<>(2); - manager.addListener(events::add); + public void testReplicaInfoEvent() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + service.addListener(new MasterNodeCheck(latch, DID1, NID1)); - Leadership oldLeadership = new Leadership( - manager.createDeviceMastershipTopic(DID1), - new Leader(NID1, 1, 1), - Lists.newArrayList(NID1)); - Leadership newLeadership = new Leadership( - manager.createDeviceMastershipTopic(DID1), - new Leader(NID2, 2, 1), - Lists.newArrayList(NID2, NID1)); + // fake MastershipEvent + eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1, + new RoleInfo(NID1, new LinkedList<>()))); - leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership); - leaderElector.post(new Change<>(oldLeadership, newLeadership)); - - ReplicaInfoEvent event = events.remove(); - assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type()); - assertEquals(NID2, event.replicaInfo().master().get()); - assertEquals(1, event.replicaInfo().backups().size()); - - event = events.remove(); - assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type()); - assertEquals(NID2, event.replicaInfo().master().get()); - assertEquals(1, event.replicaInfo().backups().size()); - - assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get()); - assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size()); - - oldLeadership = new Leadership( - manager.createDeviceMastershipTopic(DID1), - new Leader(NID1, 1, 1), - Lists.newArrayList(NID1)); - newLeadership = new Leadership( - manager.createDeviceMastershipTopic(DID1), - new Leader(NID1, 1, 1), - Lists.newArrayList(NID1, NID2)); - - leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership); - leaderElector.post(new Change<>(oldLeadership, newLeadership)); - - event = events.remove(); - assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type()); - assertEquals(NID1, event.replicaInfo().master().get()); - assertEquals(1, event.replicaInfo().backups().size()); - - assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get()); - assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size()); + assertTrue(latch.await(1, TimeUnit.SECONDS)); } - private class TestReplicaInfoManager extends ReplicaInfoManager { - TestReplicaInfoManager() { - eventDispatcher = new TestEventDispatcher(); + + private final class MasterNodeCheck implements ReplicaInfoEventListener { + private final CountDownLatch latch; + private Optional expectedMaster; + private DeviceId expectedDevice; + + + MasterNodeCheck(CountDownLatch latch, DeviceId did, + NodeId nid) { + this.latch = latch; + this.expectedMaster = Optional.ofNullable(nid); + this.expectedDevice = did; + } + + @Override + public void event(ReplicaInfoEvent event) { + assertEquals(expectedDevice, event.subject()); + assertEquals(expectedMaster, event.replicaInfo().master()); + // backups are always empty for now + assertEquals(Collections.emptyList(), event.replicaInfo().backups()); + latch.countDown(); } } - private class TestLeaderElector implements LeaderElector { - private final Map leaderships = Maps.newConcurrentMap(); - private final Set>> listeners = Sets.newConcurrentHashSet(); - @Override - public String name() { - return null; + private final class TestMastershipService + extends MastershipServiceAdapter + implements MastershipService { + + private Map masters; + + TestMastershipService() { + masters = Maps.newHashMap(); + masters.put(DID1, NID1); + // DID2 has no master } @Override - public Leadership run(String topic, NodeId nodeId) { - return null; + public NodeId getMasterFor(DeviceId deviceId) { + return masters.get(deviceId); } @Override - public void withdraw(String topic) { - + public RoleInfo getNodesFor(DeviceId deviceId) { + return new RoleInfo(masters.get(deviceId), Collections.emptyList()); } @Override - public boolean anoint(String topic, NodeId nodeId) { - return false; + public void addListener(MastershipListener listener) { + mastershipListenerRegistry.addListener(listener); } @Override - public boolean promote(String topic, NodeId nodeId) { - return false; - } - - @Override - public void evict(NodeId nodeId) { - - } - - @Override - public Leadership getLeadership(String topic) { - return leaderships.get(topic); - } - - @Override - public Map getLeaderships() { - return leaderships; - } - - @Override - public void addChangeListener(Consumer> consumer) { - listeners.add(consumer); - } - - @Override - public void removeChangeListener(Consumer> consumer) { - listeners.remove(consumer); - } - - void post(Change change) { - listeners.forEach(l -> l.accept(change)); + public void removeListener(MastershipListener listener) { + mastershipListenerRegistry.removeListener(listener); } } + }