Install rules for handling traffic destined to gateway at k8s node

Change-Id: I3a3ce8ecc581aee7e8e70e338dbf7bf4a6c518db
This commit is contained in:
Jian Li 2019-04-12 13:58:44 +09:00
parent 5c75583b9a
commit 7d111d7a41
8 changed files with 392 additions and 51 deletions

View File

@ -34,6 +34,11 @@ public final class Constants {
public static final String NAT_STATELESS = "stateless";
public static final String DEFAULT_GATEWAY_MAC_STR = "fe:00:00:00:00:02";
public static final String DEFAULT_ARP_MODE_STR = ARP_PROXY_MODE;
public static final String DEFAULT_HOST_MAC_STR = "fe:00:00:00:00:08";
public static final String DEFAULT_SERVICE_IP_NAT_MODE_STR = NAT_STATELESS;
public static final String CONTROLLER_MAC_STR = "fe:00:00:00:00:10";
public static final String SERVICE_FAKE_MAC_STR = "fe:00:00:00:00:20";
public static final MacAddress DEFAULT_GATEWAY_MAC =
MacAddress.valueOf(DEFAULT_GATEWAY_MAC_STR);
@ -59,10 +64,13 @@ public final class Constants {
public static final int PRIORITY_TUNNEL_TAG_RULE = 30000;
public static final int PRIORITY_TRANSLATION_RULE = 30000;
public static final int PRIORITY_CT_HOOK_RULE = 30500;
public static final int PRIORITY_INTER_ROUTING_RULE = 29000;
public static final int PRIORITY_CT_RULE = 32000;
public static final int PRIORITY_CT_DROP_RULE = 32500;
public static final int PRIORITY_NAT_RULE = 30000;
public static final int PRIORITY_GATEWAY_RULE = 30000;
public static final int PRIORITY_SWITCHING_RULE = 30000;
public static final int PRIORITY_CIDR_RULE = 30000;
public static final int PRIORITY_ARP_GATEWAY_RULE = 41000;
public static final int PRIORITY_ARP_SUBNET_RULE = 40000;
public static final int PRIORITY_ARP_CONTROL_RULE = 40000;

View File

@ -17,6 +17,7 @@ package org.onosproject.k8snetworking.impl;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
@ -33,6 +34,7 @@ import org.onosproject.k8snode.api.K8sNodeListener;
import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@ -60,14 +62,20 @@ import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC;
import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SNAT_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
import static org.onosproject.k8snetworking.api.Constants.STAT_INBOUND_TABLE;
import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE;
import static org.onosproject.k8snetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.k8snetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
import static org.slf4j.LoggerFactory.getLogger;
/**
@ -88,6 +96,9 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@ -227,6 +238,9 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
// for ARP and ACL table transition
connectTables(deviceId, ARP_TABLE, JUMP_TABLE);
// for JUMP table transition to routing table
connectTables(deviceId, JUMP_TABLE, ROUTING_TABLE);
// for JUMP table transition
// we need JUMP table for bypassing routing table which contains large
// amount of flow rules which might cause performance degradation during
@ -280,22 +294,40 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
applyRule(flowRule, true);
}
private void setAnyRoutingRule(IpPrefix srcIpPrefix, K8sNetwork k8sNetwork) {
private void setAnyRoutingRule(IpPrefix srcIpPrefix, MacAddress mac,
K8sNetwork k8sNetwork) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPSrc(srcIpPrefix)
.matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr()));
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
.transition(STAT_OUTBOUND_TABLE);
for (K8sNode node : k8sNodeService.completeNodes()) {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setTunnelId(Long.valueOf(k8sNetwork.segmentId()));
if (node.hostname().equals(k8sNetwork.name())) {
if (mac != null) {
tBuilder.setEthSrc(mac);
}
tBuilder.transition(STAT_OUTBOUND_TABLE);
} else {
PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(),
k8sNetworkService, node);
K8sNode localNode = k8sNodeService.node(k8sNetwork.name());
tBuilder.extension(buildExtension(
deviceService,
node.intgBridge(),
localNode.dataIp().getIp4Address()),
node.intgBridge())
.setOutput(portNum);
}
FlowRule flowRule = DefaultFlowRule.builder()
.forDevice(node.intgBridge())
.withSelector(sBuilder.build())
.withTreatment(tBuilder.build())
.withPriority(HIGH_PRIORITY)
.withPriority(PRIORITY_CIDR_RULE)
.fromApp(appId)
.makePermanent()
.forTable(ROUTING_TABLE)
@ -304,34 +336,41 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
}
}
private void setGroupingRule(IpPrefix srcPrefix) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPSrc(srcPrefix);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.transition(ROUTING_TABLE);
for (K8sNode node : k8sNodeService.completeNodes()) {
FlowRule flowRule = DefaultFlowRule.builder()
.forDevice(node.intgBridge())
.withSelector(sBuilder.build())
.withTreatment(tBuilder.build())
.withPriority(PRIORITY_CT_RULE)
.fromApp(appId)
.makePermanent()
.forTable(JUMP_TABLE)
.build();
applyRule(flowRule, true);
}
}
private void setupTransientRoutingRule() {
setGroupingRule(IpPrefix.valueOf(SHIFTED_IP_CIDR));
}
private void setupServiceRoutingRule(K8sNetwork k8sNetwork) {
setAnyRoutingRule(IpPrefix.valueOf(SERVICE_IP_CIDR), k8sNetwork);
setGroupingRule(IpPrefix.valueOf(SERVICE_IP_CIDR));
setAnyRoutingRule(IpPrefix.valueOf(SERVICE_IP_CIDR),
MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork);
}
private void setupHostRoutingRule(K8sNetwork k8sNetwork) {
setAnyRoutingRule(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32), k8sNetwork);
}
private void setupGatewayRoutingRule(K8sNetwork k8sNetwork) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setOutput(PortNumber.LOCAL);
for (K8sNode node : k8sNodeService.completeNodes()) {
FlowRule flowRule = DefaultFlowRule.builder()
.forDevice(node.intgBridge())
.withSelector(sBuilder.build())
.withTreatment(tBuilder.build())
.withPriority(HIGH_PRIORITY)
.fromApp(appId)
.makePermanent()
.forTable(ROUTING_TABLE)
.build();
applyRule(flowRule, true);
}
setAnyRoutingRule(IpPrefix.valueOf(
k8sNetwork.gatewayIp(), 32), null, k8sNetwork);
}
private class InternalK8sNodeListener implements K8sNodeListener {
@ -360,10 +399,11 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
}
initializePipeline(node);
setupTransientRoutingRule();
k8sNetworkService.networks().forEach(n -> {
setupHostRoutingRule(n);
setupServiceRoutingRule(n);
setupGatewayRoutingRule(n);
});
}
}
@ -394,7 +434,6 @@ public class K8sFlowRuleManager implements K8sFlowRuleService {
}
setupHostRoutingRule(network);
setupGatewayRoutingRule(network);
setupServiceRoutingRule(network);
}
}

View File

@ -92,6 +92,7 @@ import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
@ -101,6 +102,7 @@ import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
import static org.onosproject.k8snetworking.api.Constants.SRC;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
@ -261,13 +263,16 @@ public class K8sServiceHandler {
private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
String srcCidr = k8sNetworkService.network(
k8sNodeService.node(deviceId).hostname()).cidr();
k8sNetworkService.networks().forEach(n -> {
setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, JUMP_TABLE,
setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, null, JUMP_TABLE,
SERVICE_TABLE, PRIORITY_CT_RULE, install);
setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, JUMP_TABLE,
setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, null, JUMP_TABLE,
POD_TABLE, PRIORITY_CT_RULE, install);
setSrcDstCidrRules(deviceId, n.cidr(), n.cidr(), JUMP_TABLE,
ROUTING_TABLE, PRIORITY_CT_RULE, install);
setSrcDstCidrRules(deviceId, srcCidr, n.cidr(), n.segmentId(), ROUTING_TABLE,
STAT_OUTBOUND_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
});
// setup load balancing rules using group table
@ -277,7 +282,7 @@ public class K8sServiceHandler {
}
private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
String dstCidr, int installTable,
String dstCidr, String segId, int installTable,
int transitTable, int priority, boolean install) {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
@ -285,15 +290,18 @@ public class K8sServiceHandler {
.matchIPDst(IpPrefix.valueOf(dstCidr))
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.transition(transitTable)
.build();
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
if (segId != null) {
tBuilder.setTunnelId(Long.valueOf(segId));
}
tBuilder.transition(transitTable);
k8sFlowRuleService.setRule(
appId,
deviceId,
selector,
treatment,
tBuilder.build(),
priority,
installTable,
install);
@ -385,11 +393,8 @@ public class K8sServiceHandler {
ServicePort sp) {
List<GroupBucket> bkts = Lists.newArrayList();
ExtensionTreatment resubmitTreatment = buildResubmitExtension(
deviceService.getDevice(deviceId), ROUTING_TABLE);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setIpDst(IpAddress.valueOf(podIpStr))
.extension(resubmitTreatment, deviceId);
.setIpDst(IpAddress.valueOf(podIpStr));
if (TCP.equals(sp.getProtocol())) {
tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
@ -397,6 +402,10 @@ public class K8sServiceHandler {
tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
}
ExtensionTreatment resubmitTreatment = buildResubmitExtension(
deviceService.getDevice(deviceId), ROUTING_TABLE);
tBuilder.extension(resubmitTreatment, deviceId);
bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
return bkts;
@ -414,11 +423,9 @@ public class K8sServiceHandler {
List<GroupBucket> bkts = Lists.newArrayList();
epas.forEach(epa -> {
String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
ExtensionTreatment resubmitTreatment = buildResubmitExtension(
deviceService.getDevice(deviceId), ROUTING_TABLE);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setIpDst(IpAddress.valueOf(podIp))
.extension(resubmitTreatment, deviceId);
.setIpDst(IpAddress.valueOf(podIp));
if (TCP.equals(sp.getProtocol())) {
tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
@ -426,6 +433,10 @@ public class K8sServiceHandler {
tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
}
ExtensionTreatment resubmitTreatment = buildResubmitExtension(
deviceService.getDevice(deviceId), ROUTING_TABLE);
tBuilder.extension(resubmitTreatment, deviceId);
bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
});
spGrpBkts.put(sp, bkts);

View File

@ -32,6 +32,7 @@ import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sFlowRuleService;
import org.onosproject.k8snetworking.api.K8sNetworkService;
import org.onosproject.k8snetworking.api.K8sPort;
import org.onosproject.k8snetworking.api.K8sServiceService;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeEvent;
import org.onosproject.k8snode.api.K8sNodeListener;
@ -62,6 +63,7 @@ import java.util.Dictionary;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
@ -70,6 +72,7 @@ import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
@ -93,6 +96,9 @@ public class K8sSwitchingArpHandler {
private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
private static final String GATEWAY_MAC = "gatewayMac";
private static final String ARP_MODE = "arpMode";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@ -123,6 +129,9 @@ public class K8sSwitchingArpHandler {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sFlowRuleService k8sFlowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sServiceService k8sServiceService;
/** Fake MAC address for virtual network subnet gateway. */
private String gatewayMac = GATEWAY_MAC_DEFAULT;
@ -215,7 +224,7 @@ public class K8sSwitchingArpHandler {
.filter(n -> n.gatewayIp().equals(targetIp))
.count();
if (gwIpCnt > 0 || targetIp.equals(IpAddress.valueOf(API_SERVER_CLUSTER_IP))) {
if (gwIpCnt > 0) {
replyMac = gwMacAddress;
}
@ -233,6 +242,15 @@ public class K8sSwitchingArpHandler {
}
}
if (replyMac == null) {
Set<String> serviceIps = k8sServiceService.services().stream()
.map(s -> s.getSpec().getClusterIP())
.collect(Collectors.toSet());
if (serviceIps.contains(targetIp.toString())) {
replyMac = MacAddress.valueOf(SERVICE_FAKE_MAC_STR);
}
}
if (replyMac == null) {
log.debug("Failed to find MAC address for {}", targetIp);
return;

View File

@ -0,0 +1,230 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snetworking.impl;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sFlowRuleService;
import org.onosproject.k8snetworking.api.K8sNetwork;
import org.onosproject.k8snetworking.api.K8sNetworkEvent;
import org.onosproject.k8snetworking.api.K8sNetworkListener;
import org.onosproject.k8snetworking.api.K8sNetworkService;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeEvent;
import org.onosproject.k8snode.api.K8sNodeListener;
import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.packet.PacketService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_GATEWAY_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Populates switching flow rules on OVS for providing the connectivity between
* container and network gateway.
*/
@Component(immediate = true)
public class K8sSwitchingGatewayHandler {
private final Logger log = getLogger(getClass());
private static final int GW_IP_PREFIX = 32;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sFlowRuleService k8sFlowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNetworkService k8sNetworkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNodeService k8sNodeService;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final InternalK8sNetworkListener k8sNetworkListener =
new InternalK8sNetworkListener();
private final InternalK8sNodeListener k8sNodeListener =
new InternalK8sNodeListener();
private ApplicationId appId;
private NodeId localNodeId;
@Activate
protected void activate() {
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
k8sNetworkService.addListener(k8sNetworkListener);
k8sNodeService.addListener(k8sNodeListener);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
log.info("Started");
}
@Deactivate
protected void deactivate() {
k8sNodeService.removeListener(k8sNodeListener);
k8sNetworkService.removeListener(k8sNetworkListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
private void setGatewayRule(K8sNetwork k8sNetwork, boolean install) {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), GW_IP_PREFIX));
for (K8sNode node : k8sNodeService.completeNodes()) {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
if (node.hostname().equals(k8sNetwork.name())) {
tBuilder.setEthDst(node.intBridgeMac())
.setOutput(PortNumber.LOCAL);
} else {
PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(),
k8sNetworkService, node);
K8sNode localNode = k8sNodeService.node(k8sNetwork.name());
tBuilder.extension(buildExtension(
deviceService,
node.intgBridge(),
localNode.dataIp().getIp4Address()),
node.intgBridge())
.setOutput(portNum);
}
k8sFlowRuleService.setRule(
appId,
node.intgBridge(),
sBuilder.build(),
tBuilder.build(),
PRIORITY_GATEWAY_RULE,
ROUTING_TABLE,
install);
}
}
private class InternalK8sNetworkListener implements K8sNetworkListener {
private boolean isRelevantHelper() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
@Override
public void event(K8sNetworkEvent event) {
switch (event.type()) {
case K8S_NETWORK_CREATED:
case K8S_NETWORK_UPDATED:
eventExecutor.execute(() -> processNetworkCreation(event));
break;
case K8S_NETWORK_REMOVED:
eventExecutor.execute(() -> processNetworkRemoval(event));
break;
default:
break;
}
}
private void processNetworkCreation(K8sNetworkEvent event) {
if (!isRelevantHelper()) {
return;
}
setGatewayRule(event.subject(), true);
}
private void processNetworkRemoval(K8sNetworkEvent event) {
if (!isRelevantHelper()) {
return;
}
setGatewayRule(event.subject(), false);
}
}
private class InternalK8sNodeListener implements K8sNodeListener {
private boolean isRelevantHelper() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
@Override
public void event(K8sNodeEvent event) {
switch (event.type()) {
case K8S_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case K8S_NODE_INCOMPLETE:
default:
break;
}
}
private void processNodeCompletion(K8sNode node) {
log.info("COMPLETE node {} is detected", node.hostname());
if (!isRelevantHelper()) {
return;
}
k8sNetworkService.networks().forEach(n -> setGatewayRule(n, true));
}
}
}

View File

@ -1,4 +1,7 @@
COMPILE_DEPS = CORE_DEPS
COMPILE_DEPS = CORE_DEPS + [
"//protocols/ovsdb/api:onos-protocols-ovsdb-api",
"//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc",
]
TEST_DEPS = TEST_ADAPTERS + [
"//core/api:onos-api-tests",

View File

@ -18,10 +18,16 @@ package org.onosproject.k8snode.api;
import com.google.common.base.MoreObjects;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.onosproject.ovsdb.rfc.notation.OvsdbMap;
import org.onosproject.ovsdb.rfc.table.Interface;
import java.util.Objects;
@ -37,6 +43,9 @@ import static org.onosproject.net.AnnotationKeys.PORT_NAME;
*/
public class DefaultK8sNode implements K8sNode {
private static final int DEFAULT_OVSDB_PORT = 6640;
private static final String MAC_ADDRESS = "mac_address";
private final String hostname;
private final Type type;
private final DeviceId intgBridge;
@ -153,6 +162,21 @@ public class DefaultK8sNode implements K8sNode {
return port != null ? port.number() : null;
}
@Override
public MacAddress intBridgeMac() {
OvsdbController ovsdbController =
DefaultServiceDirectory.getService(OvsdbController.class);
OvsdbNodeId ovsdb = new OvsdbNodeId(this.managementIp, DEFAULT_OVSDB_PORT);
OvsdbClientService client = ovsdbController.getOvsdbClient(ovsdb);
if (client == null) {
return null;
}
Interface iface = client.getInterface(INTEGRATION_BRIDGE);
OvsdbMap data = (OvsdbMap) iface.getExternalIdsColumn().data();
return MacAddress.valueOf((String) data.map().get(MAC_ADDRESS));
}
@Override
public boolean equals(Object obj) {
if (this == obj) {

View File

@ -16,6 +16,7 @@
package org.onosproject.k8snode.api;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
@ -132,6 +133,13 @@ public interface K8sNode {
*/
PortNumber intBridgePortNum();
/**
* Returns the integration bridge's MAC address.
*
* @return MAC address; null if the MAC address does not exist
*/
MacAddress intBridgeMac();
/**
* Builder of new node entity.
*/