diff --git a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java index a08bd358e6..e852cf9fa8 100644 --- a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java +++ b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java @@ -145,6 +145,7 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro private static final String DATA_PLANE_IP = "dataPlaneIp"; private static final String DATA_PLANE_INTF = "dataPlaneIntf"; private static final String S_TAG = "stag"; + private static final String VSG_HOST_ID = "vsgHostId"; private static final Ip4Address DEFAULT_DNS = Ip4Address.valueOf("8.8.8.8"); @@ -175,7 +176,6 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro deviceService, driverService, groupService, - mastershipService, DEFAULT_TUNNEL); arpProxy = new CordVtnArpProxy(appId, packetService, hostService); @@ -301,30 +301,28 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro @Override public void updateVirtualSubscriberGateways(HostId vSgHostId, String serviceVlan, Map vSgs) { - Host vSgVm = hostService.getHost(vSgHostId); - - if (vSgVm == null || !vSgVm.annotations().value(S_TAG).equals(serviceVlan)) { + Host vSgHost = hostService.getHost(vSgHostId); + if (vSgHost == null || !vSgHost.annotations().value(S_TAG).equals(serviceVlan)) { log.debug("Invalid vSG updates for {}", serviceVlan); return; } - log.info("Updates vSGs in {} with {}", vSgVm.id(), vSgs.toString()); + log.info("Updates vSGs in {} with {}", vSgHost.id(), vSgs.toString()); vSgs.entrySet().stream() + .filter(entry -> hostService.getHostsByMac(entry.getValue()).isEmpty()) .forEach(entry -> addVirtualSubscriberGateway( - vSgVm, + vSgHost, entry.getKey(), entry.getValue(), serviceVlan)); - hostService.getConnectedHosts(vSgVm.location()).stream() - .filter(host -> !host.mac().equals(vSgVm.mac())) + hostService.getConnectedHosts(vSgHost.location()).stream() + .filter(host -> !host.mac().equals(vSgHost.mac())) .filter(host -> !vSgs.values().contains(host.mac())) .forEach(host -> { log.info("Removed vSG {}", host.toString()); hostProvider.hostVanished(host.id()); }); - - ruleInstaller.populateSubscriberGatewayRules(vSgVm, vSgs.keySet()); } /** @@ -337,16 +335,12 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro */ private void addVirtualSubscriberGateway(Host vSgHost, IpAddress vSgIp, MacAddress vSgMac, String serviceVlan) { - HostId hostId = HostId.hostId(vSgMac); - Host host = hostService.getHost(hostId); - if (host != null) { - log.trace("vSG with {} already exists", vSgMac.toString()); - return; - } + log.info("vSG with IP({}) MAC({}) added", vSgIp.toString(), vSgMac.toString()); - log.info("vSG with IP({}) MAC({}) detected", vSgIp.toString(), vSgMac.toString()); + HostId hostId = HostId.hostId(vSgMac); DefaultAnnotations.Builder annotations = DefaultAnnotations.builder() - .set(S_TAG, serviceVlan); + .set(S_TAG, serviceVlan) + .set(VSG_HOST_ID, vSgHost.id().toString()); HostDescription hostDesc = new DefaultHostDescription( vSgMac, @@ -529,6 +523,11 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro * @param host host */ private void serviceVmAdded(Host host) { + String serviceVlan = host.annotations().value(S_TAG); + if (serviceVlan != null) { + virtualSubscriberGatewayAdded(host, serviceVlan); + } + String vNetId = host.annotations().value(SERVICE_ID); if (vNetId == null) { // ignore this host, it is not the service VM, or it's a vSG @@ -538,8 +537,7 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro OpenstackNetwork vNet = openstackService.network(vNetId); if (vNet == null) { log.warn("Failed to get OpenStack network {} for VM {}({}).", - vNetId, - host.id(), + vNetId, host.id(), host.annotations().value(OPENSTACK_VM_ID)); return; } @@ -572,20 +570,6 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro registerDhcpLease(host, service); ruleInstaller.populateBasicConnectionRules(host, getTunnelIp(host), vNet); - - String serviceVlan = host.annotations().value(S_TAG); - if (serviceVlan != null) { - log.debug("vSG VM detected {}", host.id()); - Map vSgs = getSubscriberGateways(host); - vSgs.entrySet().stream() - .forEach(entry -> addVirtualSubscriberGateway( - host, - entry.getKey(), - entry.getValue(), - serviceVlan)); - - ruleInstaller.populateSubscriberGatewayRules(host, vSgs.keySet()); - } } /** @@ -594,21 +578,21 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro * @param host host */ private void serviceVmRemoved(Host host) { + String serviceVlan = host.annotations().value(S_TAG); + if (serviceVlan != null) { + virtualSubscriberGatewayRemoved(host); + } + String vNetId = host.annotations().value(SERVICE_ID); if (vNetId == null) { // ignore it, it's not the service VM or it's a vSG - String serviceVlan = host.annotations().value(S_TAG); - if (serviceVlan != null) { - log.info("vSG {} removed", host.id()); - } return; } OpenstackNetwork vNet = openstackService.network(vNetId); if (vNet == null) { log.warn("Failed to get OpenStack network {} for VM {}({}).", - vNetId, - host.id(), + vNetId, host.id(), host.annotations().value(OPENSTACK_VM_ID)); return; } @@ -642,6 +626,62 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro } } + + /** + * Handles virtual subscriber gateway VM or container. + * + * @param host new host with stag, it can be vsg VM or vsg + * @param serviceVlan service vlan + */ + private void virtualSubscriberGatewayAdded(Host host, String serviceVlan) { + Map vSgs; + Host vSgHost; + + String vSgHostId = host.annotations().value(VSG_HOST_ID); + if (vSgHostId == null) { + log.debug("vSG VM detected {}", host.id()); + + vSgHost = host; + vSgs = getSubscriberGateways(vSgHost); + vSgs.entrySet().stream().forEach(entry -> addVirtualSubscriberGateway( + vSgHost, + entry.getKey(), + entry.getValue(), + serviceVlan)); + } else { + vSgHost = hostService.getHost(HostId.hostId(vSgHostId)); + if (vSgHost == null) { + return; + } + + log.debug("vSG detected {}", host.id()); + vSgs = getSubscriberGateways(vSgHost); + } + + ruleInstaller.populateSubscriberGatewayRules(vSgHost, vSgs.keySet()); + } + + /** + * Handles virtual subscriber gateway removed. + * + * @param vSg vsg host to remove + */ + private void virtualSubscriberGatewayRemoved(Host vSg) { + String vSgHostId = vSg.annotations().value(VSG_HOST_ID); + if (vSgHostId == null) { + return; + } + + Host vSgHost = hostService.getHost(HostId.hostId(vSgHostId)); + if (vSgHost == null) { + return; + } + + log.info("vSG removed {}", vSg.id()); + Map vSgs = getSubscriberGateways(vSgHost); + ruleInstaller.populateSubscriberGatewayRules(vSgHost, vSgs.keySet()); + } + /** * Sets service network gateway MAC address and sends out gratuitous ARP to all * VMs to update the gateway MAC address. @@ -709,10 +749,14 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro switch (event.type()) { case HOST_ADDED: - eventExecutor.submit(() -> serviceVmAdded(host)); + if (mastershipService.isLocalMaster(host.location().deviceId())) { + eventExecutor.submit(() -> serviceVmAdded(host)); + } break; case HOST_REMOVED: - eventExecutor.submit(() -> serviceVmRemoved(host)); + if (mastershipService.isLocalMaster(host.location().deviceId())) { + eventExecutor.submit(() -> serviceVmRemoved(host)); + } break; default: break; diff --git a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java index d4aceeb5a9..bcd76bc423 100644 --- a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java +++ b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java @@ -27,9 +27,10 @@ import org.onlab.packet.IpAddress; import org.onlab.util.ItemNotFoundException; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; import org.onosproject.core.ApplicationId; import org.onosproject.core.CoreService; -import org.onosproject.mastership.MastershipService; import org.onosproject.net.ConnectPoint; import org.onosproject.net.DefaultAnnotations; import org.onosproject.net.Device; @@ -70,6 +71,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -149,7 +151,7 @@ public class CordVtnNodeManager { protected FlowRuleService flowRuleService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; + protected LeadershipService leadershipService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected GroupService groupService; @@ -169,6 +171,7 @@ public class CordVtnNodeManager { private ConsistentMap nodeStore; private CordVtnRuleInstaller ruleInstaller; private ApplicationId appId; + private NodeId localNodeId; private enum NodeState implements CordVtnNodeState { @@ -217,6 +220,9 @@ public class CordVtnNodeManager { @Activate protected void active() { appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID); + localNodeId = clusterService.getLocalNode().id(); + leadershipService.runForLeadership(appId.name()); + nodeStore = storageService.consistentMapBuilder() .withSerializer(Serializer.using(NODE_SERIALIZER.build())) .withName("cordvtn-nodestore") @@ -227,7 +233,6 @@ public class CordVtnNodeManager { deviceService, driverService, groupService, - mastershipService, DEFAULT_TUNNEL); deviceService.addListener(deviceListener); @@ -242,6 +247,7 @@ public class CordVtnNodeManager { eventExecutor.shutdown(); nodeStore.clear(); + leadershipService.withdraw(appId.name()); } /** @@ -285,6 +291,13 @@ public class CordVtnNodeManager { return; } + NodeId leaderNodeId = leadershipService.getLeader(appId.name()); + log.debug("Node init requested, local: {} leader: {}", localNodeId, leaderNodeId); + if (!Objects.equals(localNodeId, leaderNodeId)) { + // only the leader performs node init + return; + } + NodeState state = getNodeState(node); log.debug("Init node: {} state: {}", node.hostname(), state.toString()); state.process(this, node); @@ -839,6 +852,12 @@ public class CordVtnNodeManager { @Override public void event(DeviceEvent event) { + NodeId leaderNodeId = leadershipService.getLeader(appId.name()); + if (!Objects.equals(localNodeId, leaderNodeId)) { + // only the leader processes events + return; + } + Device device = event.subject(); ConnectionHandler handler = (device.type().equals(SWITCH) ? bridgeHandler : ovsdbHandler); diff --git a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java index 4c67587627..f3877e095f 100644 --- a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java +++ b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java @@ -30,7 +30,6 @@ import org.onlab.util.ItemNotFoundException; import org.onosproject.core.ApplicationId; import org.onosproject.core.DefaultGroupId; import org.onosproject.core.GroupId; -import org.onosproject.mastership.MastershipService; import org.onosproject.net.Device; import org.onosproject.net.DeviceId; import org.onosproject.net.Host; @@ -120,13 +119,13 @@ public class CordVtnRuleInstaller { private static final String PORT_NAME = "portName"; private static final String DATA_PLANE_INTF = "dataPlaneIntf"; private static final String S_TAG = "stag"; + private static final String OVS_HW_VERSION = "Open vSwitch"; private final ApplicationId appId; private final FlowRuleService flowRuleService; private final DeviceService deviceService; private final DriverService driverService; private final GroupService groupService; - private final MastershipService mastershipService; private final String tunnelType; /** @@ -137,7 +136,6 @@ public class CordVtnRuleInstaller { * @param deviceService device service * @param driverService driver service * @param groupService group service - * @param mastershipService mastership service * @param tunnelType tunnel type */ public CordVtnRuleInstaller(ApplicationId appId, @@ -145,14 +143,12 @@ public class CordVtnRuleInstaller { DeviceService deviceService, DriverService driverService, GroupService groupService, - MastershipService mastershipService, String tunnelType) { this.appId = appId; this.flowRuleService = flowRuleService; this.deviceService = deviceService; this.driverService = driverService; this.groupService = groupService; - this.mastershipService = mastershipService; this.tunnelType = checkNotNull(tunnelType); } @@ -187,10 +183,6 @@ public class CordVtnRuleInstaller { checkNotNull(vNet); DeviceId deviceId = host.location().deviceId(); - if (!mastershipService.isLocalMaster(deviceId)) { - return; - } - PortNumber inPort = host.location().port(); MacAddress dstMac = host.mac(); IpAddress hostIp = host.ipAddresses().stream().findFirst().get(); @@ -225,10 +217,6 @@ public class CordVtnRuleInstaller { PortNumber port = host.location().port(); IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null); - if (!mastershipService.isLocalMaster(deviceId)) { - return; - } - for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) { if (flowRule.deviceId().equals(deviceId)) { PortNumber inPort = getInPort(flowRule); @@ -284,6 +272,10 @@ public class CordVtnRuleInstaller { Map> inPorts = Maps.newHashMap(); for (Device device : deviceService.getAvailableDevices(SWITCH)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { + continue; + } + GroupId groupId = createServiceGroup(device.id(), pService); outGroups.put(device.id(), groupId); @@ -320,12 +312,16 @@ public class CordVtnRuleInstaller { Map outGroups = Maps.newHashMap(); GroupKey groupKey = new DefaultGroupKey(pService.id().id().getBytes()); - deviceService.getAvailableDevices(SWITCH).forEach(device -> { + for (Device device : deviceService.getAvailableDevices(SWITCH)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { + continue; + } + Group group = groupService.getGroup(device.id(), groupKey); if (group != null) { outGroups.put(device.id(), group.id()); } - }); + } for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) { IpPrefix dstIp = getDstIpFromSelector(flowRule); @@ -368,11 +364,11 @@ public class CordVtnRuleInstaller { GroupKey groupKey = getGroupKey(service.id()); for (Device device : deviceService.getAvailableDevices(SWITCH)) { - DeviceId deviceId = device.id(); - if (!mastershipService.isLocalMaster(deviceId)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { continue; } + DeviceId deviceId = device.id(); Group group = groupService.getGroup(deviceId, groupKey); if (group == null) { log.trace("No group exists for service {} in {}, do nothing.", service.id(), deviceId); @@ -421,10 +417,6 @@ public class CordVtnRuleInstaller { DeviceId deviceId = host.location().deviceId(); IpAddress hostIp = host.ipAddresses().stream().findFirst().get(); - if (!mastershipService.isLocalMaster(deviceId)) { - return; - } - TrafficSelector selector = DefaultTrafficSelector.builder() .matchEthType(Ethernet.TYPE_ARP) .matchArpTpa(mService.serviceIp().getIp4Address()) @@ -520,10 +512,6 @@ public class CordVtnRuleInstaller { */ public void removeManagementNetworkRules(Host host, CordService mService) { checkNotNull(mService); - - if (!mastershipService.isLocalMaster(host.location().deviceId())) { - return; - } // TODO remove management network specific rules } @@ -980,6 +968,10 @@ public class CordVtnRuleInstaller { .build(); for (Device device : deviceService.getAvailableDevices(SWITCH)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { + continue; + } + FlowRule flowRuleDirect = DefaultFlowRule.builder() .fromApp(appId) .withSelector(selector) @@ -1011,6 +1003,10 @@ public class CordVtnRuleInstaller { .build(); for (Device device : deviceService.getAvailableDevices(SWITCH)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { + continue; + } + FlowRule flowRuleDirect = DefaultFlowRule.builder() .fromApp(appId) .withSelector(selector) @@ -1138,6 +1134,10 @@ public class CordVtnRuleInstaller { processFlowRule(true, flowRule); for (Device device : deviceService.getAvailableDevices(SWITCH)) { + if (!device.hwVersion().equals(OVS_HW_VERSION)) { + continue; + } + if (device.id().equals(deviceId)) { continue; } diff --git a/protocols/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java b/protocols/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java index 9b6043d20c..2c7f8e6c84 100644 --- a/protocols/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java +++ b/protocols/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java @@ -578,11 +578,6 @@ public class DefaultOvsdbClient return false; } - String bridgeUuid = getBridgeUuid(bridgeName); - if (bridgeUuid != null) { - return false; - } - Bridge bridge = (Bridge) TableGenerator.createTable(dbSchema, OvsdbTable.BRIDGE); Set failMode = new HashSet<>(Arrays.asList("secure")); bridge.setFailMode(failMode); @@ -594,10 +589,15 @@ public class DefaultOvsdbClient options.put("datapath-id", dpid); bridge.setOtherConfig(options); - bridge.setName(bridgeName); - bridgeUuid = insertConfig(OvsdbConstant.BRIDGE, "_uuid", - OvsdbConstant.DATABASENAME, "bridges", - ovsUuid, bridge.getRow()); + String bridgeUuid = getBridgeUuid(bridgeName); + if (bridgeUuid == null) { + bridge.setName(bridgeName); + bridgeUuid = insertConfig(OvsdbConstant.BRIDGE, "_uuid", + OvsdbConstant.DATABASENAME, "bridges", + ovsUuid, bridge.getRow()); + } else { + updateConfig(OvsdbConstant.BRIDGE, "_uuid", bridgeUuid, bridge.getRow()); + } if (bridgeUuid != null) { createPort(bridgeName, bridgeName);