Ensure mastership elections are cached in flow rule store

Change-Id: Iff1b1d743a38310e76a5c87605480e9faa5eab2b
(cherry picked from commit d334841f7067def3bad1c618a27a75d28d6df004)
This commit is contained in:
Jordan Halterman 2018-05-15 00:52:31 -07:00 committed by Ray Milkey
parent a49c60ace7
commit e74e629406
3 changed files with 139 additions and 207 deletions

View File

@ -202,6 +202,7 @@ public class DistributedLeadershipStore
leaderElector = storageService.leaderElectorBuilder() leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections") .withName("onos-leadership-elections")
.withElectionTimeout(electionTimeoutMillis) .withElectionTimeout(electionTimeoutMillis)
.withRelaxedReadConsistency()
.build() .build()
.asLeaderElector(); .asLeaderElector();
} }

View File

@ -15,31 +15,29 @@
*/ */
package org.onosproject.store.flow.impl; package org.onosproject.store.flow.impl;
import java.util.Objects; import com.google.common.collect.ImmutableList;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service; import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.Leadership; import org.onosproject.cluster.NodeId;
import org.onosproject.core.VersionService; import org.onosproject.cluster.RoleInfo;
import org.onosproject.event.AbstractListenerManager; import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.Change; import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId; import org.onosproject.net.DeviceId;
import org.onosproject.store.flow.ReplicaInfo; import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent; import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener; import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService; import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.service.CoordinationService;
import org.onosproject.store.service.LeaderElector;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED; import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@ -50,66 +48,38 @@ import static org.slf4j.LoggerFactory.getLogger;
*/ */
@Component(immediate = true) @Component(immediate = true)
@Service @Service
public class ReplicaInfoManager public class ReplicaInfoManager implements ReplicaInfoService {
extends AbstractListenerManager<ReplicaInfoEvent, ReplicaInfoEventListener>
implements ReplicaInfoService {
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+");
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) private final MastershipListener mastershipListener = new InternalMastershipListener();
protected CoordinationService coordinationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VersionService versionService; protected EventDeliveryService eventDispatcher;
private final Consumer<Change<Leadership>> leadershipChangeListener = change -> { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Leadership oldLeadership = change.oldValue(); protected MastershipService mastershipService;
Leadership newLeadership = change.newValue();
String topic = newLeadership.topic(); protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
if (!isDeviceMastershipTopic(topic)) { listenerRegistry = new ListenerRegistry<>();
return;
}
DeviceId deviceId = extractDeviceIdFromTopic(topic);
ReplicaInfo replicaInfo = buildFromLeadership(newLeadership);
boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader());
boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates());
if (leaderChanged) {
post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo));
}
if (candidatesChanged) {
post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo));
}
};
private LeaderElector leaderElector;
@Activate @Activate
public void activate() { public void activate() {
eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry); eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
leaderElector = coordinationService.leaderElectorBuilder() mastershipService.addListener(mastershipListener);
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
log.info("Started"); log.info("Started");
} }
@Deactivate @Deactivate
public void deactivate() { public void deactivate() {
eventDispatcher.removeSink(ReplicaInfoEvent.class); eventDispatcher.removeSink(ReplicaInfoEvent.class);
leaderElector.removeChangeListener(leadershipChangeListener); mastershipService.removeListener(mastershipListener);
log.info("Stopped"); log.info("Stopped");
} }
@Override @Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) { public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId))); return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
} }
@Override @Override
@ -122,27 +92,32 @@ public class ReplicaInfoManager
listenerRegistry.removeListener(checkNotNull(listener)); listenerRegistry.removeListener(checkNotNull(listener));
} }
String createDeviceMastershipTopic(DeviceId deviceId) { private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
return String.format("device:%s|%s", deviceId.toString(), versionService.version()); List<NodeId> backups = roles.backups() == null ?
Collections.emptyList() : ImmutableList.copyOf(roles.backups());
return new ReplicaInfo(roles.master(), backups);
} }
DeviceId extractDeviceIdFromTopic(String topic) { final class InternalMastershipListener implements MastershipListener {
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
if (m.matches()) { @Override
return DeviceId.deviceId(m.group(1)); public void event(MastershipEvent event) {
} else { final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
throw new IllegalArgumentException("Invalid device mastership topic: " + topic); switch (event.type()) {
case MASTER_CHANGED:
eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
event.subject(),
replicaInfo));
break;
case BACKUPS_CHANGED:
eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED,
event.subject(),
replicaInfo));
break;
default:
break;
}
} }
} }
boolean isDeviceMastershipTopic(String topic) {
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
return m.matches();
}
static ReplicaInfo buildFromLeadership(Leadership leadership) {
return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream()
.filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId()))
.collect(Collectors.toList()));
}
} }

View File

@ -15,197 +15,153 @@
*/ */
package org.onosproject.store.flow.impl; package org.onosproject.store.flow.impl;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId; import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.common.event.impl.TestEventDispatcher; import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.core.Version; import org.onosproject.event.ListenerRegistry;
import org.onosproject.event.Change; import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId; import org.onosproject.net.DeviceId;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent; import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.service.AsyncLeaderElector; import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.service.CoordinationService; import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.LeaderElectorBuilder; import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class ReplicaInfoManagerTest { public class ReplicaInfoManagerTest {
private static final DeviceId DID1 = DeviceId.deviceId("of:1"); private static final DeviceId DID1 = DeviceId.deviceId("of:1");
private static final DeviceId DID2 = DeviceId.deviceId("of:2"); private static final DeviceId DID2 = DeviceId.deviceId("of:2");
private static final NodeId NID1 = new NodeId("foo"); private static final NodeId NID1 = new NodeId("foo");
private static final NodeId NID2 = new NodeId("bar");
private TestLeaderElector leaderElector; private ReplicaInfoManager mgr;
private ReplicaInfoManager manager; private ReplicaInfoService service;
private ListenerRegistry<MastershipEvent, MastershipListener>
mastershipListenerRegistry;
private TestEventDispatcher eventDispatcher;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
leaderElector = new TestLeaderElector(); mastershipListenerRegistry = new ListenerRegistry<>();
manager = new TestReplicaInfoManager();
manager.versionService = () -> Version.version("1.0.0");
CoordinationService coordinationService = mock(CoordinationService.class);
AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class);
expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes();
expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() {
@Override
public AsyncLeaderElector build() {
return leaderElector;
}
}).anyTimes();
replay(coordinationService, leaderElector);
manager.coordinationService = coordinationService;
manager.activate(); mgr = new ReplicaInfoManager();
service = mgr;
eventDispatcher = new TestEventDispatcher();
mgr.eventDispatcher = eventDispatcher;
mgr.mastershipService = new TestMastershipService();
// register dummy mastership event source
mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry);
mgr.activate();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
manager.deactivate(); mgr.deactivate();
} }
@Test @Test
public void testMastershipTopics() throws Exception { public void testGetReplicaInfoFor() {
assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1)); ReplicaInfo info1 = service.getReplicaInfoFor(DID1);
assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0")); assertEquals(Optional.of(NID1), info1.master());
assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0")); // backups are always empty for now
assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0")); assertEquals(Collections.emptyList(), info1.backups());
assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz"));
assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0")); ReplicaInfo info2 = service.getReplicaInfoFor(DID2);
assertFalse(manager.isDeviceMastershipTopic("foobarbaz")); assertEquals("There's no master", Optional.empty(), info2.master());
// backups are always empty for now
assertEquals(Collections.emptyList(), info2.backups());
} }
@Test @Test
public void testReplicaEvents() throws Exception { public void testReplicaInfoEvent() throws InterruptedException {
Queue<ReplicaInfoEvent> events = new ArrayBlockingQueue<>(2); final CountDownLatch latch = new CountDownLatch(1);
manager.addListener(events::add); service.addListener(new MasterNodeCheck(latch, DID1, NID1));
Leadership oldLeadership = new Leadership( // fake MastershipEvent
manager.createDeviceMastershipTopic(DID1), eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
new Leader(NID1, 1, 1), new RoleInfo(NID1, new LinkedList<>())));
Lists.newArrayList(NID1));
Leadership newLeadership = new Leadership(
manager.createDeviceMastershipTopic(DID1),
new Leader(NID2, 2, 1),
Lists.newArrayList(NID2, NID1));
leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership); assertTrue(latch.await(1, TimeUnit.SECONDS));
leaderElector.post(new Change<>(oldLeadership, newLeadership));
ReplicaInfoEvent event = events.remove();
assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type());
assertEquals(NID2, event.replicaInfo().master().get());
assertEquals(1, event.replicaInfo().backups().size());
event = events.remove();
assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
assertEquals(NID2, event.replicaInfo().master().get());
assertEquals(1, event.replicaInfo().backups().size());
assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get());
assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
oldLeadership = new Leadership(
manager.createDeviceMastershipTopic(DID1),
new Leader(NID1, 1, 1),
Lists.newArrayList(NID1));
newLeadership = new Leadership(
manager.createDeviceMastershipTopic(DID1),
new Leader(NID1, 1, 1),
Lists.newArrayList(NID1, NID2));
leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
leaderElector.post(new Change<>(oldLeadership, newLeadership));
event = events.remove();
assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
assertEquals(NID1, event.replicaInfo().master().get());
assertEquals(1, event.replicaInfo().backups().size());
assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get());
assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
} }
private class TestReplicaInfoManager extends ReplicaInfoManager {
TestReplicaInfoManager() { private final class MasterNodeCheck implements ReplicaInfoEventListener {
eventDispatcher = new TestEventDispatcher(); private final CountDownLatch latch;
private Optional<NodeId> expectedMaster;
private DeviceId expectedDevice;
MasterNodeCheck(CountDownLatch latch, DeviceId did,
NodeId nid) {
this.latch = latch;
this.expectedMaster = Optional.ofNullable(nid);
this.expectedDevice = did;
}
@Override
public void event(ReplicaInfoEvent event) {
assertEquals(expectedDevice, event.subject());
assertEquals(expectedMaster, event.replicaInfo().master());
// backups are always empty for now
assertEquals(Collections.emptyList(), event.replicaInfo().backups());
latch.countDown();
} }
} }
private class TestLeaderElector implements LeaderElector {
private final Map<String, Leadership> leaderships = Maps.newConcurrentMap();
private final Set<Consumer<Change<Leadership>>> listeners = Sets.newConcurrentHashSet();
@Override private final class TestMastershipService
public String name() { extends MastershipServiceAdapter
return null; implements MastershipService {
private Map<DeviceId, NodeId> masters;
TestMastershipService() {
masters = Maps.newHashMap();
masters.put(DID1, NID1);
// DID2 has no master
} }
@Override @Override
public Leadership run(String topic, NodeId nodeId) { public NodeId getMasterFor(DeviceId deviceId) {
return null; return masters.get(deviceId);
} }
@Override @Override
public void withdraw(String topic) { public RoleInfo getNodesFor(DeviceId deviceId) {
return new RoleInfo(masters.get(deviceId), Collections.emptyList());
} }
@Override @Override
public boolean anoint(String topic, NodeId nodeId) { public void addListener(MastershipListener listener) {
return false; mastershipListenerRegistry.addListener(listener);
} }
@Override @Override
public boolean promote(String topic, NodeId nodeId) { public void removeListener(MastershipListener listener) {
return false; mastershipListenerRegistry.removeListener(listener);
}
@Override
public void evict(NodeId nodeId) {
}
@Override
public Leadership getLeadership(String topic) {
return leaderships.get(topic);
}
@Override
public Map<String, Leadership> getLeaderships() {
return leaderships;
}
@Override
public void addChangeListener(Consumer<Change<Leadership>> consumer) {
listeners.add(consumer);
}
@Override
public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
listeners.remove(consumer);
}
void post(Change<Leadership> change) {
listeners.forEach(l -> l.accept(change));
} }
} }
} }