From 1b08d650d06d60a00ca6cf12ddd13d8340b87bfe Mon Sep 17 00:00:00 2001 From: Jian Li Date: Thu, 2 May 2019 17:28:09 +0900 Subject: [PATCH] Support to learn external gateway MAC at controller Change-Id: I72c13133708de1ac86e26160397233518489d46b --- .../impl/K8sRoutingArpHandler.java | 233 ++++++++++++++++++ .../impl/K8sRoutingSnatHandler.java | 10 +- .../impl/K8sSwitchingArpHandler.java | 2 + .../k8snode/api/DefaultK8sNode.java | 49 +++- .../org/onosproject/k8snode/api/K8sNode.java | 16 ++ 5 files changed, 296 insertions(+), 14 deletions(-) create mode 100644 apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java new file mode 100644 index 0000000000..f11aab44e9 --- /dev/null +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java @@ -0,0 +1,233 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snetworking.impl; + +import org.onlab.packet.ARP; +import org.onlab.packet.Ethernet; +import org.onlab.packet.Ip4Address; +import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; +import org.onlab.packet.VlanId; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.k8snetworking.api.K8sFlowRuleService; +import org.onosproject.k8snode.api.K8sNode; +import org.onosproject.k8snode.api.K8sNodeAdminService; +import org.onosproject.k8snode.api.K8sNodeEvent; +import org.onosproject.k8snode.api.K8sNodeListener; +import org.onosproject.net.flow.DefaultTrafficSelector; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.TrafficSelector; +import org.onosproject.net.flow.TrafficTreatment; +import org.onosproject.net.packet.DefaultOutboundPacket; +import org.onosproject.net.packet.InboundPacket; +import org.onosproject.net.packet.PacketContext; +import org.onosproject.net.packet.PacketProcessor; +import org.onosproject.net.packet.PacketService; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static java.lang.Thread.sleep; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE; +import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_REPLY_RULE; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Handles ARP request/reply from external gateway. + */ +@Component(immediate = true) +public class K8sRoutingArpHandler { + private final Logger log = getLogger(getClass()); + + private static final long SLEEP_MS = 5000; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected PacketService packetService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNodeAdminService k8sNodeService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sFlowRuleService k8sFlowRuleService; + + private ApplicationId appId; + private NodeId localNodeId; + + private final InternalK8sNodeListener k8sNodeListener = new InternalK8sNodeListener(); + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + + private final PacketProcessor packetProcessor = new InternalPacketProcessor(); + + @Activate + protected void activate() { + appId = coreService.registerApplication(K8S_NETWORKING_APP_ID); + localNodeId = clusterService.getLocalNode().id(); + leadershipService.runForLeadership(appId.name()); + k8sNodeService.addListener(k8sNodeListener); + packetService.addProcessor(packetProcessor, PacketProcessor.director(1)); + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + k8sNodeService.removeListener(k8sNodeListener); + packetService.removeProcessor(packetProcessor); + leadershipService.withdraw(appId.name()); + eventExecutor.shutdown(); + log.info("Stopped"); + } + + private void processArpPacket(PacketContext context, Ethernet ethernet) { + ARP arp = (ARP) ethernet.getPayload(); + + if (arp.getOpCode() == ARP.OP_REPLY) { + IpAddress spa = Ip4Address.valueOf(arp.getSenderProtocolAddress()); + MacAddress sha = MacAddress.valueOf(arp.getSenderHardwareAddress()); + + log.info("ARP reply from external gateway ip: {}, mac: {}", spa, sha); + + Set gatewayIps = k8sNodeService.completeNodes().stream() + .map(K8sNode::extGatewayIp).collect(Collectors.toSet()); + + if (!gatewayIps.contains(spa)) { + return; + } + + k8sNodeService.completeNodes().stream() + .filter(n -> n.extGatewayMac() == null) + .forEach(n -> { + K8sNode updated = n.updateExtGatewayMac(sha); + k8sNodeService.updateNode(updated); + }); + } + } + + private void sendArpRequest(K8sNode k8sNode) { + MacAddress bridgeMac = k8sNode.extBridgeMac(); + IpAddress bridgeIp = k8sNode.extBridgeIp(); + IpAddress extGatewayIp = k8sNode.extGatewayIp(); + Ethernet ethRequest = ARP.buildArpRequest(bridgeMac.toBytes(), bridgeIp.toOctets(), + extGatewayIp.toOctets(), VlanId.NO_VID); + + TrafficTreatment treatment = DefaultTrafficTreatment.builder() + .setOutput(k8sNode.extBridgePortNum()) + .build(); + + packetService.emit(new DefaultOutboundPacket( + k8sNode.extBridge(), + treatment, + ByteBuffer.wrap(ethRequest.serialize()))); + } + + private void setArpReplyRule(K8sNode k8sNode, boolean install) { + TrafficSelector selector = DefaultTrafficSelector.builder() + .matchEthType(Ethernet.TYPE_ARP) + .matchArpOp(ARP.OP_REPLY) + .matchArpSpa(Ip4Address.valueOf(k8sNode.extGatewayIp().toString())) + .build(); + + TrafficTreatment treatment = DefaultTrafficTreatment.builder() + .punt() + .build(); + + k8sFlowRuleService.setRule( + appId, + k8sNode.extBridge(), + selector, + treatment, + PRIORITY_ARP_REPLY_RULE, + EXT_ENTRY_TABLE, + install + ); + } + + private class InternalK8sNodeListener implements K8sNodeListener { + + private boolean isRelevantHelper() { + return Objects.equals(localNodeId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sNodeEvent event) { + switch (event.type()) { + case K8S_NODE_COMPLETE: + eventExecutor.execute(() -> processNodeCompletion(event.subject())); + break; + case K8S_NODE_INCOMPLETE: + default: + break; + } + } + + private void processNodeCompletion(K8sNode k8sNode) { + if (!isRelevantHelper()) { + return; + } + + setArpReplyRule(k8sNode, true); + + try { + sleep(SLEEP_MS); + } catch (InterruptedException e) { + log.error("Exception caused during ARP requesting..."); + } + + sendArpRequest(k8sNode); + } + } + + private class InternalPacketProcessor implements PacketProcessor { + + @Override + public void process(PacketContext context) { + if (context.isHandled()) { + return; + } + + InboundPacket pkt = context.inPacket(); + Ethernet ethernet = pkt.parsed(); + if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) { + eventExecutor.execute(() -> processArpPacket(context, ethernet)); + } + } + } +} diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java index 04913b61fc..fccbaafce0 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java @@ -315,6 +315,9 @@ public class K8sRoutingSnatHandler { case K8S_NODE_COMPLETE: eventExecutor.execute(() -> processNodeCompletion(event.subject())); break; + case K8S_NODE_UPDATED: + eventExecutor.execute(() -> processNodeUpdate(event.subject())); + break; case K8S_NODE_INCOMPLETE: default: break; @@ -327,10 +330,15 @@ public class K8sRoutingSnatHandler { } setExtIntfArpRule(k8sNode, true); - setSnatUpstreamRule(k8sNode, true); setSnatDownstreamRule(k8sNode, true); setContainerToExtRule(k8sNode, true); } + + private void processNodeUpdate(K8sNode k8sNode) { + if (k8sNode.extGatewayMac() != null) { + setSnatUpstreamRule(k8sNode, true); + } + } } private class InternalK8sNetworkListener implements K8sNetworkListener { diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java index 779bbc61e7..33d4febc1d 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java @@ -292,6 +292,8 @@ public class K8sSwitchingArpHandler { context.inPacket().receivedFrom().deviceId(), treatment, ByteBuffer.wrap(ethReply.serialize()))); + + context.block(); } private String getArpMode() { diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java index 7be1e9a541..87e1f1d6e4 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java @@ -60,6 +60,7 @@ public class DefaultK8sNode implements K8sNode { private final IpAddress managementIp; private final IpAddress dataIp; private final K8sNodeState state; + private final MacAddress extGatewayMac; private static final String NOT_NULL_MSG = "Node % cannot be null"; @@ -75,10 +76,12 @@ public class DefaultK8sNode implements K8sNode { * @param managementIp management IP address * @param dataIp data IP address * @param state node state + * @param extGatewayMac external gateway MAC address */ protected DefaultK8sNode(String hostname, Type type, DeviceId intgBridge, DeviceId extBridge, IpAddress managementIp, - IpAddress dataIp, K8sNodeState state) { + IpAddress dataIp, K8sNodeState state, + MacAddress extGatewayMac) { this.hostname = hostname; this.type = type; this.intgBridge = intgBridge; @@ -86,6 +89,7 @@ public class DefaultK8sNode implements K8sNode { this.managementIp = managementIp; this.dataIp = dataIp; this.state = state; + this.extGatewayMac = extGatewayMac; } @Override @@ -123,6 +127,7 @@ public class DefaultK8sNode implements K8sNode { .managementIp(managementIp) .dataIp(dataIp) .state(state) + .extGatewayMac(extGatewayMac) .build(); } @@ -136,6 +141,7 @@ public class DefaultK8sNode implements K8sNode { .managementIp(managementIp) .dataIp(dataIp) .state(state) + .extGatewayMac(extGatewayMac) .build(); } @@ -163,9 +169,24 @@ public class DefaultK8sNode implements K8sNode { .managementIp(managementIp) .dataIp(dataIp) .state(newState) + .extGatewayMac(extGatewayMac) .build(); } + @Override + public K8sNode updateExtGatewayMac(MacAddress newMac) { + return new Builder() + .hostname(hostname) + .type(type) + .intgBridge(intgBridge) + .managementIp(managementIp) + .dataIp(dataIp) + .state(state) + .extGatewayMac(newMac) + .build(); + + } + @Override public PortNumber grePortNum() { return tunnelPortNum(GRE_TUNNEL); @@ -245,15 +266,7 @@ public class DefaultK8sNode implements K8sNode { @Override public MacAddress extGatewayMac() { - OvsdbClientService client = getOvsClient(); - - if (client == null) { - return null; - } - - Interface iface = getOvsClient().getInterface(EXTERNAL_BRIDGE); - OvsdbMap data = (OvsdbMap) iface.getExternalIdsColumn().data(); - return MacAddress.valueOf((String) data.map().get(EXT_GW_MAC)); + return extGatewayMac; } @Override @@ -308,7 +321,7 @@ public class DefaultK8sNode implements K8sNode { @Override public int hashCode() { return Objects.hash(hostname, type, intgBridge, extBridge, - managementIp, dataIp, state); + managementIp, dataIp, state, extGatewayMac); } @Override @@ -321,6 +334,7 @@ public class DefaultK8sNode implements K8sNode { .add("managementIp", managementIp) .add("dataIp", dataIp) .add("state", state) + .add("extGatewayMac", extGatewayMac) .toString(); } @@ -376,7 +390,8 @@ public class DefaultK8sNode implements K8sNode { .extBridge(node.extBridge()) .managementIp(node.managementIp()) .dataIp(node.dataIp()) - .state(node.state()); + .state(node.state()) + .extGatewayMac(node.extGatewayMac()); } public static final class Builder implements K8sNode.Builder { @@ -389,6 +404,7 @@ public class DefaultK8sNode implements K8sNode { private IpAddress dataIp; private K8sNodeState state; private K8sApiConfig apiConfig; + private MacAddress extGatewayMac; // private constructor not intended to use from external private Builder() { @@ -407,7 +423,8 @@ public class DefaultK8sNode implements K8sNode { extBridge, managementIp, dataIp, - state); + state, + extGatewayMac); } @Override @@ -451,5 +468,11 @@ public class DefaultK8sNode implements K8sNode { this.state = state; return this; } + + @Override + public Builder extGatewayMac(MacAddress extGatewayMac) { + this.extGatewayMac = extGatewayMac; + return this; + } } } diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java index 30449ff5ab..065f2ddbd8 100644 --- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java +++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java @@ -120,6 +120,14 @@ public interface K8sNode { */ K8sNode updateState(K8sNodeState newState); + /** + * Returns new kubernetes node instance with given external gateway MAC address. + * + * @param macAddress updated MAC address + * @return updated kubernetes node + */ + K8sNode updateExtGatewayMac(MacAddress macAddress); + /** * Returns the GRE tunnel port number. * @@ -271,5 +279,13 @@ public interface K8sNode { * @return kubernetes node builder */ Builder state(K8sNodeState state); + + /** + * Returns kubernetes node builder with supplied external gateway MAC. + * + * @param extGatewayMac external gateway MAC address + * @return kubernetes node builder + */ + Builder extGatewayMac(MacAddress extGatewayMac); } }