mirror of
https://github.com/opennetworkinglab/onos.git
synced 2026-05-05 04:06:49 +02:00
Add atomic mastership/term/backups method to MastershipService
Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
This commit is contained in:
parent
aeea0bbc30
commit
0a2bd45ad2
@ -17,6 +17,7 @@
|
||||
package org.onosproject.faultmanagement.impl;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -25,7 +26,6 @@ import org.onlab.packet.ChassisId;
|
||||
import org.onosproject.cfg.ComponentConfigAdapter;
|
||||
import org.onosproject.cfg.ComponentConfigService;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.incubator.net.faultmanagement.alarm.Alarm;
|
||||
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
|
||||
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmId;
|
||||
@ -35,6 +35,7 @@ import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderRegistry
|
||||
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
|
||||
import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.mastership.MastershipServiceAdapter;
|
||||
@ -64,6 +65,7 @@ import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
@ -96,7 +98,7 @@ public class PollingAlarmProviderTest {
|
||||
|
||||
private final MastershipEvent mastershipEvent =
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, DEVICE_ID,
|
||||
new RoleInfo(nodeId, ImmutableList.of()));
|
||||
new MastershipInfo(1, Optional.of(nodeId), ImmutableMap.of()));
|
||||
|
||||
private final DeviceEvent deviceEvent =
|
||||
new DeviceEvent(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED, device);
|
||||
|
||||
@ -29,9 +29,6 @@ import java.util.Objects;
|
||||
*/
|
||||
public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceId> {
|
||||
|
||||
//Contains master and standby information.
|
||||
RoleInfo roleInfo;
|
||||
|
||||
/**
|
||||
* Type of mastership events.
|
||||
*/
|
||||
@ -55,45 +52,58 @@ public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceI
|
||||
SUSPENDED
|
||||
}
|
||||
|
||||
private final MastershipInfo mastershipInfo;
|
||||
|
||||
/**
|
||||
* Creates an event of a given type and for the specified device,
|
||||
* role information, and the current time.
|
||||
*
|
||||
* @param type mastership event type
|
||||
* @param device event device subject
|
||||
* @param info mastership role information
|
||||
* @param type mastership event type
|
||||
* @param device event device subject
|
||||
* @param mastershipInfo mastership info
|
||||
*/
|
||||
public MastershipEvent(Type type, DeviceId device, RoleInfo info) {
|
||||
public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo) {
|
||||
super(type, device);
|
||||
this.roleInfo = info;
|
||||
this.mastershipInfo = mastershipInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an event of a given type and for the specified device, master,
|
||||
* and time.
|
||||
*
|
||||
* @param type mastership event type
|
||||
* @param device event device subject
|
||||
* @param info role information
|
||||
* @param time occurrence time
|
||||
* @param type mastership event type
|
||||
* @param device event device subject
|
||||
* @param mastershipInfo mastership information
|
||||
* @param time occurrence time
|
||||
*/
|
||||
public MastershipEvent(Type type, DeviceId device, RoleInfo info, long time) {
|
||||
public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo, long time) {
|
||||
super(type, device, time);
|
||||
this.roleInfo = info;
|
||||
this.mastershipInfo = mastershipInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the mastership info.
|
||||
*
|
||||
* @return the mastership info
|
||||
*/
|
||||
public MastershipInfo mastershipInfo() {
|
||||
return mastershipInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current role state for the subject.
|
||||
*
|
||||
* @return RoleInfo associated with Device ID subject
|
||||
* @deprecated since 1.14
|
||||
*/
|
||||
@Deprecated
|
||||
public RoleInfo roleInfo() {
|
||||
return roleInfo;
|
||||
return new RoleInfo(mastershipInfo.master().orElse(null), mastershipInfo.backups());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type(), subject(), roleInfo(), time());
|
||||
return Objects.hash(type(), subject(), mastershipInfo(), time());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -105,7 +115,7 @@ public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceI
|
||||
final MastershipEvent other = (MastershipEvent) obj;
|
||||
return Objects.equals(this.type(), other.type()) &&
|
||||
Objects.equals(this.subject(), other.subject()) &&
|
||||
Objects.equals(this.roleInfo(), other.roleInfo()) &&
|
||||
Objects.equals(this.mastershipInfo(), other.mastershipInfo()) &&
|
||||
Objects.equals(this.time(), other.time());
|
||||
}
|
||||
return false;
|
||||
@ -117,7 +127,7 @@ public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceI
|
||||
.add("time", Tools.defaultOffsetDataTime(time()))
|
||||
.add("type", type())
|
||||
.add("subject", subject())
|
||||
.add("roleInfo", roleInfo)
|
||||
.add("mastershipInfo", mastershipInfo())
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,124 @@
|
||||
/*
|
||||
* Copyright 2018-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.onosproject.mastership;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
|
||||
import static com.google.common.base.MoreObjects.toStringHelper;
|
||||
|
||||
/**
|
||||
* Mastership info.
|
||||
*/
|
||||
public final class MastershipInfo {
|
||||
private final long term;
|
||||
private final Optional<NodeId> master;
|
||||
private final ImmutableMap<NodeId, MastershipRole> roles;
|
||||
|
||||
public MastershipInfo() {
|
||||
this(0, Optional.empty(), ImmutableMap.of());
|
||||
}
|
||||
|
||||
public MastershipInfo(long term, Optional<NodeId> master, ImmutableMap<NodeId, MastershipRole> roles) {
|
||||
this.term = term;
|
||||
this.master = master;
|
||||
this.roles = roles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the mastership term.
|
||||
*
|
||||
* @return the mastership term
|
||||
*/
|
||||
public long term() {
|
||||
return term;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current master.
|
||||
*
|
||||
* @return the current master
|
||||
*/
|
||||
public Optional<NodeId> master() {
|
||||
return master;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a sorted list of standby nodes.
|
||||
*
|
||||
* @return a sorted list of standby nodes
|
||||
*/
|
||||
public List<NodeId> backups() {
|
||||
return getRoles(MastershipRole.STANDBY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of nodes with the given role.
|
||||
*
|
||||
* @param role the role by which to filter nodes
|
||||
* @return an immutable list of nodes with the given role sorted in priority order
|
||||
*/
|
||||
public List<NodeId> getRoles(MastershipRole role) {
|
||||
return ImmutableList.copyOf(roles.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue() == role)
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current role for the given node.
|
||||
*
|
||||
* @param nodeId the node for which to return the current role
|
||||
* @return the current role for the given node
|
||||
*/
|
||||
public MastershipRole getRole(NodeId nodeId) {
|
||||
return roles.get(nodeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(term, master, roles);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object object) {
|
||||
if (object instanceof MastershipInfo) {
|
||||
MastershipInfo that = (MastershipInfo) object;
|
||||
return this.term == that.term
|
||||
&& Objects.equals(this.master, that.master)
|
||||
&& Objects.equals(this.roles, that.roles);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toStringHelper(this)
|
||||
.add("term", term)
|
||||
.add("master", master)
|
||||
.add("roles", roles)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
@ -119,6 +119,14 @@ public interface MastershipService
|
||||
*/
|
||||
RoleInfo getNodesFor(DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns the mastership info for the given device.
|
||||
*
|
||||
* @param deviceId the device for which to return the mastership info
|
||||
* @return the mastership info for the given device
|
||||
*/
|
||||
MastershipInfo getMastershipFor(DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns the devices for which a controller is master.
|
||||
* <p>
|
||||
|
||||
@ -73,7 +73,6 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
|
||||
*/
|
||||
Set<DeviceId> getDevices(NodeId nodeId);
|
||||
|
||||
|
||||
/**
|
||||
* Sets a device's role for a specified controller instance.
|
||||
*
|
||||
@ -92,6 +91,14 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
|
||||
*/
|
||||
MastershipTerm getTermFor(DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns the mastership info for the given device.
|
||||
*
|
||||
* @param deviceId the device for which to return the mastership info
|
||||
* @return the mastership info for the given device
|
||||
*/
|
||||
MastershipInfo getMastership(DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Sets a controller instance's mastership role to STANDBY for a device.
|
||||
* If the role is MASTER, another controller instance will be selected
|
||||
|
||||
@ -15,14 +15,15 @@
|
||||
*/
|
||||
package org.onosproject.mastership;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.testing.EqualsTester;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
@ -36,21 +37,33 @@ public class MastershipEventTest {
|
||||
private final DeviceId deviceId2 = DeviceId.deviceId("bar:baz");
|
||||
private final NodeId node1 = new NodeId("1");
|
||||
private final NodeId node2 = new NodeId("2");
|
||||
private final RoleInfo roleInfo1 = new RoleInfo(node1, Arrays.asList(node1, node2));
|
||||
private final RoleInfo roleInfo2 = new RoleInfo(node2, Arrays.asList(node2, node1));
|
||||
private final MastershipInfo mastershipInfo1 = new MastershipInfo(
|
||||
1,
|
||||
Optional.of(node1),
|
||||
ImmutableMap.<NodeId, MastershipRole>builder()
|
||||
.put(node1, MastershipRole.MASTER)
|
||||
.put(node2, MastershipRole.STANDBY)
|
||||
.build());
|
||||
private final MastershipInfo mastershipInfo2 = new MastershipInfo(
|
||||
2,
|
||||
Optional.of(node1),
|
||||
ImmutableMap.<NodeId, MastershipRole>builder()
|
||||
.put(node2, MastershipRole.MASTER)
|
||||
.put(node1, MastershipRole.STANDBY)
|
||||
.build());
|
||||
|
||||
private final MastershipEvent event1 =
|
||||
new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, roleInfo1);
|
||||
new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, mastershipInfo1);
|
||||
private final MastershipEvent event2 =
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo1);
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo1);
|
||||
private final MastershipEvent event3 =
|
||||
new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, roleInfo1);
|
||||
new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, mastershipInfo1);
|
||||
private final MastershipEvent event4 =
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
|
||||
private final MastershipEvent sameAsEvent4 =
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
|
||||
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
|
||||
private final MastershipEvent event5 =
|
||||
new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, roleInfo1);
|
||||
new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, mastershipInfo1);
|
||||
|
||||
/**
|
||||
* Tests for proper operation of equals(), hashCode() and toString() methods.
|
||||
@ -73,12 +86,12 @@ public class MastershipEventTest {
|
||||
public void checkConstruction() {
|
||||
assertThat(event1.type(), is(MastershipEvent.Type.BACKUPS_CHANGED));
|
||||
assertThat(event1.subject(), is(deviceId1));
|
||||
assertThat(event1.roleInfo(), is(roleInfo1));
|
||||
assertThat(event1.mastershipInfo(), is(mastershipInfo1));
|
||||
|
||||
assertThat(event4.time(), is(time));
|
||||
assertThat(event4.type(), is(MastershipEvent.Type.MASTER_CHANGED));
|
||||
assertThat(event4.subject(), is(deviceId1));
|
||||
assertThat(event4.roleInfo(), is(roleInfo2));
|
||||
assertThat(event4.mastershipInfo(), is(mastershipInfo2));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2018-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.onosproject.mastership;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Test;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
/**
|
||||
* Mastership info test.
|
||||
*/
|
||||
public class MastershipInfoTest {
|
||||
private final NodeId node1 = new NodeId("1");
|
||||
private final NodeId node2 = new NodeId("2");
|
||||
private final NodeId node3 = new NodeId("3");
|
||||
private final NodeId node4 = new NodeId("4");
|
||||
|
||||
private final MastershipInfo mastershipInfo = new MastershipInfo(
|
||||
1,
|
||||
Optional.of(node1),
|
||||
ImmutableMap.<NodeId, MastershipRole>builder()
|
||||
.put(node1, MastershipRole.MASTER)
|
||||
.put(node2, MastershipRole.STANDBY)
|
||||
.put(node3, MastershipRole.STANDBY)
|
||||
.put(node4, MastershipRole.NONE)
|
||||
.build());
|
||||
|
||||
@Test
|
||||
public void testMastershipInfo() throws Exception {
|
||||
assertEquals(1, mastershipInfo.term());
|
||||
assertEquals(node1, mastershipInfo.master().get());
|
||||
assertEquals(Lists.newArrayList(node1), mastershipInfo.getRoles(MastershipRole.MASTER));
|
||||
assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.backups());
|
||||
assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.getRoles(MastershipRole.STANDBY));
|
||||
assertEquals(Lists.newArrayList(node4), mastershipInfo.getRoles(MastershipRole.NONE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals() throws Exception {
|
||||
assertEquals(mastershipInfo, mastershipInfo);
|
||||
assertNotEquals(mastershipInfo, new MastershipInfo(1, Optional.of(node1), ImmutableMap.of()));
|
||||
}
|
||||
}
|
||||
@ -47,6 +47,11 @@ public class MastershipServiceAdapter implements MastershipService {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastershipFor(DeviceId deviceId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
|
||||
return null;
|
||||
|
||||
@ -27,10 +27,12 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
@ -48,6 +50,7 @@ import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.core.Version;
|
||||
import org.onosproject.core.VersionService;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipStore;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
import org.onosproject.mastership.MastershipTerm;
|
||||
@ -177,7 +180,7 @@ public class SimpleMastershipStore
|
||||
}
|
||||
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -219,8 +222,7 @@ public class SimpleMastershipStore
|
||||
incrementTerm(deviceId);
|
||||
// remove from backup list
|
||||
removeFromBackups(deviceId, node);
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(deviceId)));
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
return CompletableFuture.completedFuture(MastershipRole.MASTER);
|
||||
}
|
||||
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
|
||||
@ -229,14 +231,12 @@ public class SimpleMastershipStore
|
||||
// no master => become master
|
||||
masterMap.put(deviceId, node);
|
||||
incrementTerm(deviceId);
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(deviceId)));
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
return CompletableFuture.completedFuture(MastershipRole.MASTER);
|
||||
}
|
||||
// add to backup list
|
||||
if (addToBackup(deviceId, node)) {
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
|
||||
getNodes(deviceId)));
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
|
||||
default:
|
||||
@ -298,6 +298,21 @@ public class SimpleMastershipStore
|
||||
masterMap.get(deviceId), termMap.get(deviceId).get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastership(DeviceId deviceId) {
|
||||
ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
|
||||
NodeId master = masterMap.get(deviceId);
|
||||
if (master != null) {
|
||||
roleBuilder.put(master, MastershipRole.MASTER);
|
||||
}
|
||||
backups.getOrDefault(deviceId, Collections.emptyList())
|
||||
.forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
|
||||
return new MastershipInfo(
|
||||
termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
|
||||
Optional.ofNullable(master),
|
||||
roleBuilder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
|
||||
MastershipRole role = getRole(nodeId, deviceId);
|
||||
@ -309,13 +324,13 @@ public class SimpleMastershipStore
|
||||
masterMap.remove(deviceId);
|
||||
// TODO: Should there be new event type for no MASTER?
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
} else {
|
||||
NodeId prevMaster = masterMap.put(deviceId, backup);
|
||||
incrementTerm(deviceId);
|
||||
addToBackup(deviceId, prevMaster);
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
|
||||
case STANDBY:
|
||||
@ -323,7 +338,7 @@ public class SimpleMastershipStore
|
||||
boolean modified = addToBackup(deviceId, nodeId);
|
||||
if (modified) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
break;
|
||||
|
||||
@ -357,12 +372,12 @@ public class SimpleMastershipStore
|
||||
masterMap.put(deviceId, backup);
|
||||
incrementTerm(deviceId);
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
|
||||
|
||||
case STANDBY:
|
||||
if (removeFromBackups(deviceId, nodeId)) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import org.onosproject.core.MetricsHelper;
|
||||
import org.onosproject.event.AbstractListenerManager;
|
||||
import org.onosproject.mastership.MastershipAdminService;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.mastership.MastershipStore;
|
||||
@ -238,6 +239,13 @@ public class MastershipManager
|
||||
return store.getNodes(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastershipFor(DeviceId deviceId) {
|
||||
checkPermission(CLUSTER_READ);
|
||||
checkNotNull(deviceId, DEVICE_ID_NULL);
|
||||
return store.getMastership(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipTerm getMastershipTerm(DeviceId deviceId) {
|
||||
checkPermission(CLUSTER_READ);
|
||||
|
||||
@ -29,20 +29,32 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||
*/
|
||||
public final class ReplicaInfo {
|
||||
|
||||
private final long term;
|
||||
private final Optional<NodeId> master;
|
||||
private final List<NodeId> backups;
|
||||
|
||||
/**
|
||||
* Creates a ReplicaInfo instance.
|
||||
*
|
||||
* @param term monotonically increasing unique mastership term
|
||||
* @param master NodeId of the node where the master copy should be
|
||||
* @param backups list of NodeId, where backup copies should be placed
|
||||
*/
|
||||
public ReplicaInfo(NodeId master, List<NodeId> backups) {
|
||||
public ReplicaInfo(long term, NodeId master, List<NodeId> backups) {
|
||||
this.term = term;
|
||||
this.master = Optional.ofNullable(master);
|
||||
this.backups = checkNotNull(backups);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the mastership term.
|
||||
*
|
||||
* @return the mastership term
|
||||
*/
|
||||
public long term() {
|
||||
return term;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the NodeId, if there is a Node where the master copy should be.
|
||||
*
|
||||
@ -78,6 +90,7 @@ public final class ReplicaInfo {
|
||||
|
||||
// for Serializer
|
||||
private ReplicaInfo() {
|
||||
this.term = 0;
|
||||
this.master = Optional.empty();
|
||||
this.backups = Collections.emptyList();
|
||||
}
|
||||
|
||||
@ -15,18 +15,16 @@
|
||||
*/
|
||||
package org.onosproject.store.flow.impl;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
import org.apache.felix.scr.annotations.Reference;
|
||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||
import org.apache.felix.scr.annotations.Service;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.event.EventDeliveryService;
|
||||
import org.onosproject.event.ListenerRegistry;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.net.DeviceId;
|
||||
@ -36,8 +34,6 @@ import org.onosproject.store.flow.ReplicaInfoEventListener;
|
||||
import org.onosproject.store.flow.ReplicaInfoService;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
|
||||
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
|
||||
@ -79,7 +75,7 @@ public class ReplicaInfoManager implements ReplicaInfoService {
|
||||
|
||||
@Override
|
||||
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
|
||||
return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
|
||||
return buildFromRoleInfo(mastershipService.getMastershipFor(deviceId));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -92,17 +88,15 @@ public class ReplicaInfoManager implements ReplicaInfoService {
|
||||
listenerRegistry.removeListener(checkNotNull(listener));
|
||||
}
|
||||
|
||||
private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
|
||||
List<NodeId> backups = roles.backups() == null ?
|
||||
Collections.emptyList() : ImmutableList.copyOf(roles.backups());
|
||||
return new ReplicaInfo(roles.master(), backups);
|
||||
private static ReplicaInfo buildFromRoleInfo(MastershipInfo mastership) {
|
||||
return new ReplicaInfo(mastership.term(), mastership.master().orElse(null), mastership.backups());
|
||||
}
|
||||
|
||||
final class InternalMastershipListener implements MastershipListener {
|
||||
|
||||
@Override
|
||||
public void event(MastershipEvent event) {
|
||||
final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
|
||||
final ReplicaInfo replicaInfo = buildFromRoleInfo(event.mastershipInfo());
|
||||
switch (event.type()) {
|
||||
case MASTER_CHANGED:
|
||||
eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
|
||||
|
||||
@ -23,7 +23,8 @@ import static org.slf4j.LoggerFactory.getLogger;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -34,6 +35,7 @@ import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
@ -50,6 +52,7 @@ import org.onosproject.cluster.LeadershipService;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipStore;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
import org.onosproject.mastership.MastershipTerm;
|
||||
@ -62,10 +65,7 @@ import org.onosproject.store.serializers.KryoNamespaces;
|
||||
import org.onosproject.store.service.Serializer;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Implementation of the MastershipStore on top of Leadership Service.
|
||||
@ -158,7 +158,7 @@ public class ConsistentDeviceMastershipStore
|
||||
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
|
||||
List<NodeId> candidates = leadership == null ?
|
||||
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
|
||||
MastershipRole role = Objects.equal(localNodeId, leader) ?
|
||||
MastershipRole role = Objects.equals(localNodeId, leader) ?
|
||||
MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
|
||||
return CompletableFuture.completedFuture(role);
|
||||
}
|
||||
@ -173,7 +173,7 @@ public class ConsistentDeviceMastershipStore
|
||||
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
|
||||
List<NodeId> candidates = leadership == null ?
|
||||
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
|
||||
return Objects.equal(nodeId, leader) ?
|
||||
return Objects.equals(nodeId, leader) ?
|
||||
MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
|
||||
}
|
||||
|
||||
@ -187,27 +187,15 @@ public class ConsistentDeviceMastershipStore
|
||||
@Override
|
||||
public RoleInfo getNodes(DeviceId deviceId) {
|
||||
checkArgument(deviceId != null, DEVICE_ID_NULL);
|
||||
Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
|
||||
return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
|
||||
}
|
||||
|
||||
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
|
||||
clusterService.getNodes()
|
||||
.forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
|
||||
|
||||
NodeId master = null;
|
||||
final List<NodeId> standbys = Lists.newLinkedList();
|
||||
|
||||
List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
|
||||
|
||||
for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
|
||||
if (entry.getValue() == MastershipRole.MASTER) {
|
||||
master = entry.getKey();
|
||||
} else if (entry.getValue() == MastershipRole.STANDBY) {
|
||||
standbys.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
|
||||
|
||||
return new RoleInfo(master, sortedStandbyList);
|
||||
@Override
|
||||
public MastershipInfo getMastership(DeviceId deviceId) {
|
||||
checkArgument(deviceId != null, DEVICE_ID_NULL);
|
||||
Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
|
||||
return buildMastershipFromLeadership(leadership);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -263,7 +251,7 @@ public class ConsistentDeviceMastershipStore
|
||||
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
|
||||
|
||||
NodeId newMaster = candidates.stream()
|
||||
.filter(candidate -> !Objects.equal(nodeId, candidate))
|
||||
.filter(candidate -> !Objects.equals(nodeId, candidate))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
log.info("Transitioning to role {} for {}. Next master: {}",
|
||||
@ -304,7 +292,7 @@ public class ConsistentDeviceMastershipStore
|
||||
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
|
||||
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
|
||||
leadershipService.withdraw(leadershipTopic);
|
||||
return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
|
||||
return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getMastership(deviceId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -312,6 +300,27 @@ public class ConsistentDeviceMastershipStore
|
||||
// Noop. LeadershipService already takes care of detecting and purging stale locks.
|
||||
}
|
||||
|
||||
private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
|
||||
ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
|
||||
if (leadership.leaderNodeId() != null) {
|
||||
builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
|
||||
}
|
||||
leadership.candidates().stream()
|
||||
.filter(nodeId -> !Objects.equals(leadership.leaderNodeId(), nodeId))
|
||||
.forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
|
||||
clusterService.getNodes().stream()
|
||||
.filter(node -> !Objects.equals(leadership.leaderNodeId(), node.id()))
|
||||
.filter(node -> !leadership.candidates().contains(node.id()))
|
||||
.forEach(node -> builder.put(node.id(), MastershipRole.NONE));
|
||||
|
||||
return new MastershipInfo(
|
||||
leadership.leader() != null ? leadership.leader().term() : 0,
|
||||
leadership.leader() != null
|
||||
? Optional.of(leadership.leader().nodeId())
|
||||
: Optional.empty(),
|
||||
builder.build());
|
||||
}
|
||||
|
||||
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
|
||||
|
||||
@Override
|
||||
@ -328,27 +337,23 @@ public class ConsistentDeviceMastershipStore
|
||||
private void handleEvent(LeadershipEvent event) {
|
||||
Leadership leadership = event.subject();
|
||||
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
|
||||
NodeId master = event.subject().leaderNodeId();
|
||||
List<NodeId> backups = event.subject().candidates()
|
||||
.stream()
|
||||
.filter(n -> !n.equals(master))
|
||||
.collect(Collectors.toList());
|
||||
RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
|
||||
? new RoleInfo(master, backups)
|
||||
: new RoleInfo();
|
||||
MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
|
||||
? buildMastershipFromLeadership(event.subject())
|
||||
: new MastershipInfo();
|
||||
|
||||
switch (event.type()) {
|
||||
case LEADER_AND_CANDIDATES_CHANGED:
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case LEADER_CHANGED:
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
|
||||
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case CANDIDATES_CHANGED:
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
|
||||
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case SERVICE_DISRUPTED:
|
||||
notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
|
||||
notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case SERVICE_RESTORED:
|
||||
// Do nothing, wait for updates from peers
|
||||
|
||||
@ -15,32 +15,33 @@
|
||||
*/
|
||||
package org.onosproject.store.flow.impl;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.common.event.impl.TestEventDispatcher;
|
||||
import org.onosproject.event.ListenerRegistry;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipEvent.Type;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.mastership.MastershipServiceAdapter;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
import org.onosproject.store.flow.ReplicaInfo;
|
||||
import org.onosproject.store.flow.ReplicaInfoEvent;
|
||||
import org.onosproject.store.flow.ReplicaInfoEventListener;
|
||||
import org.onosproject.store.flow.ReplicaInfoService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@ -101,7 +102,7 @@ public class ReplicaInfoManagerTest {
|
||||
|
||||
// fake MastershipEvent
|
||||
eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
|
||||
new RoleInfo(NID1, new LinkedList<>())));
|
||||
new MastershipInfo(1, Optional.of(NID1), ImmutableMap.of(NID1, MastershipRole.MASTER))));
|
||||
|
||||
assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
@ -149,8 +150,11 @@ public class ReplicaInfoManagerTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RoleInfo getNodesFor(DeviceId deviceId) {
|
||||
return new RoleInfo(masters.get(deviceId), Collections.emptyList());
|
||||
public MastershipInfo getMastershipFor(DeviceId deviceId) {
|
||||
return new MastershipInfo(
|
||||
1,
|
||||
Optional.ofNullable(masters.get(deviceId)),
|
||||
ImmutableMap.of(NID1, MastershipRole.MASTER));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -16,7 +16,15 @@
|
||||
package org.onosproject.drivers.netconf;
|
||||
|
||||
import org.onosproject.mastership.MastershipServiceAdapter;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.MastershipRole;
|
||||
|
||||
public class MockMastershipService extends MastershipServiceAdapter {
|
||||
|
||||
@ -30,4 +38,57 @@ public class MockMastershipService extends MastershipServiceAdapter {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(MastershipListener listener) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListener(MastershipListener listener) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipRole getLocalRole(DeviceId deviceId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeId getMasterFor(DeviceId deviceId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RoleInfo getNodesFor(DeviceId deviceId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastershipFor(DeviceId deviceId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.onosproject.incubator.net.virtual;
|
||||
import org.onosproject.cluster.NodeId;
|
||||
import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
import org.onosproject.mastership.MastershipTerm;
|
||||
import org.onosproject.net.DeviceId;
|
||||
@ -73,6 +74,15 @@ public interface VirtualNetworkMastershipStore
|
||||
*/
|
||||
RoleInfo getNodes(NetworkId networkId, DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns the mastership info for a device.
|
||||
*
|
||||
* @param networkId virtual network identifier
|
||||
* @param deviceId the device identifier
|
||||
* @return the mastership info
|
||||
*/
|
||||
MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId);
|
||||
|
||||
/**
|
||||
* Returns the devices that a controller instance is master of.
|
||||
*
|
||||
|
||||
@ -31,6 +31,7 @@ import org.onosproject.incubator.net.virtual.VirtualNetworkService;
|
||||
import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
|
||||
import org.onosproject.mastership.MastershipAdminService;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
@ -154,6 +155,12 @@ public class VirtualNetworkMastershipManager
|
||||
return store.getNodes(networkId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastershipFor(DeviceId deviceId) {
|
||||
checkNotNull(deviceId, DEVICE_ID_NULL);
|
||||
return store.getMastership(networkId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
|
||||
checkNotNull(nodeId, NODE_ID_NULL);
|
||||
|
||||
@ -211,6 +211,5 @@ public class GrpcNbMastershipServiceTest {
|
||||
public RoleInfo getNodesFor(DeviceId deviceId) {
|
||||
return roleInfoMap.get(deviceId);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
|
||||
package org.onosproject.incubator.store.virtual.impl;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Deactivate;
|
||||
@ -34,6 +35,7 @@ import org.onosproject.cluster.RoleInfo;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
import org.onosproject.mastership.MastershipTerm;
|
||||
import org.onosproject.net.DeviceId;
|
||||
@ -45,7 +47,7 @@ import org.onosproject.store.service.Serializer;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -64,8 +66,6 @@ import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
@Component(immediate = true, enabled = false)
|
||||
@ -188,30 +188,16 @@ public class ConsistentVirtualDeviceMastershipStore
|
||||
public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
|
||||
checkArgument(networkId != null, NETWORK_ID_NULL);
|
||||
checkArgument(deviceId != null, DEVICE_ID_NULL);
|
||||
Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
|
||||
return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
|
||||
}
|
||||
|
||||
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
|
||||
clusterService.getNodes()
|
||||
.forEach((node) -> roles.put(node.id(),
|
||||
getRole(networkId, node.id(), deviceId)));
|
||||
|
||||
NodeId master = null;
|
||||
final List<NodeId> standbys = Lists.newLinkedList();
|
||||
|
||||
List<NodeId> candidates = leadershipService
|
||||
.getCandidates(createDeviceMastershipTopic(networkId, deviceId));
|
||||
|
||||
for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
|
||||
if (entry.getValue() == MastershipRole.MASTER) {
|
||||
master = entry.getKey();
|
||||
} else if (entry.getValue() == MastershipRole.STANDBY) {
|
||||
standbys.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
List<NodeId> sortedStandbyList = candidates.stream()
|
||||
.filter(standbys::contains).collect(Collectors.toList());
|
||||
|
||||
return new RoleInfo(master, sortedStandbyList);
|
||||
@Override
|
||||
public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
|
||||
checkArgument(networkId != null, NETWORK_ID_NULL);
|
||||
checkArgument(deviceId != null, DEVICE_ID_NULL);
|
||||
Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
|
||||
return buildMastershipFromLeadership(leadership);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -322,9 +308,8 @@ public class ConsistentVirtualDeviceMastershipStore
|
||||
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
|
||||
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
|
||||
leadershipService.withdraw(leadershipTopic);
|
||||
return CompletableFuture.completedFuture(new MastershipEvent(eventType,
|
||||
deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
|
||||
}
|
||||
|
||||
private CompletableFuture<MastershipEvent>
|
||||
@ -338,6 +323,24 @@ public class ConsistentVirtualDeviceMastershipStore
|
||||
// Noop. LeadershipService already takes care of detecting and purging stale locks.
|
||||
}
|
||||
|
||||
private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
|
||||
ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
|
||||
if (leadership.leaderNodeId() != null) {
|
||||
builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
|
||||
}
|
||||
leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
|
||||
clusterService.getNodes().stream()
|
||||
.filter(node -> !leadership.candidates().contains(node.id()))
|
||||
.forEach(node -> builder.put(node.id(), MastershipRole.NONE));
|
||||
|
||||
return new MastershipInfo(
|
||||
leadership.leader() != null ? leadership.leader().term() : 0,
|
||||
leadership.leader() != null
|
||||
? Optional.of(leadership.leader().nodeId())
|
||||
: Optional.empty(),
|
||||
builder.build());
|
||||
}
|
||||
|
||||
private class InternalDeviceMastershipEventListener
|
||||
implements LeadershipEventListener {
|
||||
|
||||
@ -357,28 +360,23 @@ public class ConsistentVirtualDeviceMastershipStore
|
||||
|
||||
NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
|
||||
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
|
||||
|
||||
RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
|
||||
getNodes(networkId, deviceId) : new RoleInfo();
|
||||
MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
|
||||
? buildMastershipFromLeadership(event.subject())
|
||||
: new MastershipInfo();
|
||||
|
||||
switch (event.type()) {
|
||||
case LEADER_AND_CANDIDATES_CHANGED:
|
||||
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
|
||||
deviceId, roleInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
|
||||
deviceId, roleInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case LEADER_CHANGED:
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
|
||||
deviceId, roleInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case CANDIDATES_CHANGED:
|
||||
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
|
||||
deviceId, roleInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case SERVICE_DISRUPTED:
|
||||
notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
|
||||
deviceId, roleInfo));
|
||||
notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
|
||||
break;
|
||||
case SERVICE_RESTORED:
|
||||
// Do nothing, wait for updates from peers
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
package org.onosproject.incubator.store.virtual.impl;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
@ -36,6 +37,7 @@ import org.onosproject.core.VersionService;
|
||||
import org.onosproject.incubator.net.virtual.NetworkId;
|
||||
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipInfo;
|
||||
import org.onosproject.mastership.MastershipStoreDelegate;
|
||||
import org.onosproject.mastership.MastershipTerm;
|
||||
import org.onosproject.net.DeviceId;
|
||||
@ -50,6 +52,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -122,7 +125,7 @@ public class SimpleVirtualMastershipStore
|
||||
// remove from backup list
|
||||
removeFromBackups(networkId, deviceId, node);
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
return CompletableFuture.completedFuture(MastershipRole.MASTER);
|
||||
}
|
||||
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
|
||||
@ -132,13 +135,13 @@ public class SimpleVirtualMastershipStore
|
||||
masterMap.put(deviceId, node);
|
||||
incrementTerm(networkId, deviceId);
|
||||
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
return CompletableFuture.completedFuture(MastershipRole.MASTER);
|
||||
}
|
||||
// add to backup list
|
||||
if (addToBackup(networkId, deviceId, node)) {
|
||||
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
}
|
||||
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
|
||||
default:
|
||||
@ -183,6 +186,27 @@ public class SimpleVirtualMastershipStore
|
||||
backups.getOrDefault(deviceId, ImmutableList.of()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
|
||||
Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
|
||||
Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
|
||||
Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
|
||||
ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
|
||||
NodeId master = masterMap.get(deviceId);
|
||||
if (master != null) {
|
||||
roleBuilder.put(master, MastershipRole.MASTER);
|
||||
}
|
||||
backups.getOrDefault(master, Collections.emptyList())
|
||||
.forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
|
||||
clusterService.getNodes().stream()
|
||||
.filter(node -> !masterMap.containsValue(node.id()))
|
||||
.forEach(node -> roleBuilder.put(node.id(), MastershipRole.NONE));
|
||||
return new MastershipInfo(
|
||||
termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
|
||||
Optional.ofNullable(master),
|
||||
roleBuilder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
|
||||
Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
|
||||
@ -219,7 +243,7 @@ public class SimpleVirtualMastershipStore
|
||||
}
|
||||
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(networkId, deviceId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -249,14 +273,14 @@ public class SimpleVirtualMastershipStore
|
||||
// TODO: Should there be new event type for no MASTER?
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
} else {
|
||||
NodeId prevMaster = masterMap.put(deviceId, backup);
|
||||
incrementTerm(networkId, deviceId);
|
||||
addToBackup(networkId, deviceId, prevMaster);
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
}
|
||||
|
||||
case STANDBY:
|
||||
@ -265,7 +289,7 @@ public class SimpleVirtualMastershipStore
|
||||
if (modified) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
}
|
||||
break;
|
||||
|
||||
@ -314,13 +338,13 @@ public class SimpleVirtualMastershipStore
|
||||
incrementTerm(networkId, deviceId);
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(MASTER_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
|
||||
case STANDBY:
|
||||
if (removeFromBackups(networkId, deviceId, nodeId)) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new MastershipEvent(BACKUPS_CHANGED, deviceId,
|
||||
getNodes(networkId, deviceId)));
|
||||
getMastership(networkId, deviceId)));
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user