use iptables-save and iptables-restore commands to consolidate

individual iptables command that are run during full network
policies sync
This commit is contained in:
Murali Reddy 2021-02-26 03:07:19 +05:30 committed by Aaron U'Ren
parent 8f2e26a6c6
commit 888cac9193
4 changed files with 195 additions and 291 deletions

View File

@ -1,6 +1,7 @@
package netpol package netpol
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt" "fmt"
@ -67,6 +68,8 @@ type NetworkPolicyController struct {
PodEventHandler cache.ResourceEventHandler PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler NetworkPolicyEventHandler cache.ResourceEventHandler
filterTableRules bytes.Buffer
} }
// internal structure to represent a network policy // internal structure to represent a network policy
@ -220,6 +223,12 @@ func (npc *NetworkPolicyController) fullPolicySync() {
return return
} }
npc.filterTableRules.Reset()
if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil {
glog.Errorf("Aborting sync. Failed to run iptables-save: %v" + err.Error())
return
}
activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
@ -232,11 +241,23 @@ func (npc *NetworkPolicyController) fullPolicySync() {
return return
} }
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets) err = npc.cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error()) glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error())
return return
} }
fmt.Println(npc.filterTableRules.String())
if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil {
glog.Errorf("Aborting sync. Failed to run iptables-restore: %v" + err.Error())
return
}
err = npc.cleanupStaleIPSets(activePolicyIPSets)
if err != nil {
glog.Errorf("Failed to cleanup stale ipsets: %v", err.Error())
return
}
} }
// Creates custom chains KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT // Creates custom chains KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT
@ -356,25 +377,16 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
} }
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error { func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error {
cleanupPodFwChains := make([]string, 0) cleanupPodFwChains := make([]string, 0)
cleanupPolicyChains := make([]string, 0) cleanupPolicyChains := make([]string, 0)
cleanupPolicyIPSets := make([]*utils.Set, 0)
// initialize tool sets for working with iptables and ipset // initialize tool sets for working with iptables and ipset
iptablesCmdHandler, err := iptables.New() iptablesCmdHandler, err := iptables.New()
if err != nil { if err != nil {
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error()) glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
} }
ipsets, err := utils.NewIPSet(false)
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
}
// find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed // find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed
chains, err := iptablesCmdHandler.ListChains("filter") chains, err := iptablesCmdHandler.ListChains("filter")
@ -393,6 +405,58 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
} }
} }
} }
var newChains, newRules, desiredFilterTable bytes.Buffer
rules := strings.Split(npc.filterTableRules.String(), "\n")
if len(rules) > 0 && rules[len(rules)-1] == "" {
rules = rules[:len(rules)-1]
}
for _, rule := range rules {
skipRule := false
for _, podFWChainName := range cleanupPodFwChains {
if strings.Contains(rule, podFWChainName) {
skipRule = true
break
}
}
for _, policyChainName := range cleanupPolicyChains {
if strings.Contains(rule, policyChainName) {
skipRule = true
break
}
}
if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") {
skipRule = true
}
if skipRule {
continue
}
if strings.HasPrefix(rule, ":") {
newChains.WriteString(rule + " - [0:0]\n")
}
if strings.HasPrefix(rule, "-") {
newRules.WriteString(rule + "\n")
}
}
desiredFilterTable.WriteString("*filter" + "\n")
desiredFilterTable.Write(newChains.Bytes())
desiredFilterTable.Write(newRules.Bytes())
desiredFilterTable.WriteString("COMMIT" + "\n")
npc.filterTableRules = desiredFilterTable
return nil
}
func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[string]bool) error {
cleanupPolicyIPSets := make([]*utils.Set, 0)
ipsets, err := utils.NewIPSet(false)
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
}
for _, set := range ipsets.Sets { for _, set := range ipsets.Sets {
if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) ||
strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) {
@ -401,78 +465,6 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
} }
} }
} }
// remove stale iptables podFwChain references from the filter table chains
for _, podFwChain := range cleanupPodFwChains {
primaryChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, egressChain := range primaryChains {
forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error())
}
// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error())
}
realRuleNo++
}
}
}
}
// cleanup pod firewall chain
for _, chain := range cleanupPodFwChains {
glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
}
glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
}
// cleanup network policy chains
for _, policyChain := range cleanupPolicyChains {
glog.V(2).Infof("Found policy chain to cleanup %s", policyChain)
// first clean up any references from active pod firewall chains
for podFwChain := range activePodFwChains {
podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
if err != nil {
return fmt.Errorf("Unable to list rules from the chain %s: %s", podFwChain, err)
}
for i, rule := range podFwChainRules {
if strings.Contains(rule, policyChain) {
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
if err != nil {
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
}
break
}
}
}
// now that all stale and active references to the network policy chain have been removed, delete the chain
err = iptablesCmdHandler.ClearChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
}
// cleanup network policy ipsets // cleanup network policy ipsets
for _, set := range cleanupPolicyIPSets { for _, set := range cleanupPolicyIPSets {
err = set.Destroy() err = set.Destroy()

View File

@ -3,10 +3,8 @@ package netpol
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt"
"strings" "strings"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog" "github.com/golang/glog"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -62,34 +60,20 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
activePodFwChains := make(map[string]bool) activePodFwChains := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor: %s", err.Error())
}
dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error { dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error {
// add rule to log the packets that will be dropped due to network policy enforcement // add rule to log the packets that will be dropped due to network policy enforcement
comment := "rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\""
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"} args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
// add rule to DROP if no applicable network policy permits the traffic // add rule to DROP if no applicable network policy permits the traffic
comment = "rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\""
args = []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT"} args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
// reset mark to let traffic pass through rest of the chains // reset mark to let traffic pass through rest of the chains
args = []string{"-j", "MARK", "--set-mark", "0/0x10000"} args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
return nil return nil
} }
@ -109,105 +93,50 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
// ensure pod specific firewall chain exist for all the pods that need ingress firewall // ensure pod specific firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName) npc.filterTableRules.WriteString(":" + podFwChainName + "\n")
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies // add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo { for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok { if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name comment := "\"run through nw policy " + policy.name + "\""
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
} }
} }
comment := "rule to permit the traffic traffic to pods when source is the pod's local node" comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\""
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"} args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod // ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment = "rule for stateful firewall for pod" comment = "\"rule for stateful firewall for pod\""
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods) // this rule applies to the traffic getting routed (coming for other node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName " to chain " + podFwChainName + "\""
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName} args = []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
exists, err = iptablesCmdHandler.Exists("filter", kubeOutputChainName, args...) args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
if err != nil { npc.filterTableRules.WriteString(strings.Join(args, " "))
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeOutputChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods) // this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName " to chain " + podFwChainName + "\n"
args = []string{"-m", "physdev", "--physdev-is-bridged", args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment, "-m", "comment", "--comment", comment,
"-d", pod.ip, "-d", pod.ip,
"-j", podFwChainName} "-j", podFwChainName, "\n"}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil { if err != nil {
@ -230,83 +159,46 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
// ensure pod specific firewall chain exist for all the pods that need egress firewall // ensure pod specific firewall chain exist for all the pods that need egress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName) if !activePodFwChains[podFwChainName] {
if err != nil && err.(*iptables.Error).ExitStatus() != 1 { npc.filterTableRules.WriteString(":" + podFwChainName + "\n")
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
} }
activePodFwChains[podFwChainName] = true activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies // add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo { for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok { if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name comment := "\"run through nw policy " + policy.name + "\n"
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
} }
} }
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod // ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment := "rule for stateful firewall for pod" comment := "\"rule for stateful firewall for pod\""
args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName} egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, chain := range egressFilterChains { for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted // this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
// to pod on a different node) // to pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + comment = "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName " to chain " + podFwChainName + "\""
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName} args = []string{"-A", chain, "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
} }
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods) // this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + comment = "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName " to chain " + podFwChainName + "\""
args = []string{"-m", "physdev", "--physdev-is-bridged", args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment, "-m", "comment", "--comment", comment,
"-s", pod.ip, "-s", pod.ip,
"-j", podFwChainName} "-j", podFwChainName, "\n"}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil { if err != nil {

View File

@ -11,7 +11,6 @@ import (
"github.com/cloudnativelabs/kube-router/pkg/metrics" "github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/cloudnativelabs/kube-router/pkg/utils" "github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog" "github.com/golang/glog"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1" networking "k8s.io/api/networking/v1"
@ -79,20 +78,12 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
activePolicyChains := make(map[string]bool) activePolicyChains := make(map[string]bool)
activePolicyIPSets := make(map[string]bool) activePolicyIPSets := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
}
// run through all network policies // run through all network policies
for _, policy := range networkPoliciesInfo { for _, policy := range networkPoliciesInfo {
// ensure there is a unique chain per network policy in filter table // ensure there is a unique chain per network policy in filter table
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
err := iptablesCmdHandler.NewChain("filter", policyChainName) npc.filterTableRules.WriteString(":" + policyChainName + "\n")
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePolicyChains[policyChainName] = true activePolicyChains[policyChainName] = true
@ -153,11 +144,6 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
return nil return nil
} }
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the ingress rules in the spec and create iptables rules // run through all the ingress rules in the spec and create iptables rules
@ -188,7 +174,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -208,7 +194,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
} }
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
return err return err
} }
} }
@ -219,7 +205,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
// so match on specified source and destination ip with all port and protocol // so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil {
return err return err
} }
} }
@ -231,7 +217,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -251,7 +237,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
} }
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
return err return err
} }
} }
@ -262,7 +248,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllSource && ingressRule.matchAllPorts { if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil {
return err return err
} }
} }
@ -282,7 +268,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -302,7 +288,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
} }
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
return err return err
} }
} }
@ -310,7 +296,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllPorts { if ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil {
return err return err
} }
} }
@ -329,11 +315,6 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil return nil
} }
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the egress rules in the spec and create iptables rules // run through all the egress rules in the spec and create iptables rules
@ -363,7 +344,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -385,7 +366,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
} }
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
return err return err
} }
} }
@ -397,7 +378,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
// so match on specified source and destination ip with all port and protocol // so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil {
return err return err
} }
} }
@ -409,7 +390,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -420,7 +401,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllDestinations && egressRule.matchAllPorts { if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
@ -439,7 +420,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err return err
} }
} }
@ -447,7 +428,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllPorts { if egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil {
return err return err
} }
} }
@ -456,13 +437,13 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil return nil
} }
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error { func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error {
if iptablesCmdHandler == nil {
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
}
args := make([]string, 0) args := make([]string, 0)
args = append(args, "-A", policyChainName)
if comment != "" { if comment != "" {
args = append(args, "-m", "comment", "--comment", comment) args = append(args, "-m", "comment", "--comment", "\""+comment+"\"")
} }
if srcIPSetName != "" { if srcIPSetName != "" {
args = append(args, "-m", "set", "--match-set", srcIPSetName, "src") args = append(args, "-m", "set", "--match-set", srcIPSetName, "src")
@ -477,19 +458,11 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *
args = append(args, "--dport", dPort) args = append(args, "--dport", dPort)
} }
markComment := "rule to mark traffic matching a network policy" markArgs := append(args, "-j", "MARK", "--set-xmark", "0x10000/0x10000", "\n")
markArgs := append(args, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000") npc.filterTableRules.WriteString(strings.Join(markArgs, " "))
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, markArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
returnComment := "rule to RETURN traffic matching a network policy" returnArgs := append(args, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN", "\n")
returnArgs := append(args, "-m", "comment", "--comment", returnComment, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN") npc.filterTableRules.WriteString(strings.Join(returnArgs, " "))
err = iptablesCmdHandler.AppendUnique("filter", policyChainName, returnArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
return nil return nil
} }

47
pkg/utils/iptables.go Normal file
View File

@ -0,0 +1,47 @@
package utils
import (
"bytes"
"fmt"
"os/exec"
)
// SaveInto calls `iptables-save` for given table and stores result in a given buffer.
func SaveInto(table string, buffer *bytes.Buffer) error {
path, err := exec.LookPath("iptables-save")
if err != nil {
return err
}
stderrBuffer := bytes.NewBuffer(nil)
args := []string{"iptables-save", "-t", table}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdout: buffer,
Stderr: stderrBuffer,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("%v (%s)", err, stderrBuffer)
}
return nil
}
// Restore runs `iptables-restore` passing data through []byte.
func Restore(table string, data []byte) error {
path, err := exec.LookPath("iptables-restore")
if err != nil {
return err
}
args := []string{"iptables-restore", "-T", table}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdin: bytes.NewBuffer(data),
}
b, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%v (%s)", err, b)
}
return nil
}