kube-router/pkg/controllers/netpol/network_policy_controller.go
Aaron U'Ren 837554bf1a
Fix Memory Consumption in network_policy_controller (#902)
* feat(gitignore): don't track intellij files

* fact(network_policy): networkPoliciesInfo -> stack

Take networkPoliciesInfo off of the npc struct and convert it to a stack
variable that is easy to cleanup.

* fix(network_policy): k8s obj memory accumulation

Kubernetes informers will block on handler execution and will then begin
to accumulate cached Kubernetes object information into the heap. This
change moves the full sync logic into it's own goroutine where full
syncs are triggered and gated via writing to a single item channel.

This ensures that:
- Syncs will only happen one at a time (as they are full syncs and we
  can't process multiple at once)
- Sync requests are only ever delayed and never lost as they will be
  added to the request channel
- After we make a sync request we return fast to ensure that the handler
  execution returns fast and that we don't block the Kubernetes
  informers

* fact(network_policy): rework readyForUpdates

Now that we are better managing requests for full syncs we no longer
need to manage readyForUpdates on the npc controller. We already enforce
not blocking the handlers and a single sync execution chain, whether it
comes from the controller in the form of a periodic sync or whether it
comes from a Kubernetes informer, either way the result is a
non-blocking, single thread of execution, full sync.

* fix(network_policy): address PR feedback
2020-05-20 16:15:57 +05:30

1804 lines
68 KiB
Go

package netpol
import (
"crypto/sha256"
"encoding/base32"
"errors"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/cloudnativelabs/kube-router/pkg/options"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
api "k8s.io/api/core/v1"
apiextensions "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
networkPolicyAnnotation = "net.beta.kubernetes.io/network-policy"
kubePodFirewallChainPrefix = "KUBE-POD-FW-"
kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-"
kubeSourceIpSetPrefix = "KUBE-SRC-"
kubeDestinationIpSetPrefix = "KUBE-DST-"
)
// Network policy controller provides both ingress and egress filtering for the pods as per the defined network
// policies. Two different types of iptables chains are used. Each pod running on the node which either
// requires ingress or egress filtering gets a pod specific chains. Each network policy has a iptables chain, which
// has rules expressed through ipsets matching source and destination pod ip's. In the FORWARD chain of the
// filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod
// or destined (in case of ingress network policy) to the pod specific iptables chain. Each
// pod specific iptables chain has rules to jump to the network polices chains, that pod matches. So packet
// originating/destined from/to pod goes through fitler table's, FORWARD chain, followed by pod specific chain,
// followed by one or more network policy chains, till there is a match which will accept the packet, or gets
// dropped by the rule in the pod chain, if there is no match.
// NetworkPolicyController strcut to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
ipSetHandler *utils.IPSet
podLister cache.Indexer
npLister cache.Indexer
nsLister cache.Indexer
PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler
}
// internal structure to represent a network policy
type networkPolicyInfo struct {
name string
namespace string
podSelector labels.Selector
// set of pods matching network policy spec podselector label selector
targetPods map[string]podInfo
// whitelist ingress rules from the network policy spec
ingressRules []ingressRule
// whitelist egress rules from the network policy spec
egressRules []egressRule
// policy type "ingress" or "egress" or "both" as defined by PolicyType in the spec
policyType string
}
// internal structure to represent Pod
type podInfo struct {
ip string
name string
namespace string
labels map[string]string
}
// internal stucture to represent NetworkPolicyIngressRule in the spec
type ingressRule struct {
matchAllPorts bool
ports []protocolAndPort
namedPorts []endPoints
matchAllSource bool
srcPods []podInfo
srcIPBlocks [][]string
}
// internal structure to represent NetworkPolicyEgressRule in the spec
type egressRule struct {
matchAllPorts bool
ports []protocolAndPort
namedPorts []endPoints
matchAllDestinations bool
dstPods []podInfo
dstIPBlocks [][]string
}
type protocolAndPort struct {
protocol string
port string
}
type endPoints struct {
ips []string
protocolAndPort
}
type numericPort2eps map[string]*endPoints
type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps
// Run runs forver till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
defer wg.Done()
glog.Info("Starting network policy controller")
npc.healthChan = healthChan
// Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
// therefore, we start it in it's own goroutine and request a sync through a single item channel
glog.Info("Starting network policy controller full sync goroutine")
wg.Add(1)
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select {
case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine")
return
default:
}
select {
case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine")
return
case <-fullSyncRequest:
glog.V(3).Info("Received request for a full sync, processing")
npc.fullPolicySync() // fullPolicySync() is a blocking request here
}
}
}(npc.fullSyncRequestChan, stopCh, wg)
// loop forever till notified to stop on stopCh
for {
glog.V(1).Info("Requesting periodic sync of iptables to reflect network policies")
npc.RequestFullSync()
select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
return
case <-t.C:
}
}
}
// OnPodUpdate handles updates to pods from the Kubernetes api server
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
npc.RequestFullSync()
}
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
netpol := obj.(*networking.NetworkPolicy)
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
npc.RequestFullSync()
}
// OnNamespaceUpdate handles updates to namespace from kubernetes api server
func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
namespace := obj.(*api.Namespace)
// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}
glog.V(2).Infof("Received update for namespace: %s", namespace.Name)
npc.RequestFullSync()
}
// RequestFullSync allows the request of a full network policy sync without blocking the callee
func (npc *NetworkPolicyController) RequestFullSync() {
select {
case npc.fullSyncRequestChan <- struct{}{}:
glog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent")
default: // Don't block if the buffered channel is full, return quickly so that we don't block callee execution
glog.V(1).Info("Full sync request queue was full, skipping...")
}
}
// Sync synchronizes iptables to desired state of network policies
func (npc *NetworkPolicyController) fullPolicySync() {
var err error
var networkPoliciesInfo []networkPolicyInfo
npc.mu.Lock()
defer npc.mu.Unlock()
healthcheck.SendHeartBeat(npc.healthChan, "NPC")
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
defer func() {
endTime := time.Since(start)
if npc.MetricsEnabled {
metrics.ControllerIptablesSyncTime.Observe(endTime.Seconds())
}
glog.V(1).Infof("sync iptables took %v", endTime)
}()
glog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
if npc.v1NetworkPolicy {
networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error())
return
}
} else {
// TODO remove the Beta support
networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error())
return
}
}
activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion)
if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
return
}
activePodFwChains, err := npc.syncPodFirewallChains(networkPoliciesInfo, syncVersion)
if err != nil {
glog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error())
return
}
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIpSets)
if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error())
return
}
}
// Configure iptables rules representing each network policy. All pod's matched by
// network policy spec podselector labels are grouped together in one ipset which
// is used for matching destination ip address. Each ingress rule in the network
// policyspec is evaluated to set of matching pods, which are grouped in to a
// ipset used for source ip addr matching.
func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo []networkPolicyInfo, version string) (map[string]bool, map[string]bool, error) {
start := time.Now()
defer func() {
endTime := time.Since(start)
metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds())
glog.V(2).Infof("Syncing network policy chains took %v", endTime)
}()
activePolicyChains := make(map[string]bool)
activePolicyIpSets := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
}
// run through all network policies
for _, policy := range networkPoliciesInfo {
// ensure there is a unique chain per network policy in filter table
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
err := iptablesCmdHandler.NewChain("filter", policyChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePolicyChains[policyChainName] = true
currnetPodIps := make([]string, 0, len(policy.targetPods))
for ip := range policy.targetPods {
currnetPodIps = append(currnetPodIps, ip)
}
if policy.policyType == "both" || policy.policyType == "ingress" {
// create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name)
targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetDestPodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh targetDestPodIpSet,: " + err.Error())
}
err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIpSets[targetDestPodIpSet.Name] = true
}
if policy.policyType == "both" || policy.policyType == "egress" {
// create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name)
targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetSourcePodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh targetSourcePodIpSet: " + err.Error())
}
err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIpSets[targetSourcePodIpSet.Name] = true
}
}
glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
return activePolicyChains, activePolicyIpSets, nil
}
func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
targetDestPodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
// so no whitelist rules to be added to the network policy
if policy.ingressRules == nil {
return nil
}
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the ingress rules in the spec and create iptables rules
// in the chain for the network policy
for i, ingressRule := range policy.ingressRules {
if len(ingressRule.srcPods) != 0 {
srcPodIpSetName := policyIndexedSourcePodIpSetName(policy.namespace, policy.name, i)
srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[srcPodIpSet.Name] = true
ingressRuleSrcPodIps := make([]string, 0, len(ingressRule.srcPods))
for _, pod := range ingressRule.srcPods {
ingressRuleSrcPodIps = append(ingressRuleSrcPodIps, pod.ip)
}
err = srcPodIpSet.Refresh(ingressRuleSrcPodIps, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh srcPodIpSet: " + err.Error())
}
if len(ingressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the ingress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
}
if len(ingressRule.namedPorts) != 0 {
for j, endPoints := range ingressRule.namedPorts {
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[namedPortIpSet.Name] = true
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
return err
}
}
}
if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 {
// case where no 'ports' details specified in the ingress rule but 'from' details specified
// so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, "", ""); err != nil {
return err
}
}
}
// case where only 'ports' details specified but no 'from' details in the ingress rule
// so match on all sources, with specified port (if any) and protocol
if ingressRule.matchAllSource && !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[namedPortIpSet.Name] = true
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
return err
}
}
}
// case where nether ports nor from details are speified in the ingress rule
// so match on all ports, protocol, source IP's
if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, "", ""); err != nil {
return err
}
}
if len(ingressRule.srcIPBlocks) != 0 {
srcIpBlockIpSetName := policyIndexedSourceIpBlockIpSetName(policy.namespace, policy.name, i)
srcIpBlockIpSet, err := npc.ipSetHandler.Create(srcIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[srcIpBlockIpSet.Name] = true
err = srcIpBlockIpSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
if err != nil {
glog.Errorf("failed to refresh srcIpBlockIpSet: " + err.Error())
}
if !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[namedPortIpSet.Name] = true
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
return err
}
}
}
if ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, targetDestPodIpSetName, "", ""); err != nil {
return err
}
}
}
}
return nil
}
func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
targetSourcePodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
// so no whitelist rules to be added to the network policy
if policy.egressRules == nil {
return nil
}
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the egress rules in the spec and create iptables rules
// in the chain for the network policy
for i, egressRule := range policy.egressRules {
if len(egressRule.dstPods) != 0 {
dstPodIpSetName := policyIndexedDestinationPodIpSetName(policy.namespace, policy.name, i)
dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[dstPodIpSet.Name] = true
egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods))
for _, pod := range egressRule.dstPods {
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip)
}
err = dstPodIpSet.Refresh(egressRuleDstPodIps, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh dstPodIpSet: " + err.Error())
}
if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
}
if len(egressRule.namedPorts) != 0 {
for j, endPoints := range egressRule.namedPorts {
namedPortIpSetName := policyIndexedEgressNamedPortIpSetName(policy.namespace, policy.name, i, j)
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[namedPortIpSet.Name] = true
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
return err
}
}
}
if len(egressRule.ports) == 0 && len(egressRule.namedPorts) == 0 {
// case where no 'ports' details specified in the ingress rule but 'from' details specified
// so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, "", ""); err != nil {
return err
}
}
}
// case where only 'ports' details specified but no 'to' details in the egress rule
// so match on all sources, with specified port (if any) and protocol
if egressRule.matchAllDestinations && !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
}
// case where nether ports nor from details are speified in the egress rule
// so match on all ports, protocol, source IP's
if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", "", ""); err != nil {
return err
}
}
if len(egressRule.dstIPBlocks) != 0 {
dstIpBlockIpSetName := policyIndexedDestinationIpBlockIpSetName(policy.namespace, policy.name, i)
dstIpBlockIpSet, err := npc.ipSetHandler.Create(dstIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIpSets[dstIpBlockIpSet.Name] = true
err = dstIpBlockIpSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
if err != nil {
glog.Errorf("failed to refresh dstIpBlockIpSet: " + err.Error())
}
if !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
return err
}
}
}
if egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, "", ""); err != nil {
return err
}
}
}
}
return nil
}
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIpSetName, dstIpSetName, protocol, dPort string) error {
if iptablesCmdHandler == nil {
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
}
args := make([]string, 0)
if comment != "" {
args = append(args, "-m", "comment", "--comment", comment)
}
if srcIpSetName != "" {
args = append(args, "-m", "set", "--match-set", srcIpSetName, "src")
}
if dstIpSetName != "" {
args = append(args, "-m", "set", "--match-set", dstIpSetName, "dst")
}
if protocol != "" {
args = append(args, "-p", protocol)
}
if dPort != "" {
args = append(args, "--dport", dPort)
}
args = append(args, "-j", "ACCEPT")
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
return nil
}
func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []networkPolicyInfo, version string) (map[string]bool, error) {
activePodFwChains := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor: %s", err.Error())
}
// loop through the pods running on the node which to which ingress network policies to be applied
ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *ingressNetworkPolicyEnabledPods {
// below condition occurs when we get trasient update while removing or adding pod
// subseqent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
// ensure pod specific firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
comment := "rule to permit the traffic traffic to pods when source is the pod's local node"
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment = "rule for stateful firewall for pod"
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
exists, err = iptablesCmdHandler.Exists("filter", "OUTPUT", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", "OUTPUT", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// add rule to log the packets that will be dropped due to network policy enforcement
comment = "rule to log dropped traffic POD name:" + pod.name + " namespace: " + pod.namespace
args = []string{"-m", "comment", "--comment", comment, "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
// add default DROP rule at the end of chain
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// loop through the pods running on the node which egress network policies to be applied
egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *egressNetworkPolicyEnabledPods {
// below condition occurs when we get trasient update while removing or adding pod
// subseqent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
// ensure pod specific firewall chain exist for all the pods that need egress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment := "rule for stateful firewall for pod"
args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
egressFilterChains := []string{"FORWARD", "OUTPUT", "INPUT"}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
// to pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// add rule to log the packets that will be dropped due to network policy enforcement
comment = "rule to log dropped traffic POD name:" + pod.name + " namespace: " + pod.namespace
args = []string{"-m", "comment", "--comment", comment, "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
// add default DROP rule at the end of chain
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
return activePodFwChains, nil
}
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error {
cleanupPodFwChains := make([]string, 0)
cleanupPolicyChains := make([]string, 0)
cleanupPolicyIPSets := make([]*utils.Set, 0)
// initialize tool sets for working with iptables and ipset
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
}
ipsets, err := utils.NewIPSet(false)
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
}
// find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed
chains, err := iptablesCmdHandler.ListChains("filter")
for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
if _, ok := activePolicyChains[chain]; !ok {
cleanupPolicyChains = append(cleanupPolicyChains, chain)
}
}
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
if _, ok := activePodFwChains[chain]; !ok {
cleanupPodFwChains = append(cleanupPodFwChains, chain)
}
}
}
for _, set := range ipsets.Sets {
if strings.HasPrefix(set.Name, kubeSourceIpSetPrefix) ||
strings.HasPrefix(set.Name, kubeDestinationIpSetPrefix) {
if _, ok := activePolicyIPSets[set.Name]; !ok {
cleanupPolicyIPSets = append(cleanupPolicyIPSets, set)
}
}
}
// remove stale iptables podFwChain references from the filter table chains
for _, podFwChain := range cleanupPodFwChains {
primaryChains := []string{"FORWARD", "OUTPUT", "INPUT"}
for _, egressChain := range primaryChains {
forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error())
}
// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error())
}
realRuleNo++
}
}
}
}
// cleanup pod firewall chain
for _, chain := range cleanupPodFwChains {
glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
}
glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
}
// cleanup network policy chains
for _, policyChain := range cleanupPolicyChains {
glog.V(2).Infof("Found policy chain to cleanup %s", policyChain)
// first clean up any references from active pod firewall chains
for podFwChain := range activePodFwChains {
podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
if err != nil {
}
for i, rule := range podFwChainRules {
if strings.Contains(rule, policyChain) {
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
if err != nil {
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
}
break
}
}
}
// now that all stale and active references to the network policy chain have been removed, delete the chain
err = iptablesCmdHandler.ClearChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
}
// cleanup network policy ipsets
for _, set := range cleanupPolicyIPSets {
err = set.Destroy()
if err != nil {
return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err)
}
}
return nil
}
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIp string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
for _, policy := range networkPoliciesInfo {
if policy.namespace != pod.ObjectMeta.Namespace {
continue
}
_, ok := policy.targetPods[pod.Status.PodIP]
if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
break
}
}
}
return &nodePods, nil
}
func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIp string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
for _, policy := range networkPoliciesInfo {
if policy.namespace != pod.ObjectMeta.Namespace {
continue
}
_, ok := policy.targetPods[pod.Status.PodIP]
if ok && (policy.policyType == "both" || policy.policyType == "egress") {
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
break
}
}
}
return &nodePods, nil
}
func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
for _, npPort := range npPorts {
if npPort.Port == nil {
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
} else if npPort.Port.Type == intstr.Int {
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
} else {
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
for _, eps := range numericPort2eps {
namedPorts = append(namedPorts, *eps)
}
}
}
}
}
return
}
func (npc *NetworkPolicyController) processBetaNetworkPolicyPorts(npPorts []apiextensions.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
for _, npPort := range npPorts {
if npPort.Port == nil {
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
} else if npPort.Port.Type == intstr.Int {
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
} else {
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
for _, eps := range numericPort2eps {
namedPorts = append(namedPorts, *eps)
}
}
}
}
}
return
}
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policyObj := range npc.npLister.List() {
policy, ok := policyObj.(*networking.NetworkPolicy)
podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if !ok {
return nil, fmt.Errorf("Failed to convert")
}
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
podSelector: podSelector,
policyType: "ingress",
}
ingressType, egressType := false, false
for _, policyType := range policy.Spec.PolicyTypes {
if policyType == networking.PolicyTypeIngress {
ingressType = true
}
if policyType == networking.PolicyTypeEgress {
egressType = true
}
}
if ingressType && egressType {
newPolicy.policyType = "both"
} else if egressType {
newPolicy.policyType = "egress"
} else if ingressType {
newPolicy.policyType = "ingress"
}
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
newPolicy.targetPods = make(map[string]podInfo)
namedPort2IngressEps := make(namedPort2eps)
if err == nil {
for _, matchingPod := range matchingPods {
if matchingPod.Status.PodIP == "" {
continue
}
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels}
npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
}
}
if policy.Spec.Ingress == nil {
newPolicy.ingressRules = nil
} else {
newPolicy.ingressRules = make([]ingressRule, 0)
}
if policy.Spec.Egress == nil {
newPolicy.egressRules = nil
} else {
newPolicy.egressRules = make([]egressRule, 0)
}
for _, specIngressRule := range policy.Spec.Ingress {
ingressRule := ingressRule{}
ingressRule.srcPods = make([]podInfo, 0)
ingressRule.srcIPBlocks = make([][]string, 0)
// If this field is empty or missing in the spec, this rule matches all sources
if len(specIngressRule.From) == 0 {
ingressRule.matchAllSource = true
} else {
ingressRule.matchAllSource = false
for _, peer := range specIngressRule.From {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" {
continue
}
ingressRule.srcPods = append(ingressRule.srcPods,
podInfo{ip: peerPod.Status.PodIP,
name: peerPod.ObjectMeta.Name,
namespace: peerPod.ObjectMeta.Namespace,
labels: peerPod.ObjectMeta.Labels})
}
}
ingressRule.srcIPBlocks = append(ingressRule.srcIPBlocks, npc.evalIPBlockPeer(peer)...)
}
}
ingressRule.ports = make([]protocolAndPort, 0)
ingressRule.namedPorts = make([]endPoints, 0)
// If this field is empty or missing in the spec, this rule matches all ports
if len(specIngressRule.Ports) == 0 {
ingressRule.matchAllPorts = true
} else {
ingressRule.matchAllPorts = false
ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
}
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
}
for _, specEgressRule := range policy.Spec.Egress {
egressRule := egressRule{}
egressRule.dstPods = make([]podInfo, 0)
egressRule.dstIPBlocks = make([][]string, 0)
namedPort2EgressEps := make(namedPort2eps)
// If this field is empty or missing in the spec, this rule matches all sources
if len(specEgressRule.To) == 0 {
egressRule.matchAllDestinations = true
} else {
egressRule.matchAllDestinations = false
for _, peer := range specEgressRule.To {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" {
continue
}
egressRule.dstPods = append(egressRule.dstPods,
podInfo{ip: peerPod.Status.PodIP,
name: peerPod.ObjectMeta.Name,
namespace: peerPod.ObjectMeta.Namespace,
labels: peerPod.ObjectMeta.Labels})
npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps)
}
}
egressRule.dstIPBlocks = append(egressRule.dstIPBlocks, npc.evalIPBlockPeer(peer)...)
}
}
egressRule.ports = make([]protocolAndPort, 0)
egressRule.namedPorts = make([]endPoints, 0)
// If this field is empty or missing in the spec, this rule matches all ports
if len(specEgressRule.Ports) == 0 {
egressRule.matchAllPorts = true
} else {
egressRule.matchAllPorts = false
egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts(specEgressRule.Ports, namedPort2EgressEps)
}
newPolicy.egressRules = append(newPolicy.egressRules, egressRule)
}
NetworkPolicies = append(NetworkPolicies, newPolicy)
}
return NetworkPolicies, nil
}
func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, peer networking.NetworkPolicyPeer) ([]*api.Pod, error) {
var matchingPods []*api.Pod
matchingPods = make([]*api.Pod, 0)
var err error
// spec can have both PodSelector AND NamespaceSelector
if peer.NamespaceSelector != nil {
namespaceSelector, _ := v1.LabelSelectorAsSelector(peer.NamespaceSelector)
namespaces, err := npc.ListNamespaceByLabels(namespaceSelector)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
podSelector := labels.Everything()
if peer.PodSelector != nil {
podSelector, _ = v1.LabelSelectorAsSelector(peer.PodSelector)
}
for _, namespace := range namespaces {
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, podSelector)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
matchingPods = append(matchingPods, namespacePods...)
}
} else if peer.PodSelector != nil {
podSelector, _ := v1.LabelSelectorAsSelector(peer.PodSelector)
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
}
return matchingPods, err
}
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, podSelector labels.Selector) (ret []*api.Pod, err error) {
podLister := listers.NewPodLister(npc.podLister)
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(podSelector)
if err != nil {
return nil, err
}
return allMatchedNameSpacePods, nil
}
func (npc *NetworkPolicyController) ListNamespaceByLabels(namespaceSelector labels.Selector) ([]*api.Namespace, error) {
namespaceLister := listers.NewNamespaceLister(npc.nsLister)
matchedNamespaces, err := namespaceLister.List(namespaceSelector)
if err != nil {
return nil, err
}
return matchedNamespaces, nil
}
func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) [][]string {
ipBlock := make([][]string, 0)
if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil {
if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") {
ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, []string{"128.0.0.0/1", utils.OptionTimeout, "0"})
} else {
ipBlock = append(ipBlock, []string{cidr, utils.OptionTimeout, "0"})
}
for _, except := range peer.IPBlock.Except {
if strings.HasSuffix(except, "/0") {
ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch})
} else {
ipBlock = append(ipBlock, []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch})
}
}
}
return ipBlock
}
func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort2eps *namedPort2eps) {
if pod == nil || namedPort2eps == nil {
return
}
for k := range pod.Spec.Containers {
for _, port := range pod.Spec.Containers[k].Ports {
name := port.Name
protocol := string(port.Protocol)
containerPort := strconv.Itoa(int(port.ContainerPort))
if (*namedPort2eps)[name] == nil {
(*namedPort2eps)[name] = make(protocol2eps)
}
if (*namedPort2eps)[name][protocol] == nil {
(*namedPort2eps)[name][protocol] = make(numericPort2eps)
}
if eps, ok := (*namedPort2eps)[name][protocol][containerPort]; !ok {
(*namedPort2eps)[name][protocol][containerPort] = &endPoints{
ips: []string{pod.Status.PodIP},
protocolAndPort: protocolAndPort{port: containerPort, protocol: protocol},
}
} else {
eps.ips = append(eps.ips, pod.Status.PodIP)
}
}
}
}
func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() ([]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policyObj := range npc.npLister.List() {
policy, _ := policyObj.(*apiextensions.NetworkPolicy)
podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
podSelector: podSelector,
}
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
newPolicy.targetPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
namedPort2IngressEps := make(namedPort2eps)
if err == nil {
for _, matchingPod := range matchingPods {
if matchingPod.Status.PodIP == "" {
continue
}
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels}
npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
}
}
for _, specIngressRule := range policy.Spec.Ingress {
ingressRule := ingressRule{}
ingressRule.ports = make([]protocolAndPort, 0)
ingressRule.namedPorts = make([]endPoints, 0)
ingressRule.ports, ingressRule.namedPorts = npc.processBetaNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
podSelector, _ := v1.LabelSelectorAsSelector(peer.PodSelector)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
if err == nil {
for _, matchingPod := range matchingPods {
if matchingPod.Status.PodIP == "" {
continue
}
ingressRule.srcPods = append(ingressRule.srcPods,
podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels})
}
}
}
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
}
NetworkPolicies = append(NetworkPolicies, newPolicy)
}
return NetworkPolicies, nil
}
func podFirewallChainName(namespace, podName string, version string) string {
hash := sha256.Sum256([]byte(namespace + podName + version))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubePodFirewallChainPrefix + encoded[:16]
}
func networkPolicyChainName(namespace, policyName string, version string) string {
hash := sha256.Sum256([]byte(namespace + policyName + version))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeNetworkPolicyChainPrefix + encoded[:16]
}
func policySourcePodIpSetName(namespace, policyName string) string {
hash := sha256.Sum256([]byte(namespace + policyName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeSourceIpSetPrefix + encoded[:16]
}
func policyDestinationPodIpSetName(namespace, policyName string) string {
hash := sha256.Sum256([]byte(namespace + policyName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIpSetPrefix + encoded[:16]
}
func policyIndexedSourcePodIpSetName(namespace, policyName string, ingressRuleNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeSourceIpSetPrefix + encoded[:16]
}
func policyIndexedDestinationPodIpSetName(namespace, policyName string, egressRuleNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIpSetPrefix + encoded[:16]
}
func policyIndexedSourceIpBlockIpSetName(namespace, policyName string, ingressRuleNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeSourceIpSetPrefix + encoded[:16]
}
func policyIndexedDestinationIpBlockIpSetName(namespace, policyName string, egressRuleNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIpSetPrefix + encoded[:16]
}
func policyIndexedIngressNamedPortIpSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIpSetPrefix + encoded[:16]
}
func policyIndexedEgressNamedPortIpSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string {
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIpSetPrefix + encoded[:16]
}
// Cleanup cleanup configurations done
func (npc *NetworkPolicyController) Cleanup() {
glog.Info("Cleaning up iptables configuration permanently done by kube-router")
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Errorf("Failed to initialize iptables executor: %s", err.Error())
}
// delete jump rules in FORWARD chain to pod specific firewall chain
forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup")
return
}
// TODO: need a better way to delte rule with out using number
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
realRuleNo++
}
}
// delete jump rules in OUTPUT chain to pod specific firewall chain
forwardChainRules, err = iptablesCmdHandler.List("filter", "OUTPUT")
if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup")
return
}
// TODO: need a better way to delte rule with out using number
realRuleNo = 0
for i, rule := range forwardChainRules {
if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
realRuleNo++
}
}
// flush and delete pod specific firewall chain
chains, err := iptablesCmdHandler.ListChains("filter")
for _, chain := range chains {
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
}
}
// flush and delete per network policy specific chain
chains, err = iptablesCmdHandler.ListChains("filter")
for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
}
}
// delete all ipsets
ipset, err := utils.NewIPSet(false)
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
}
err = ipset.Save()
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
}
err = ipset.DestroyAllWithin()
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
}
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPoObj := newObj.(*api.Pod)
oldPoObj := oldObj.(*api.Pod)
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
// for the network policies, we are only interested in pod status phase change or IP change
npc.OnPodUpdate(newObj)
}
},
DeleteFunc: func(obj interface{}) {
npc.handlePodDelete(obj)
},
}
}
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNamespaceUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNamespaceUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.handleNamespaceDelete(obj)
},
}
}
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNetworkPolicyUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNetworkPolicyUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.handleNetworkPolicyDelete(obj)
},
}
}
func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
if pod, ok = tombstone.Obj.(*api.Pod); !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
}
glog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name)
npc.RequestFullSync()
}
func (npc *NetworkPolicyController) handleNamespaceDelete(obj interface{}) {
namespace, ok := obj.(*api.Namespace)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
if namespace, ok = tombstone.Obj.(*api.Namespace); !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
}
// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}
glog.V(2).Infof("Received namespace: %s delete event", namespace.Name)
npc.RequestFullSync()
}
func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) {
netpol, ok := obj.(*networking.NetworkPolicy)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok {
glog.Errorf("unexpected object type: %v", obj)
return
}
}
glog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name)
npc.RequestFullSync()
}
// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}
// Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time,
// additional requests would be pointless to queue since after the first one was processed the system would already
// be up to date with all of the policy changes from any enqueued request after that
npc.fullSyncRequestChan = make(chan struct{}, 1)
if config.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
npc.MetricsEnabled = true
}
npc.syncPeriod = config.IPTablesSyncPeriod
npc.v1NetworkPolicy = true
v, _ := clientset.Discovery().ServerVersion()
valid := regexp.MustCompile("[0-9]")
v.Minor = strings.Join(valid.FindAllString(v.Minor, -1), "")
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
npc.v1NetworkPolicy = false
}
node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil {
return nil, err
}
npc.nodeHostName = node.Name
nodeIP, err := utils.GetNodeIP(node)
if err != nil {
return nil, err
}
npc.nodeIP = nodeIP
ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, err
}
err = ipset.Save()
if err != nil {
return nil, err
}
npc.ipSetHandler = ipset
npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler()
npc.nsLister = nsInformer.GetIndexer()
npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
npc.npLister = npInformer.GetIndexer()
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
return &npc, nil
}