From d684ec0c655c5544a464ee92d2a0a68a6e0bf089 Mon Sep 17 00:00:00 2001
From: Murali Reddy
Date: Tue, 22 Jun 2021 02:50:19 +0530
Subject: [PATCH] add logic to explicitly ACCEPT traffic from/to the pod if its
 permitted by applicable network policies. If there are no network policies
 then by default ACCEPT the pod traffic

---
 .../netpol/network_policy_controller.go       |  38 ++-
 pkg/controllers/netpol/pod.go                 | 306 +++++++++---------
 2 files changed, 196 insertions(+), 148 deletions(-)

diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go
index ec8bab44..fc88bdce 100644
--- a/pkg/controllers/netpol/network_policy_controller.go
+++ b/pkg/controllers/netpol/network_policy_controller.go
@@ -32,6 +32,7 @@ const (
 	kubeInputChainName           = "KUBE-ROUTER-INPUT"
 	kubeForwardChainName         = "KUBE-ROUTER-FORWARD"
 	kubeOutputChainName          = "KUBE-ROUTER-OUTPUT"
+	kubeDefaultNetpolChain       = "KUBE-NWPLCY-DEFAULT"
 )
 
 // Network policy controller provides both ingress and egress filtering for the pods as per the defined network
@@ -143,9 +144,12 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
 	klog.Info("Starting network policy controller")
 	npc.healthChan = healthChan
 
-	// setup kube-router specific top level custom chains
+	// setup kube-router specific top level custom chains (KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT)
 	npc.ensureTopLevelChains()
 
+	// setup default network policy chain that is applied to traffic from/to the pods that does not match any network policy
+	npc.ensureDefaultNetworkPolicyChain()
+
 	// Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
 	// therefore, we start it in it's own goroutine and request a sync through a single item channel
 	klog.Info("Starting network policy controller full sync goroutine")
@@ -377,6 +381,38 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
 		ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+4)
 	}
 
+	// for the traffic to/from the local pod's let network policy controller be
+	// authoritative entity to ACCEPT the traffic if it complies to network policies
+	for _, chain := range chains {
+		comment := "rule to explictly ACCEPT traffic that comply to network policies"
+		args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"}
+		err = iptablesCmdHandler.AppendUnique("filter", chain, args...)
+		if err != nil {
+			klog.Fatalf("Failed to run iptables command: %s", err.Error())
+		}
+	}
+}
+
+// ensureDefaultNetworkPolicyChain creates the KUBE-NWPLCY-DEFAULT chain holding a rule that sets mark 0x10000/0x10000
+func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() {
+
+	iptablesCmdHandler, err := iptables.New()
+	if err != nil {
+		klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
+	}
+
+	markArgs := make([]string, 0)
+	markComment := "rule to mark traffic matching a network policy"
+	markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000")
+
+	err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain)
+	if chainErr, ok := err.(*iptables.Error); err != nil && !(ok && chainErr.ExitStatus() == 1) {
+		klog.Fatalf("Failed to run iptables command to create %s chain due to %s", kubeDefaultNetpolChain, err.Error())
+	}
+	err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...)
+	if err != nil {
+		klog.Fatalf("Failed to run iptables command: %s", err.Error())
+	}
 }
 
 func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) error {
diff --git a/pkg/controllers/netpol/pod.go b/pkg/controllers/netpol/pod.go
index 1f99560c..8195e52c 100644
--- a/pkg/controllers/netpol/pod.go
+++ b/pkg/controllers/netpol/pod.go
@@ -98,18 +98,12 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
 		return nil
 	}
 
-	// loop through the pods running on the node which to which ingress network policies to be applied
-	ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
+	// loop through the pods running on the node
+	allLocalPods, err := npc.getLocalPods(npc.nodeIP.String())
 	if err != nil {
 		return nil, err
 	}
-	for _, pod := range *ingressNetworkPolicyEnabledPods {
-
-		// below condition occurs when we get trasient update while removing or adding pod
-		// subseqent update will do the correct action
-		if len(pod.ip) == 0 || pod.ip == "" {
-			continue
-		}
+	for _, pod := range *allLocalPods {
 
 		// ensure pod specific firewall chain exist for all the pods that need ingress firewall
 		podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
@@ -117,177 +111,195 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
 
 		activePodFwChains[podFwChainName] = true
 
-		// add entries in pod firewall to run through required network policies
-		for _, policy := range networkPoliciesInfo {
-			if _, ok := policy.targetPods[pod.ip]; ok {
-				comment := "\"run through nw policy " + policy.name + "\""
-				policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
-				args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
-				npc.filterTableRules.WriteString(strings.Join(args, " "))
-			}
+		// setup rules to run pod inbound traffic through applicable ingress network policies
+		err = npc.setupPodIngressRules(&pod, podFwChainName, networkPoliciesInfo, version)
+		if err != nil {
+			return nil, err
 		}
 
-		comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\""
-		args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
+		// setup rules to intercept inbound traffic to the pods
+		err = npc.interceptPodInboundTraffic(&pod, podFwChainName)
+		if err != nil {
+			return nil, err
+		}
 
-		// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
-		comment = "\"rule for stateful firewall for pod\""
-		args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
+		// setup rules to run pod outbound traffic through applicable egress network policies
+		err = npc.setupPodEgressRules(&pod, podFwChainName, networkPoliciesInfo, version)
+		if err != nil {
+			return nil, err
+		}
 
-		// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
-		// this rule applies to the traffic getting routed (coming for other node pods)
-		comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
-			" to chain " + podFwChainName + "\""
-		args = []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
-
-		// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
-		// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
-		args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
-
-		// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
-		// this rule applies to the traffic getting switched (coming for same node pods)
-		comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
-			" to chain " + podFwChainName + "\""
-		args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
-			"-m", "comment", "--comment", comment,
-			"-d", pod.ip,
-			"-j", podFwChainName, "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
+		// setup rules to intercept outbound traffic from the pods
+		err = npc.interceptPodOutboundTraffic(&pod, podFwChainName)
+		if err != nil {
+			return nil, err
+		}
 
 		err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
 		if err != nil {
 			return nil, err
 		}
-	}
 
-	// loop through the pods running on the node which egress network policies to be applied
-	egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
-	if err != nil {
-		return nil, err
-	}
-	for _, pod := range *egressNetworkPolicyEnabledPods {
-
-		// below condition occurs when we get trasient update while removing or adding pod
-		// subseqent update will do the correct action
-		if len(pod.ip) == 0 || pod.ip == "" {
-			continue
-		}
-
-		// ensure pod specific firewall chain exist for all the pods that need egress firewall
-		podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
-		if !activePodFwChains[podFwChainName] {
-			npc.filterTableRules.WriteString(":" + podFwChainName + "\n")
-		}
-		activePodFwChains[podFwChainName] = true
-
-		// add entries in pod firewall to run through required network policies
-		for _, policy := range networkPoliciesInfo {
-			if _, ok := policy.targetPods[pod.ip]; ok {
-				comment := "\"run through nw policy " + policy.name + "\""
-				policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
-				args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
-				npc.filterTableRules.WriteString(strings.Join(args, " "))
-			}
-		}
-
-		// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
-		comment := "\"rule for stateful firewall for pod\""
-		args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
+		// set mark to indicate traffic from/to the pod passed network policies.
+		// Mark will be checked to explicitly ACCEPT the traffic
+		comment := "\"set mark to ACCEPT traffic that comply to network policies\""
+		args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"}
 		npc.filterTableRules.WriteString(strings.Join(args, " "))
-
-		egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
-		for _, chain := range egressFilterChains {
-			// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
-			// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
-			// to pod on a different node)
-			comment = "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
-				" to chain " + podFwChainName + "\""
-			args = []string{"-A", chain, "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"}
-			npc.filterTableRules.WriteString(strings.Join(args, " "))
-		}
-
-		// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
-		// this rule applies to the traffic getting switched (coming for same node pods)
-		comment = "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
-			" to chain " + podFwChainName + "\""
-		args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
-			"-m", "comment", "--comment", comment,
-			"-s", pod.ip,
-			"-j", podFwChainName, "\n"}
-		npc.filterTableRules.WriteString(strings.Join(args, " "))
-
-		err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
-		if err != nil {
-			return nil, err
-		}
 	}
 
 	return activePodFwChains, nil
 }
 
-func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) {
-	nodePods := make(map[string]podInfo)
-
-	for _, obj := range npc.podLister.List() {
-		pod := obj.(*api.Pod)
-
-		// ignore the pods running on the different node and pods that are not actionable
-		if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) {
-			continue
-		}
-
-		for _, policy := range networkPoliciesInfo {
-			if policy.namespace != pod.ObjectMeta.Namespace {
-				continue
-			}
-			_, ok := policy.targetPods[pod.Status.PodIP]
-			if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
-				klog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
-				nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
-					name:      pod.ObjectMeta.Name,
-					namespace: pod.ObjectMeta.Namespace,
-					labels:    pod.ObjectMeta.Labels}
-				break
-			}
+// setup rules to jump to applicable network policy chains for the pod inbound traffic
+func (npc *NetworkPolicyController) setupPodIngressRules(pod *podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) error {
+	// add entries in pod firewall to run through required network policies
+	for _, policy := range networkPoliciesInfo {
+		if _, ok := policy.targetPods[pod.ip]; ok {
+			comment := "\"run through nw policy " + policy.name + "\""
+			policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
+			args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
+			npc.filterTableRules.WriteString(strings.Join(args, " "))
 		}
 	}
 
-	return &nodePods, nil
+	// if pod does not have any network policy which applies rules for pod's ingress traffic
+	// then apply default network policy
+	if !npc.isIngressNetworkPolicyEnabledPod(networkPoliciesInfo, pod) {
+		comment := "\"run through default ingress policy chain\""
+		args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
+		npc.filterTableRules.WriteString(strings.Join(args, " "))
+	}
+
+	comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\""
+	args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
+
+	// ensure stateful firewall, that permits return traffic for the traffic originated by the pod
+	comment = "\"rule for stateful firewall for pod\""
+	args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
+
+	return nil
 }
 
-func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) {
+func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod *podInfo, podFwChainName string) error {
+	// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
+	// this rule applies to the traffic getting routed (coming for other node pods)
+	comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
+		" to chain " + podFwChainName + "\""
+	args := []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
 
-	nodePods := make(map[string]podInfo)
+	// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
+	// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
+	args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
 
+	// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
+	// this rule applies to the traffic getting switched (coming for same node pods)
+	comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
+		" to chain " + podFwChainName + "\""
+	args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
+		"-m", "comment", "--comment", comment,
+		"-d", pod.ip,
+		"-j", podFwChainName, "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
+	return nil
+}
+
+// setup rules to jump to applicable network policy chains for the pod outbound traffic
+func (npc *NetworkPolicyController) setupPodEgressRules(pod *podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) error {
+	// add entries in pod firewall to run through required network policies
+	for _, policy := range networkPoliciesInfo {
+		if _, ok := policy.targetPods[pod.ip]; ok {
+			comment := "\"run through nw policy " + policy.name + "\""
+			policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
+			args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
+			npc.filterTableRules.WriteString(strings.Join(args, " "))
+		}
+	}
+
+	// if pod does not have any network policy which applies rules for pod's egress traffic
+	// then apply default network policy
+	if !npc.isEgressNetworkPolicyEnabledPod(networkPoliciesInfo, pod) {
+		comment := "\"run through default network policy chain\""
+		args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
+		npc.filterTableRules.WriteString(strings.Join(args, " "))
+	}
+
+	// ensure stateful firewall, that permits return traffic for the traffic originated by the pod
+	comment := "\"rule for stateful firewall for pod\""
+	args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
+	return nil
+}
+
+// setup iptable rules to intercept outbound traffic from pods and run it across the
+// firewall chain corresponding to the pod so that egress network policies are enforced
+func (npc *NetworkPolicyController) interceptPodOutboundTraffic(pod *podInfo, podFwChainName string) error {
+	egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
+	for _, chain := range egressFilterChains {
+		// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
+		// this rule applies to the traffic getting forwarded/routed (traffic from the pod destined
+		// to pod on a different node)
+		comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
+			" to chain " + podFwChainName + "\""
+		args := []string{"-A", chain, "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"}
+		npc.filterTableRules.WriteString(strings.Join(args, " "))
+	}
+
+	// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
+	// this rule applies to the traffic getting switched (coming for same node pods)
+	comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
+		" to chain " + podFwChainName + "\""
+	args := []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
+		"-m", "comment", "--comment", comment,
+		"-s", pod.ip,
+		"-j", podFwChainName, "\n"}
+	npc.filterTableRules.WriteString(strings.Join(args, " "))
+	return nil
+}
+
+func (npc *NetworkPolicyController) getLocalPods(nodeIP string) (*map[string]podInfo, error) {
+	localPods := make(map[string]podInfo)
 	for _, obj := range npc.podLister.List() {
 		pod := obj.(*api.Pod)
 
-		// ignore the pods running on the different node and pods that are not actionable
 		if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) {
 			continue
 		}
+		localPods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
+			name:      pod.ObjectMeta.Name,
+			namespace: pod.ObjectMeta.Namespace,
+			labels:    pod.ObjectMeta.Labels}
+	}
+	return &localPods, nil
+}
 
-		for _, policy := range networkPoliciesInfo {
-			if policy.namespace != pod.ObjectMeta.Namespace {
-				continue
-			}
-			_, ok := policy.targetPods[pod.Status.PodIP]
-			if ok && (policy.policyType == "both" || policy.policyType == "egress") {
-				klog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
-				nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
-					name:      pod.ObjectMeta.Name,
-					namespace: pod.ObjectMeta.Namespace,
-					labels:    pod.ObjectMeta.Labels}
-				break
-			}
+func (npc *NetworkPolicyController) isIngressNetworkPolicyEnabledPod(networkPoliciesInfo []networkPolicyInfo, pod *podInfo) bool {
+	for _, policy := range networkPoliciesInfo {
+		if policy.namespace != pod.namespace {
+			continue
+		}
+		_, ok := policy.targetPods[pod.ip]
+		if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
+			return true
 		}
 	}
+	return false
+}
 
-	return &nodePods, nil
+func (npc *NetworkPolicyController) isEgressNetworkPolicyEnabledPod(networkPoliciesInfo []networkPolicyInfo, pod *podInfo) bool {
+	for _, policy := range networkPoliciesInfo {
+		if policy.namespace != pod.namespace {
+			continue
+		}
+		_, ok := policy.targetPods[pod.ip]
+		if ok && (policy.policyType == "both" || policy.policyType == "egress") {
+			return true
+		}
+	}
+	return false
 }
 
 func podFirewallChainName(namespace, podName string, version string) string {