Fix for network policy connection refused issue (#461) (#471)

* Instead of clearing the iptables firewall chains for each resync, new chains are now generated side-by-side with the existing ones.

* Chain naming now has an addition component, version, which ensures chain name uniqueness.

* Existing cleanup procedure for stale iptables rules will handle garbage collection of unused chains.
This commit is contained in:
Johan Thomsen 2018-06-21 13:09:24 +02:00 committed by Murali Reddy
parent 7c21815b43
commit 58da2d412d

View File

@ -206,6 +206,7 @@ func (npc *NetworkPolicyController) Sync() error {
defer npc.mu.Unlock() defer npc.mu.Unlock()
start := time.Now() start := time.Now()
syncVersion := string(start.UnixNano())
defer func() { defer func() {
endTime := time.Since(start) endTime := time.Since(start)
if npc.MetricsEnabled { if npc.MetricsEnabled {
@ -214,7 +215,7 @@ func (npc *NetworkPolicyController) Sync() error {
glog.V(1).Infof("sync iptables took %v", endTime) glog.V(1).Infof("sync iptables took %v", endTime)
}() }()
glog.V(1).Info("Starting periodic sync of iptables") glog.V(1).Info("Starting sync of iptables")
if npc.v1NetworkPolicy { if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
@ -229,12 +230,12 @@ func (npc *NetworkPolicyController) Sync() error {
} }
} }
activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains() activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains(syncVersion)
if err != nil { if err != nil {
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error()) return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
} }
activePodFwChains, err := npc.syncPodFirewallChains() activePodFwChains, err := npc.syncPodFirewallChains(syncVersion)
if err != nil { if err != nil {
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error()) return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
} }
@ -252,7 +253,7 @@ func (npc *NetworkPolicyController) Sync() error {
// is used for matching destination ip address. Each ingress rule in the network // is used for matching destination ip address. Each ingress rule in the network
// policyspec is evaluated to set of matching pods, which are grouped in to a // policyspec is evaluated to set of matching pods, which are grouped in to a
// ipset used for source ip addr matching. // ipset used for source ip addr matching.
func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool, map[string]bool, error) { func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map[string]bool, map[string]bool, error) {
activePolicyChains := make(map[string]bool) activePolicyChains := make(map[string]bool)
activePolicyIpSets := make(map[string]bool) activePolicyIpSets := make(map[string]bool)
@ -266,7 +267,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
for _, policy := range *npc.networkPoliciesInfo { for _, policy := range *npc.networkPoliciesInfo {
// ensure there is a unique chain per network policy in filter table // ensure there is a unique chain per network policy in filter table
policyChainName := networkPolicyChainName(policy.namespace, policy.name) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
err := iptablesCmdHandler.NewChain("filter", policyChainName) err := iptablesCmdHandler.NewChain("filter", policyChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 { if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
@ -305,18 +306,12 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
glog.Errorf("failed to refresh targetDestPodIpSet: " + err.Error()) glog.Errorf("failed to refresh targetDestPodIpSet: " + err.Error())
} }
// TODO use iptables-restore to better implement the logic, than flush and add rules err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets, version)
err = iptablesCmdHandler.ClearChain("filter", policyChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets) err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets, version)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -328,7 +323,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains() (map[string]bool,
} }
func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
targetDestPodIpSetName string, activePolicyIpSets map[string]bool) error { targetDestPodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
// so no whitelist rules to be added to the network policy // so no whitelist rules to be added to the network policy
@ -341,7 +336,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
} }
policyChainName := networkPolicyChainName(policy.namespace, policy.name) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the ingress rules in the spec and create iptable rules // run through all the ingress rules in the spec and create iptable rules
// in the chain for the network policy // in the chain for the network policy
@ -466,7 +461,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
} }
func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
targetSourcePodIpSetName string, activePolicyIpSets map[string]bool) error { targetSourcePodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
// so no whitelist rules to be added to the network policy // so no whitelist rules to be added to the network policy
@ -479,7 +474,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
} }
policyChainName := networkPolicyChainName(policy.namespace, policy.name) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the egress rules in the spec and create iptable rules // run through all the egress rules in the spec and create iptable rules
// in the chain for the network policy // in the chain for the network policy
@ -600,7 +595,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil return nil
} }
func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, error) { func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[string]bool, error) {
activePodFwChains := make(map[string]bool) activePodFwChains := make(map[string]bool)
@ -623,7 +618,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
} }
// ensure pod specific firewall chain exist for all the pods that need ingress firewall // ensure pod specific firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name) podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName) err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 { if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
@ -703,7 +698,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
for _, policy := range *npc.networkPoliciesInfo { for _, policy := range *npc.networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok { if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil { if err != nil {
@ -747,7 +742,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
} }
// ensure pod specific firewall chain exist for all the pods that need egress firewall // ensure pod specific firewall chain exist for all the pods that need egress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name) podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName) err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 { if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
@ -801,7 +796,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
for _, policy := range *npc.networkPoliciesInfo { for _, policy := range *npc.networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok { if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil { if err != nil {
@ -1302,14 +1297,14 @@ func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPo
return &NetworkPolicies, nil return &NetworkPolicies, nil
} }
func podFirewallChainName(namespace, podName string) string { func podFirewallChainName(namespace, podName string, version string) string {
hash := sha256.Sum256([]byte(namespace + podName)) hash := sha256.Sum256([]byte(namespace + podName + version))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return "KUBE-POD-FW-" + encoded[:16] return "KUBE-POD-FW-" + encoded[:16]
} }
func networkPolicyChainName(namespace, policyName string) string { func networkPolicyChainName(namespace, policyName string, version string) string {
hash := sha256.Sum256([]byte(namespace + policyName)) hash := sha256.Sum256([]byte(namespace + policyName + version))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return "KUBE-NWPLCY-" + encoded[:16] return "KUBE-NWPLCY-" + encoded[:16]
} }