diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java index feb81fad63..7e92f5cbb3 100644 --- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java +++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java @@ -87,27 +87,26 @@ public final class Constants { public static final int PRIORITY_NODE_PORT_INTER_RULE = 40000; // flow table index - public static final int STAT_INBOUND_TABLE = 0; - public static final int VTAP_INBOUND_TABLE = 1; - public static final int VTAP_INBOUND_MIRROR_TABLE = 2; + public static final int STAT_INGRESS_TABLE = 0; + public static final int VTAP_INGRESS_TABLE = 1; + public static final int VTAP_INGRESS_MIRROR_TABLE = 2; public static final int VTAG_TABLE = 30; public static final int ARP_TABLE = 35; - public static final int ACL_EGRESS_TABLE = 40; - public static final int ACL_INGRESS_TABLE = 44; - public static final int CT_TABLE = 45; - public static final int ACL_RECIRC_TABLE = 43; - public static final int JUMP_TABLE = 50; + public static final int JUMP_TABLE = 40; + public static final int GROUPING_TABLE = 50; public static final int NAT_TABLE = 51; public static final int SERVICE_TABLE = 52; public static final int POD_TABLE = 53; + public static final int ACL_TABLE = 55; + public static final int ACL_INGRESS_TABLE = 56; + public static final int ACL_EGRESS_TABLE = 58; public static final int ROUTING_TABLE = 60; - public static final int STAT_OUTBOUND_TABLE = 70; - public static final int VTAP_OUTBOUND_TABLE = 71; - public static final int VTAP_OUTBOUND_MIRROR_TABLE = 72; + public static final int STAT_EGRESS_TABLE = 70; + public static final int VTAP_EGRESS_TABLE = 71; + public static final int VTAP_EGRESS_MIRROR_TABLE = 72; public static final int FORWARDING_TABLE = 80; public static final int ERROR_TABLE = 100; public static final int EXT_ENTRY_TABLE = 0; public static final int POD_RESOLUTION_TABLE = 11; - public static final int INBOUND_TABLE = 10; } diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java index 4ed2f994a2..4a0c64c7f0 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java @@ -56,20 +56,21 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.onlab.util.Tools.groupedThreads; -import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE; +import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE; import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE; import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC; import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE; -import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE; +import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE; import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE; import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SNAT_RULE; import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE; -import static org.onosproject.k8snetworking.api.Constants.STAT_INBOUND_TABLE; -import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE; +import static org.onosproject.k8snetworking.api.Constants.STAT_INGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE; import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE; -import static org.onosproject.k8snetworking.api.Constants.VTAP_INBOUND_TABLE; -import static org.onosproject.k8snetworking.api.Constants.VTAP_OUTBOUND_TABLE; +import static org.onosproject.k8snetworking.api.Constants.VTAP_INGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.VTAP_EGRESS_TABLE; import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId; import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension; import static org.slf4j.LoggerFactory.getLogger; @@ -223,32 +224,35 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { DeviceId deviceId = k8sNode.intgBridge(); // for inbound table transition - connectTables(deviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE); - connectTables(deviceId, VTAP_INBOUND_TABLE, VTAG_TABLE); + connectTables(deviceId, STAT_INGRESS_TABLE, VTAP_INGRESS_TABLE); + connectTables(deviceId, VTAP_INGRESS_TABLE, VTAG_TABLE); // for vTag and ARP table transition connectTables(deviceId, VTAG_TABLE, ARP_TABLE); - connectTables(deviceId, ACL_EGRESS_TABLE, JUMP_TABLE); + connectTables(deviceId, JUMP_TABLE, GROUPING_TABLE); // for ARP and ACL table transition - connectTables(deviceId, ARP_TABLE, JUMP_TABLE); + connectTables(deviceId, ARP_TABLE, GROUPING_TABLE); - // for JUMP table transition to routing table - connectTables(deviceId, JUMP_TABLE, ROUTING_TABLE); + // for grouping table transition to ACL table + connectTables(deviceId, GROUPING_TABLE, ACL_TABLE); - // for JUMP table transition - // we need JUMP table for bypassing routing table which contains large + // for ACL table transition to routing table + connectTables(deviceId, ACL_TABLE, ROUTING_TABLE); + + // for grouping table transition + // we need grouping table for bypassing routing table which contains large // amount of flow rules which might cause performance degradation during // table lookup // setupJumpTable(k8sNode); // for routing and outbound table transition - connectTables(deviceId, ROUTING_TABLE, STAT_OUTBOUND_TABLE); + connectTables(deviceId, ROUTING_TABLE, STAT_EGRESS_TABLE); // for outbound table transition - connectTables(deviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE); - connectTables(deviceId, VTAP_OUTBOUND_TABLE, FORWARDING_TABLE); + connectTables(deviceId, STAT_EGRESS_TABLE, VTAP_EGRESS_TABLE); + connectTables(deviceId, VTAP_EGRESS_TABLE, FORWARDING_TABLE); } private void setupJumpTable(K8sNode k8sNode) { @@ -267,7 +271,7 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { .withPriority(HIGH_PRIORITY) .fromApp(appId) .makePermanent() - .forTable(JUMP_TABLE) + .forTable(GROUPING_TABLE) .build(); applyRule(flowRule, true); @@ -275,7 +279,7 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { selector = DefaultTrafficSelector.builder(); treatment = DefaultTrafficTreatment.builder(); - treatment.transition(STAT_OUTBOUND_TABLE); + treatment.transition(STAT_EGRESS_TABLE); flowRule = DefaultFlowRule.builder() .forDevice(deviceId) @@ -284,7 +288,7 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { .withPriority(DROP_PRIORITY) .fromApp(appId) .makePermanent() - .forTable(JUMP_TABLE) + .forTable(GROUPING_TABLE) .build(); applyRule(flowRule, true); @@ -305,7 +309,7 @@ public class K8sFlowRuleManager implements K8sFlowRuleService { if (mac != null) { tBuilder.setEthSrc(mac); } - tBuilder.transition(STAT_OUTBOUND_TABLE); + tBuilder.transition(STAT_EGRESS_TABLE); } else { PortNumber portNum = tunnelPortNumByNetId(k8sNetwork.networkId(), k8sNetworkService, node); diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java new file mode 100644 index 0000000000..212f9b7166 --- /dev/null +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java @@ -0,0 +1,507 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.k8snetworking.impl; + +import com.google.common.collect.Maps; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicy; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort; +import org.onlab.packet.Ethernet; +import org.onlab.packet.IPv4; +import org.onlab.packet.IpAddress; +import org.onlab.packet.IpPrefix; +import org.onlab.packet.TpPort; +import org.onosproject.cfg.ComponentConfigService; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.k8snetworking.api.K8sFlowRuleService; +import org.onosproject.k8snetworking.api.K8sNetworkPolicyEvent; +import org.onosproject.k8snetworking.api.K8sNetworkPolicyListener; +import org.onosproject.k8snetworking.api.K8sNetworkPolicyService; +import org.onosproject.k8snetworking.api.K8sNetworkService; +import org.onosproject.k8snetworking.api.K8sPodEvent; +import org.onosproject.k8snetworking.api.K8sPodListener; +import org.onosproject.k8snetworking.api.K8sPodService; +import org.onosproject.k8snode.api.K8sNodeService; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.driver.DriverService; +import org.onosproject.net.flow.DefaultTrafficSelector; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.TrafficSelector; +import org.onosproject.net.flow.TrafficTreatment; +import org.onosproject.store.service.StorageService; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE; +import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; +import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE; +import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE; +import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX; +import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.shiftIpDomain; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Handles the ACL by referring to the network policy defined through kubernetes. + */ +@Component(immediate = true) +public class K8sNetworkPolicyHandler { + + private final Logger log = getLogger(getClass()); + + private static final String DIRECTION_INGRESS = "ingress"; + private static final String DIRECTION_EGRESS = "egress"; + + private static final String PROTOCOL_TCP = "tcp"; + private static final String PROTOCOL_UDP = "udp"; + + private static final int HOST_PREFIX = 32; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected LeadershipService leadershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DriverService driverService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected DeviceService deviceService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected ComponentConfigService configService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNetworkService k8sNetworkService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sFlowRuleService k8sFlowRuleService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNodeService k8sNodeService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sPodService k8sPodService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY) + protected K8sNetworkPolicyService k8sNetworkPolicyService; + + private final ExecutorService eventExecutor = newSingleThreadExecutor( + groupedThreads(this.getClass().getSimpleName(), "event-handler", log)); + private final InternalPodListener internalPodListener = + new InternalPodListener(); + private final InternalNetworkPolicyListener internalNetworkPolicyListener = + new InternalNetworkPolicyListener(); + + private ApplicationId appId; + private NodeId localNodeId; + + @Activate + protected void activate() { + appId = coreService.registerApplication(K8S_NETWORKING_APP_ID); + + localNodeId = clusterService.getLocalNode().id(); + leadershipService.runForLeadership(appId.name()); + k8sPodService.addListener(internalPodListener); + k8sNetworkPolicyService.addListener(internalNetworkPolicyListener); + + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + leadershipService.withdraw(appId.name()); + k8sPodService.removeListener(internalPodListener); + k8sNetworkPolicyService.removeListener(internalNetworkPolicyListener); + eventExecutor.shutdown(); + + log.info("Stopped"); + } + + private void setBlockRulesByPolicy(NetworkPolicy policy, boolean install) { + Map labels = + policy.getSpec().getPodSelector().getMatchLabels(); + Map> filter = Maps.newConcurrentMap(); + + k8sPodService.pods().forEach(p -> { + p.getMetadata().getLabels().forEach((k, v) -> { + if (labels.get(k) != null && labels.get(k).equals(v)) { + filter.put(p.getStatus().getPodIP(), policy.getSpec().getPolicyTypes()); + } + }); + }); + + setBlockRules(filter, install); + } + + private void setBlockRulesByPod(Pod pod, boolean install) { + Map> filter = Maps.newConcurrentMap(); + + k8sNetworkPolicyService.networkPolicies().forEach(p -> { + Map labels = p.getSpec().getPodSelector().getMatchLabels(); + pod.getMetadata().getLabels().forEach((k, v) -> { + if (labels.get(k) != null && labels.get(k).equals(v)) { + filter.put(pod.getStatus().getPodIP(), p.getSpec().getPolicyTypes()); + } + }); + }); + + setBlockRules(filter, install); + } + + private void setBlockRules(Map> filter, boolean install) { + filter.forEach((k, v) -> { + v.forEach(d -> { + TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() + .matchEthType(Ethernet.TYPE_IPV4); + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + if (d.equalsIgnoreCase(DIRECTION_INGRESS)) { + sBuilder.matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX)); + tBuilder.transition(ACL_INGRESS_TABLE); + } else if (d.equalsIgnoreCase(DIRECTION_EGRESS)) { + sBuilder.matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX)); + tBuilder.transition(ACL_EGRESS_TABLE); + } + + k8sNodeService.completeNodes().forEach(n -> { + k8sFlowRuleService.setRule( + appId, + n.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_CIDR_RULE, + ACL_TABLE, + install + ); + }); + }); + }); + } + + private void setAllowRulesByPolicy(NetworkPolicy policy, boolean install) { + Map>> white = Maps.newConcurrentMap(); + + policy.getSpec().getIngress().forEach(i -> { + Map> direction = Maps.newConcurrentMap(); + direction.put(DIRECTION_INGRESS, i.getPorts()); + i.getFrom().stream() + .filter(p -> p.getIpBlock() != null) + .forEach(p -> { + white.compute(p.getIpBlock().getCidr(), (k, v) -> direction); + + // TODO: need to handle namespace label later + + Map podLabels = p.getPodSelector().getMatchLabels(); + k8sPodService.pods().forEach(pod -> { + pod.getMetadata().getLabels().forEach((k, v) -> { + if (podLabels.get(k) != null && podLabels.get(k).equals(v)) { + white.compute(shiftIpDomain(pod.getStatus().getPodIP(), + SHIFTED_IP_PREFIX), (m, n) -> direction); + } + }); + }); + }); + }); + + policy.getSpec().getEgress().forEach(e -> { + Map> direction = Maps.newConcurrentMap(); + direction.put(DIRECTION_EGRESS, e.getPorts()); + e.getTo().stream() + .filter(p -> p.getIpBlock() != null) + .forEach(p -> { + white.compute(p.getIpBlock().getCidr(), (k, v) -> { + if (v != null) { + v.put(DIRECTION_EGRESS, e.getPorts()); + return v; + } else { + return direction; + } + }); + + // TODO: need to handle namespace label later + + Map podLabels = p.getPodSelector().getMatchLabels(); + k8sPodService.pods().forEach(pod -> { + pod.getMetadata().getLabels().forEach((k, v) -> { + if (podLabels.get(k) != null && podLabels.get(k).equals(v)) { + white.compute(shiftIpDomain(pod.getStatus().getPodIP(), + SHIFTED_IP_PREFIX), (m, n) -> { + if (n != null) { + n.put(DIRECTION_EGRESS, e.getPorts()); + return n; + } else { + return direction; + } + }); + } + }); + }); + }); + }); + + setAllowRules(white, install); + } + + private void setAllowRulesByPod(Pod pod, boolean install) { + Map>> white = Maps.newConcurrentMap(); + + k8sNetworkPolicyService.networkPolicies().forEach(policy -> { + policy.getSpec().getIngress().forEach(i -> { + Map> direction = Maps.newConcurrentMap(); + direction.put(DIRECTION_INGRESS, i.getPorts()); + i.getFrom().forEach(peer -> { + + // TODO: need to handle namespace label later + + Map podLabels = peer.getPodSelector().getMatchLabels(); + pod.getMetadata().getLabels().forEach((k, v) -> { + if (podLabels.get(k) != null && podLabels.get(k).equals(v)) { + white.compute(shiftIpDomain(pod.getStatus().getPodIP(), + SHIFTED_IP_PREFIX), (m, n) -> direction); + } + }); + }); + }); + }); + + k8sNetworkPolicyService.networkPolicies().forEach(policy -> { + policy.getSpec().getEgress().forEach(e -> { + Map> direction = Maps.newConcurrentMap(); + direction.put(DIRECTION_EGRESS, e.getPorts()); + e.getTo().forEach(p -> { + + // TODO: need to handle namespace label later + + Map podLabels = p.getPodSelector().getMatchLabels(); + pod.getMetadata().getLabels().forEach((k, v) -> { + if (podLabels.get(k) != null && podLabels.get(k).equals(v)) { + white.compute(shiftIpDomain(pod.getStatus().getPodIP(), + SHIFTED_IP_PREFIX), (m, n) -> { + if (n != null) { + n.put(DIRECTION_EGRESS, e.getPorts()); + return n; + } else { + return direction; + } + }); + } + }); + }); + }); + }); + + setAllowRules(white, install); + } + + private void setAllowRules(Map>> white, boolean install) { + white.forEach((k, v) -> { + v.forEach((pk, pv) -> { + TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder() + .matchEthType(Ethernet.TYPE_IPV4); + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder(); + if (pk.equalsIgnoreCase(DIRECTION_INGRESS)) { + sBuilder.matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX)); + tBuilder.transition(ROUTING_TABLE); + + if (pv.size() == 0) { + k8sNodeService.completeNodes().forEach(n -> { + k8sFlowRuleService.setRule( + appId, + n.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_CIDR_RULE, + ACL_INGRESS_TABLE, + install + ); + }); + } else { + pv.forEach(p -> { + if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) { + sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP); + sBuilder.matchTcpSrc(TpPort.tpPort(p.getPort().getIntVal())); + } + if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) { + sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP); + sBuilder.matchUdpSrc(TpPort.tpPort(p.getPort().getIntVal())); + } + + k8sNodeService.completeNodes().forEach(n -> { + k8sFlowRuleService.setRule( + appId, + n.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_CIDR_RULE, + ACL_INGRESS_TABLE, + install + ); + }); + }); + } + } else if (pk.equalsIgnoreCase(DIRECTION_EGRESS)) { + sBuilder.matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX)); + tBuilder.transition(ROUTING_TABLE); + + if (pv.size() == 0) { + k8sNodeService.completeNodes().forEach(n -> { + k8sFlowRuleService.setRule( + appId, + n.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_CIDR_RULE, + ACL_EGRESS_TABLE, + install + ); + }); + } else { + pv.forEach(p -> { + if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) { + sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP); + sBuilder.matchTcpDst(TpPort.tpPort(p.getPort().getIntVal())); + } + if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) { + sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP); + sBuilder.matchUdpDst(TpPort.tpPort(p.getPort().getIntVal())); + } + + k8sNodeService.completeNodes().forEach(n -> { + k8sFlowRuleService.setRule( + appId, + n.intgBridge(), + sBuilder.build(), + tBuilder.build(), + PRIORITY_CIDR_RULE, + ACL_EGRESS_TABLE, + install + ); + }); + }); + } + + } else { + log.error("In correct direction has been specified at network policy."); + } + }); + }); + } + + private class InternalPodListener implements K8sPodListener { + + private boolean isRelevantHelper() { + return Objects.equals(localNodeId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sPodEvent event) { + Pod pod = event.subject(); + switch (event.type()) { + case K8S_POD_CREATED: + case K8S_POD_UPDATED: + eventExecutor.execute(() -> processPodCreation(pod)); + break; + case K8S_POD_REMOVED: + eventExecutor.execute(() -> processPodRemoval(pod)); + break; + default: + break; + } + } + + private void processPodCreation(Pod pod) { + if (!isRelevantHelper()) { + return; + } + + setBlockRulesByPod(pod, true); + setAllowRulesByPod(pod, true); + } + + private void processPodRemoval(Pod pod) { + if (!isRelevantHelper()) { + return; + } + + setBlockRulesByPod(pod, false); + setAllowRulesByPod(pod, false); + } + } + + private class InternalNetworkPolicyListener implements K8sNetworkPolicyListener { + + private boolean isRelevantHelper() { + return Objects.equals(localNodeId, leadershipService.getLeader(appId.name())); + } + + @Override + public void event(K8sNetworkPolicyEvent event) { + NetworkPolicy policy = event.subject(); + switch (event.type()) { + case K8S_NETWORK_POLICY_CREATED: + case K8S_NETWORK_POLICY_UPDATED: + eventExecutor.execute(() -> processNetworkPolicyCreation(policy)); + break; + case K8S_NETWORK_POLICY_REMOVED: + eventExecutor.execute(() -> processNetworkPolicyRemoval(policy)); + break; + default: + break; + } + } + + private void processNetworkPolicyCreation(NetworkPolicy policy) { + if (!isRelevantHelper()) { + return; + } + + setBlockRulesByPolicy(policy, true); + setAllowRulesByPolicy(policy, true); + } + + private void processNetworkPolicyRemoval(NetworkPolicy policy) { + if (!isRelevantHelper()) { + return; + } + + setBlockRulesByPolicy(policy, false); + setAllowRulesByPolicy(policy, false); + } + } +} diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java index bbf505ac69..38493f3a90 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java @@ -86,10 +86,11 @@ import java.util.stream.Collectors; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE; import static org.onosproject.k8snetworking.api.Constants.A_CLASS; import static org.onosproject.k8snetworking.api.Constants.B_CLASS; import static org.onosproject.k8snetworking.api.Constants.DST; -import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE; +import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE; import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID; import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL; import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS; @@ -106,11 +107,11 @@ import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE; import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR; import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX; import static org.onosproject.k8snetworking.api.Constants.SRC; -import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE; import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT; +import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE; import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr; import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap; import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId; @@ -253,9 +254,9 @@ public class K8sServiceHandler { k8sNetworkService.networks().forEach(n -> { // TODO: need to provide a way to add multiple service IP CIDR ranges setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr, - JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install); + GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install); setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(), - JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install); + GROUPING_TABLE, ACL_TABLE, PRIORITY_CT_RULE, install); }); // +trk-new CT rules @@ -285,25 +286,25 @@ public class K8sServiceHandler { // src: POD -> dst: service (unNAT POD) grouping setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null, - SHIFTED_IP_PREFIX, SRC, JUMP_TABLE, SERVICE_TABLE, + SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE, PRIORITY_CT_RULE, install); // src: POD (unNAT service) -> dst: shifted POD grouping setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null, - srcPodPrefix, DST, JUMP_TABLE, POD_TABLE, PRIORITY_CT_RULE, install); + srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install); // src: node -> dst: service (unNAT POD) grouping setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS, - null, null, null, JUMP_TABLE, SERVICE_TABLE, + null, null, null, GROUPING_TABLE, SERVICE_TABLE, PRIORITY_CT_RULE, install); // src: POD (unNAT service) -> dst: node grouping setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS, - null, null, null, JUMP_TABLE, POD_TABLE, + null, null, null, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install); k8sNetworkService.networks().forEach(n -> { setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS, n.segmentId(), null, null, ROUTING_TABLE, - STAT_OUTBOUND_TABLE, PRIORITY_INTER_ROUTING_RULE, install); + STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install); }); // setup load balancing rules using group table @@ -450,7 +451,7 @@ public class K8sServiceHandler { } ExtensionTreatment resubmitTreatment = buildResubmitExtension( - deviceService.getDevice(deviceId), ROUTING_TABLE); + deviceService.getDevice(deviceId), ACL_TABLE); tBuilder.extension(resubmitTreatment, deviceId); return buildGroupBucket(tBuilder.build(), SELECT, (short) -1); @@ -553,7 +554,7 @@ public class K8sServiceHandler { TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() .setIpSrc(IpAddress.valueOf(serviceIp)) - .transition(ROUTING_TABLE); + .transition(ACL_TABLE); if (TCP.equals(protocol)) { tBuilder.setTcpSrc(TpPort.tpPort(servicePort)); @@ -586,7 +587,7 @@ public class K8sServiceHandler { if (mac != null) { tBuilder.setEthSrc(mac); } - tBuilder.transition(STAT_OUTBOUND_TABLE); + tBuilder.transition(STAT_EGRESS_TABLE); } else { PortNumber portNum = tunnelPortNumByNetId(network.networkId(), k8sNetworkService, n); @@ -664,7 +665,7 @@ public class K8sServiceHandler { .natPortMax(TpPort.tpPort(p.getPort())) .build(); ExtensionTreatment resubmitTreatment = buildResubmitExtension( - deviceService.getDevice(deviceId), ROUTING_TABLE); + deviceService.getDevice(deviceId), ACL_TABLE); TrafficTreatment treatment = DefaultTrafficTreatment.builder() .extension(ctNatTreatment, deviceId) .extension(resubmitTreatment, deviceId) diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java index 409d80b4bb..4b9adc71ad 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java @@ -56,7 +56,7 @@ import java.util.concurrent.ExecutorService; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.onlab.util.Tools.groupedThreads; -import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_TABLE; +import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE; import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE; import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE; import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE; @@ -248,7 +248,7 @@ public class K8sSwitchingHandler { if (ethType == Ethernet.TYPE_ARP) { tBuilder.transition(ARP_TABLE); } else if (ethType == Ethernet.TYPE_IPV4) { - tBuilder.transition(ACL_EGRESS_TABLE); + tBuilder.transition(JUMP_TABLE); } k8sFlowRuleService.setRule( @@ -271,7 +271,7 @@ public class K8sSwitchingHandler { TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() .setTunnelId(Long.valueOf(net.segmentId())) - .transition(ACL_EGRESS_TABLE); + .transition(JUMP_TABLE); k8sFlowRuleService.setRule( appId, @@ -293,7 +293,7 @@ public class K8sSwitchingHandler { TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder() .setTunnelId(Long.valueOf(net.segmentId())) - .transition(ACL_EGRESS_TABLE); + .transition(JUMP_TABLE); k8sFlowRuleService.setRule( appId, diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java index a9601d1730..b9ef48a80f 100644 --- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java +++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java @@ -295,6 +295,18 @@ public final class K8sNetworkingUtil { return ipMap; } + /** + * Returns a shifted IP address. + * + * @param ipAddress IP address to be shifted + * @param shiftPrefix A IP prefix used in shifted IP address + * @return shifted Ip address + */ + public static String shiftIpDomain(String ipAddress, String shiftPrefix) { + String origIpPrefix = ipAddress.split("\\.")[0] + "." + ipAddress.split("\\.")[1]; + return StringUtils.replace(ipAddress, origIpPrefix, shiftPrefix); + } + /** * Returns an unshifted IP address. *