From 7d111d7a417e21be85ae68852129261ce37eeeaf Mon Sep 17 00:00:00 2001 From: Jian Li Date: Fri, 12 Apr 2019 13:58:44 +0900 Subject: [PATCH] Install rules for handling traffic destined to gateway at k8s node Change-Id: I3a3ce8ecc581aee7e8e70e338dbf7bf4a6c518db --- .../k8snetworking/api/Constants.java | 8 + .../impl/K8sFlowRuleManager.java | 103 +++++--- .../k8snetworking/impl/K8sServiceHandler.java | 45 ++-- .../impl/K8sSwitchingArpHandler.java | 20 +- .../impl/K8sSwitchingGatewayHandler.java | 230 ++++++++++++++++++ apps/k8s-node/api/BUILD | 5 +- .../k8snode/api/DefaultK8sNode.java | 24 ++ .../org/onosproject/k8snode/api/K8sNode.java | 8 + 8 files changed, 392 insertions(+), 51 deletions(-) create mode 100644 apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingGatewayHandler.java diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java index 50170fa6c0..1c3ee7f36e 100644 --- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java +++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java @@ -34,6 +34,11 @@ public final class Constants { public static final String NAT_STATELESS = "stateless"; public static final String DEFAULT_GATEWAY_MAC_STR = "fe:00:00:00:00:02"; + public static final String DEFAULT_ARP_MODE_STR = ARP_PROXY_MODE; + public static final String DEFAULT_HOST_MAC_STR = "fe:00:00:00:00:08"; + public static final String DEFAULT_SERVICE_IP_NAT_MODE_STR = NAT_STATELESS; + public static final String CONTROLLER_MAC_STR = "fe:00:00:00:00:10"; + public static final String SERVICE_FAKE_MAC_STR = "fe:00:00:00:00:20"; public static final MacAddress DEFAULT_GATEWAY_MAC = MacAddress.valueOf(DEFAULT_GATEWAY_MAC_STR); @@ -59,10 +64,13 @@ public final class Constants { public static final int PRIORITY_TUNNEL_TAG_RULE = 30000; public static final int PRIORITY_TRANSLATION_RULE = 30000; public static final int PRIORITY_CT_HOOK_RULE = 30500; + public static final int PRIORITY_INTER_ROUTING_RULE = 29000; public static final int PRIORITY_CT_RULE = 32000; public static final int PRIORITY_CT_DROP_RULE = 32500; public static final int PRIORITY_NAT_RULE = 30000; + public static final int PRIORITY_GATEWAY_RULE = 30000; public static final int PRIORITY_SWITCHING_RULE = 30000; + public static final int PRIORITY_CIDR_RULE = 30000; public static final int PRIORITY_ARP_GATEWAY_RULE = 41000; public static final int PRIORITY_ARP_SUBNET_RULE = 40000; public static final int PRIORITY_ARP_CONTROL_RULE = 40000; diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java index 678a2264ba..f6f103338a 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java @@ -17,6 +17,7 @@ package org.onosproject.k8snetworking.impl; import org.onlab.packet.Ethernet; import org.onlab.packet.IpPrefix; +import org.onlab.packet.MacAddress; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.LeadershipService; import org.onosproject.cluster.NodeId; @@ -33,6 +34,7 @@ import org.onosproject.k8snode.api.K8sNodeListener; import org.onosproject.k8snode.api.K8sNodeService; import org.onosproject.net.DeviceId; import org.onosproject.net.PortNumber; +import org.onosproject.net.device.DeviceService; import org.onosproject.net.flow.DefaultFlowRule; import org.onosproject.net.flow.DefaultTrafficSelector; import org.onosproject.net.flow.DefaultTrafficTreatment; @@ -60,14 +62,20 @@ import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC; import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE; import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE; import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SNAT_RULE; import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE; +import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR; import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR; +import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR; import static org.onosproject.k8snetworking.api.Constants.STAT_INBOUND_TABLE; import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE; import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE; import static org.onosproject.k8snetworking.api.Constants.VTAP_INBOUND_TABLE; import static org.onosproject.k8snetworking.api.Constants.VTAP_OUTBOUND_TABLE; +import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId; +import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension; import static org.slf4j.LoggerFactory.getLogger; /** @@ -88,6 +96,9 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { @Reference(cardinality = ReferenceCardinality.MANDATORY) protected CoreService coreService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DeviceService deviceService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) protected ClusterService clusterService; @@ -227,6 +238,9 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { // for ARP and ACL table transition connectTables(deviceId, ARP_TABLE, JUMP_TABLE); + // for JUMP table transition to routing table + connectTables(deviceId, JUMP_TABLE, ROUTING_TABLE); + // for JUMP table transition // we need JUMP table for bypassing routing table which contains large // amount of flow rules which might cause performance degradation during @@ -280,22 +294,40 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { applyRule(flowRule, true); } - private void setAnyRoutingRule(IpPrefix srcIpPrefix, K8sNetwork k8sNetwork) { + private void setAnyRoutingRule(IpPrefix srcIpPrefix, MacAddress mac, + K8sNetwork k8sNetwork) { TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() .matchEthType(Ethernet.TYPE_IPV4) .matchIPSrc(srcIpPrefix) .matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr())); - TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() - .setTunnelId(Long.valueOf(k8sNetwork.segmentId())) - .transition(STAT_OUTBOUND_TABLE); - for (K8sNode node : k8sNodeService.completeNodes()) { + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() + .setTunnelId(Long.valueOf(k8sNetwork.segmentId())); + + if (node.hostname().equals(k8sNetwork.name())) { + if (mac != null) { + tBuilder.setEthSrc(mac); + } + tBuilder.transition(STAT_OUTBOUND_TABLE); + } else { + PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(), + k8sNetworkService, node); + K8sNode localNode = k8sNodeService.node(k8sNetwork.name()); + + tBuilder.extension(buildExtension( + deviceService, + node.intgBridge(), + localNode.dataIp().getIp4Address()), + node.intgBridge()) + .setOutput(portNum); + } + FlowRule flowRule = DefaultFlowRule.builder() .forDevice(node.intgBridge()) .withSelector(sBuilder.build()) .withTreatment(tBuilder.build()) - .withPriority(HIGH_PRIORITY) + .withPriority(PRIORITY_CIDR_RULE) .fromApp(appId) .makePermanent() .forTable(ROUTING_TABLE) @@ -304,34 +336,41 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { } } + private void setGroupingRule(IpPrefix srcPrefix) { + TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() + .matchEthType(Ethernet.TYPE_IPV4) + .matchIPSrc(srcPrefix); + + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() + .transition(ROUTING_TABLE); + + for (K8sNode node : k8sNodeService.completeNodes()) { + FlowRule flowRule = DefaultFlowRule.builder() + .forDevice(node.intgBridge()) + .withSelector(sBuilder.build()) + .withTreatment(tBuilder.build()) + .withPriority(PRIORITY_CT_RULE) + .fromApp(appId) + .makePermanent() + .forTable(JUMP_TABLE) + .build(); + applyRule(flowRule, true); + } + } + + private void setupTransientRoutingRule() { + setGroupingRule(IpPrefix.valueOf(SHIFTED_IP_CIDR)); + } + private void setupServiceRoutingRule(K8sNetwork k8sNetwork) { - setAnyRoutingRule(IpPrefix.valueOf(SERVICE_IP_CIDR), k8sNetwork); + setGroupingRule(IpPrefix.valueOf(SERVICE_IP_CIDR)); + setAnyRoutingRule(IpPrefix.valueOf(SERVICE_IP_CIDR), + MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork); } private void setupHostRoutingRule(K8sNetwork k8sNetwork) { - setAnyRoutingRule(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32), k8sNetwork); - } - - private void setupGatewayRoutingRule(K8sNetwork k8sNetwork) { - TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() - .matchEthType(Ethernet.TYPE_IPV4) - .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32)); - - TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() - .setOutput(PortNumber.LOCAL); - - for (K8sNode node : k8sNodeService.completeNodes()) { - FlowRule flowRule = DefaultFlowRule.builder() - .forDevice(node.intgBridge()) - .withSelector(sBuilder.build()) - .withTreatment(tBuilder.build()) - .withPriority(HIGH_PRIORITY) - .fromApp(appId) - .makePermanent() - .forTable(ROUTING_TABLE) - .build(); - applyRule(flowRule, true); - } + setAnyRoutingRule(IpPrefix.valueOf( + k8sNetwork.gatewayIp(), 32), null, k8sNetwork); } private class InternalK8sNodeListener implements K8sNodeListener { @@ -360,10 +399,11 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { } initializePipeline(node); + setupTransientRoutingRule(); + k8sNetworkService.networks().forEach(n -> { setupHostRoutingRule(n); setupServiceRoutingRule(n); - setupGatewayRoutingRule(n); }); } } @@ -394,7 +434,6 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { } setupHostRoutingRule(network); - setupGatewayRoutingRule(network); setupServiceRoutingRule(network); } } diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java index 6a5f942449..5809aabd22 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java @@ -92,6 +92,7 @@ import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS; import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE; import static org.onosproject.k8snetworking.api.Constants.POD_TABLE; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE; import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE; import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR; @@ -101,6 +102,7 @@ import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX; import static org.onosproject.k8snetworking.api.Constants.SRC; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT; +import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE; import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap; import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG; import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket; @@ -261,13 +263,16 @@ public class K8sServiceHandler { private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) { + String srcCidr = k8sNetworkService.network( + k8sNodeService.node(deviceId).hostname()).cidr(); + k8sNetworkService.networks().forEach(n -> { - setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, JUMP_TABLE, + setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, null, JUMP_TABLE, SERVICE_TABLE, PRIORITY_CT_RULE, install); - setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, JUMP_TABLE, + setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, null, JUMP_TABLE, POD_TABLE, PRIORITY_CT_RULE, install); - setSrcDstCidrRules(deviceId, n.cidr(), n.cidr(), JUMP_TABLE, - ROUTING_TABLE, PRIORITY_CT_RULE, install); + setSrcDstCidrRules(deviceId, srcCidr, n.cidr(), n.segmentId(), ROUTING_TABLE, + STAT_OUTBOUND_TABLE, PRIORITY_INTER_ROUTING_RULE, install); }); // setup load balancing rules using group table @@ -277,7 +282,7 @@ public class K8sServiceHandler { } private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr, - String dstCidr, int installTable, + String dstCidr, String segId, int installTable, int transitTable, int priority, boolean install) { TrafficSelector selector = DefaultTrafficSelector.builder() .matchEthType(Ethernet.TYPE_IPV4) @@ -285,15 +290,18 @@ public class K8sServiceHandler { .matchIPDst(IpPrefix.valueOf(dstCidr)) .build(); - TrafficTreatment treatment = DefaultTrafficTreatment.builder() - .transition(transitTable) - .build(); + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + + if (segId != null) { + tBuilder.setTunnelId(Long.valueOf(segId)); + } + tBuilder.transition(transitTable); k8sFlowRuleService.setRule( appId, deviceId, selector, - treatment, + tBuilder.build(), priority, installTable, install); @@ -385,11 +393,8 @@ public class K8sServiceHandler { ServicePort sp) { List bkts = Lists.newArrayList(); - ExtensionTreatment resubmitTreatment = buildResubmitExtension( - deviceService.getDevice(deviceId), ROUTING_TABLE); TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() - .setIpDst(IpAddress.valueOf(podIpStr)) - .extension(resubmitTreatment, deviceId); + .setIpDst(IpAddress.valueOf(podIpStr)); if (TCP.equals(sp.getProtocol())) { tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal())); @@ -397,6 +402,10 @@ public class K8sServiceHandler { tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal())); } + ExtensionTreatment resubmitTreatment = buildResubmitExtension( + deviceService.getDevice(deviceId), ROUTING_TABLE); + tBuilder.extension(resubmitTreatment, deviceId); + bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1)); return bkts; @@ -414,11 +423,9 @@ public class K8sServiceHandler { List bkts = Lists.newArrayList(); epas.forEach(epa -> { String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa); - ExtensionTreatment resubmitTreatment = buildResubmitExtension( - deviceService.getDevice(deviceId), ROUTING_TABLE); + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() - .setIpDst(IpAddress.valueOf(podIp)) - .extension(resubmitTreatment, deviceId); + .setIpDst(IpAddress.valueOf(podIp)); if (TCP.equals(sp.getProtocol())) { tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal())); @@ -426,6 +433,10 @@ public class K8sServiceHandler { tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal())); } + ExtensionTreatment resubmitTreatment = buildResubmitExtension( + deviceService.getDevice(deviceId), ROUTING_TABLE); + tBuilder.extension(resubmitTreatment, deviceId); + bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1)); }); spGrpBkts.put(sp, bkts); diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java index a518971a8e..89680b0e20 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java @@ -32,6 +32,7 @@ import org.onosproject.core.CoreService; import org.onosproject.k8snetworking.api.K8sFlowRuleService; import org.onosproject.k8snetworking.api.K8sNetworkService; import org.onosproject.k8snetworking.api.K8sPort; +import org.onosproject.k8snetworking.api.K8sServiceService; import org.onosproject.k8snode.api.K8sNode; import org.onosproject.k8snode.api.K8sNodeEvent; import org.onosproject.k8snode.api.K8sNodeListener; @@ -62,6 +63,7 @@ import java.util.Dictionary; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.onlab.util.Tools.groupedThreads; @@ -70,6 +72,7 @@ import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE; import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE; import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE; +import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC; @@ -93,6 +96,9 @@ public class K8sSwitchingArpHandler { private static final String API_SERVER_CLUSTER_IP = "10.96.0.1"; + private static final String GATEWAY_MAC = "gatewayMac"; + private static final String ARP_MODE = "arpMode"; + @Reference(cardinality = ReferenceCardinality.MANDATORY) protected CoreService coreService; @@ -123,6 +129,9 @@ public class K8sSwitchingArpHandler { @Reference(cardinality = ReferenceCardinality.MANDATORY) protected K8sFlowRuleService k8sFlowRuleService; + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sServiceService k8sServiceService; + /** Fake MAC address for virtual network subnet gateway. */ private String gatewayMac = GATEWAY_MAC_DEFAULT; @@ -215,7 +224,7 @@ public class K8sSwitchingArpHandler { .filter(n -> n.gatewayIp().equals(targetIp)) .count(); - if (gwIpCnt > 0 || targetIp.equals(IpAddress.valueOf(API_SERVER_CLUSTER_IP))) { + if (gwIpCnt > 0) { replyMac = gwMacAddress; } @@ -233,6 +242,15 @@ public class K8sSwitchingArpHandler { } } + if (replyMac == null) { + Set serviceIps = k8sServiceService.services().stream() + .map(s -> s.getSpec().getClusterIP()) + .collect(Collectors.toSet()); + if (serviceIps.contains(targetIp.toString())) { + replyMac = MacAddress.valueOf(SERVICE_FAKE_MAC_STR); + } + } + if (replyMac == null) { log.debug("Failed to find MAC address for {}", targetIp); return; diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingGatewayHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingGatewayHandler.java new file mode 100644 index 0000000000..235897dc5f --- /dev/null +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingGatewayHandler.java @@ -0,0 +1,230 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snetworking.impl; + +import org.onlab.packet.Ethernet; +import org.onlab.packet.IpPrefix; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.k8snetworking.api.K8sFlowRuleService; +import org.onosproject.k8snetworking.api.K8sNetwork; +import org.onosproject.k8snetworking.api.K8sNetworkEvent; +import org.onosproject.k8snetworking.api.K8sNetworkListener; +import org.onosproject.k8snetworking.api.K8sNetworkService; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeEvent; +import org.onosproject.k8snode.api.K8sNodeListener; +import org.onosproject.k8snode.api.K8sNodeService; +import org.onosproject.net.PortNumber; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.driver.DriverService; +import org.onosproject.net.flow.DefaultTrafficSelector; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.TrafficSelector; +import org.onosproject.net.flow.TrafficTreatment; +import org.onosproject.net.packet.PacketService; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.slf4j.Logger; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_GATEWAY_RULE; +import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE; +import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId; +import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Populates switching flow rules on OVS for providing the connectivity between + * container and network gateway. + */ +@Component(immediate = true) +public class K8sSwitchingGatewayHandler { + + private final Logger log = getLogger(getClass()); + + private static final int GW_IP_PREFIX = 32; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DeviceService deviceService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DriverService driverService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected PacketService packetService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sFlowRuleService k8sFlowRuleService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNetworkService k8sNetworkService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNodeService k8sNodeService; + + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler")); + private final InternalK8sNetworkListener k8sNetworkListener = + new InternalK8sNetworkListener(); + private final InternalK8sNodeListener k8sNodeListener = + new InternalK8sNodeListener(); + + private ApplicationId appId; + private NodeId localNodeId; + + @Activate + protected void activate() { + appId = coreService.registerApplication(K8S_NETWORKING_APP_ID); + k8sNetworkService.addListener(k8sNetworkListener); + k8sNodeService.addListener(k8sNodeListener); + localNodeId = clusterService.getLocalNode().id(); + leadershipService.runForLeadership(appId.name()); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + k8sNodeService.removeListener(k8sNodeListener); + k8sNetworkService.removeListener(k8sNetworkListener); + leadershipService.withdraw(appId.name()); + eventExecutor.shutdown(); + + log.info("Stopped"); + } + + private void setGatewayRule(K8sNetwork k8sNetwork, boolean install) { + TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() + .matchEthType(Ethernet.TYPE_IPV4) + .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), GW_IP_PREFIX)); + + for (K8sNode node : k8sNodeService.completeNodes()) { + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + + if (node.hostname().equals(k8sNetwork.name())) { + tBuilder.setEthDst(node.intBridgeMac()) + .setOutput(PortNumber.LOCAL); + } else { + PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(), + k8sNetworkService, node); + K8sNode localNode = k8sNodeService.node(k8sNetwork.name()); + + tBuilder.extension(buildExtension( + deviceService, + node.intgBridge(), + localNode.dataIp().getIp4Address()), + node.intgBridge()) + .setOutput(portNum); + } + + k8sFlowRuleService.setRule( + appId, + node.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_GATEWAY_RULE, + ROUTING_TABLE, + install); + } + } + + private class InternalK8sNetworkListener implements K8sNetworkListener { + + private boolean isRelevantHelper() { + return Objects.equals(localNodeId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sNetworkEvent event) { + switch (event.type()) { + case K8S_NETWORK_CREATED: + case K8S_NETWORK_UPDATED: + eventExecutor.execute(() -> processNetworkCreation(event)); + break; + case K8S_NETWORK_REMOVED: + eventExecutor.execute(() -> processNetworkRemoval(event)); + break; + default: + break; + } + } + + private void processNetworkCreation(K8sNetworkEvent event) { + if (!isRelevantHelper()) { + return; + } + + setGatewayRule(event.subject(), true); + } + + private void processNetworkRemoval(K8sNetworkEvent event) { + if (!isRelevantHelper()) { + return; + } + + setGatewayRule(event.subject(), false); + } + } + + private class InternalK8sNodeListener implements K8sNodeListener { + private boolean isRelevantHelper() { + return Objects.equals(localNodeId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sNodeEvent event) { + switch (event.type()) { + case K8S_NODE_COMPLETE: + eventExecutor.execute(() -> processNodeCompletion(event.subject())); + break; + case K8S_NODE_INCOMPLETE: + default: + break; + } + } + + private void processNodeCompletion(K8sNode node) { + log.info("COMPLETE node {} is detected", node.hostname()); + + if (!isRelevantHelper()) { + return; + } + + k8sNetworkService.networks().forEach(n -> setGatewayRule(n, true)); + } + } +} diff --git a/apps/k8s-node/api/BUILD b/apps/k8s-node/api/BUILD index 36d2da128f..55de911322 100644 --- a/apps/k8s-node/api/BUILD +++ b/apps/k8s-node/api/BUILD @@ -1,4 +1,7 @@ -COMPILE_DEPS = CORE_DEPS +COMPILE_DEPS = CORE_DEPS + [ + "//protocols/ovsdb/api:onos-protocols-ovsdb-api", + "//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc", +] TEST_DEPS = TEST_ADAPTERS + [ "//core/api:onos-api-tests", diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java index 3e26a50d81..c7c08d31bb 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java @@ -18,10 +18,16 @@ package org.onosproject.k8snode.api; import com.google.common.base.MoreObjects; import org.onlab.osgi.DefaultServiceDirectory; import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; import org.onosproject.net.DeviceId; import org.onosproject.net.Port; import org.onosproject.net.PortNumber; import org.onosproject.net.device.DeviceService; +import org.onosproject.ovsdb.controller.OvsdbClientService; +import org.onosproject.ovsdb.controller.OvsdbController; +import org.onosproject.ovsdb.controller.OvsdbNodeId; +import org.onosproject.ovsdb.rfc.notation.OvsdbMap; +import org.onosproject.ovsdb.rfc.table.Interface; import java.util.Objects; @@ -37,6 +43,9 @@ import static org.onosproject.net.AnnotationKeys.PORT_NAME; */ public class DefaultK8sNode implements K8sNode { + private static final int DEFAULT_OVSDB_PORT = 6640; + private static final String MAC_ADDRESS = "mac_address"; + private final String hostname; private final Type type; private final DeviceId intgBridge; @@ -153,6 +162,21 @@ public class DefaultK8sNode implements K8sNode { return port != null ? port.number() : null; } + @Override + public MacAddress intBridgeMac() { + OvsdbController ovsdbController = + DefaultServiceDirectory.getService(OvsdbController.class); + OvsdbNodeId ovsdb = new OvsdbNodeId(this.managementIp, DEFAULT_OVSDB_PORT); + OvsdbClientService client = ovsdbController.getOvsdbClient(ovsdb); + if (client == null) { + return null; + } + + Interface iface = client.getInterface(INTEGRATION_BRIDGE); + OvsdbMap data = (OvsdbMap) iface.getExternalIdsColumn().data(); + return MacAddress.valueOf((String) data.map().get(MAC_ADDRESS)); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java index 671b37a724..2c539f39f5 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java @@ -16,6 +16,7 @@ package org.onosproject.k8snode.api; import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; import org.onosproject.net.DeviceId; import org.onosproject.net.PortNumber; @@ -132,6 +133,13 @@ public interface K8sNode { */ PortNumber intBridgePortNum(); + /** + * Returns the integration bridge's MAC address. + * + * @return MAC address; null if the MAC address does not exist + */ + MacAddress intBridgeMac(); + /** * Builder of new node entity. */