Mirror of https://github.com/cloudnativelabs/kube-router.git, synced 2025-10-05 06:51:05 +02:00
(Pod traffic destined to the node's local IP.) Without this fix, even with a network policy that drops all egress traffic, a pod can reach the host's IPs, as well as any service hosted on the host network.
1832 lines · 68 KiB · Go
package netpol

import (
	"crypto/sha256"
	"encoding/base32"
	"errors"
	"fmt"
	"net"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
	"github.com/cloudnativelabs/kube-router/pkg/metrics"
	"github.com/cloudnativelabs/kube-router/pkg/options"
	"github.com/cloudnativelabs/kube-router/pkg/utils"
	"github.com/coreos/go-iptables/iptables"
	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"

	api "k8s.io/api/core/v1"
	apiextensions "k8s.io/api/extensions/v1beta1"
	networking "k8s.io/api/networking/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
	listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

const (
	networkPolicyAnnotation      = "net.beta.kubernetes.io/network-policy"
	kubePodFirewallChainPrefix   = "KUBE-POD-FW-"
	kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-"
	kubeSourceIpSetPrefix        = "KUBE-SRC-"
	kubeDestinationIpSetPrefix   = "KUBE-DST-"
)
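
// Note: each of the prefixes above is combined with a per-policy or per-pod suffix derived from
// the resource's namespace and name (presumably via the sha256/base32 packages imported above) by
// networkPolicyChainName, podFirewallChainName and the policy*IpSetName helpers defined later in
// this file, yielding names such as "KUBE-NWPLCY-<hash>"; the literal names shown in comments
// below are illustrative.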

// NetworkPolicyController provides both ingress and egress filtering for pods according to the
// defined network policies. Two different types of iptables chains are used. Each pod running on
// the node that requires ingress or egress filtering gets a pod-specific chain. Each network
// policy has an iptables chain whose rules are expressed through ipsets matching source and
// destination pod IPs. In the FORWARD chain of the filter table a rule is added to jump traffic
// originating from the pod (for egress network policies) or destined to the pod (for ingress
// network policies) to the pod-specific chain. Each pod-specific chain has rules to jump to the
// chains of the network policies that the pod matches. So a packet originating from or destined
// to a pod traverses the filter table's FORWARD chain, then the pod-specific chain, then one or
// more network policy chains, until a rule accepts the packet or it is dropped by the default
// rule in the pod chain when nothing matches.
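
// For example (chain names illustrative), a packet destined to a pod selected by some policy
// traverses roughly:
//
//	FORWARD (filter table)
//	  -> KUBE-POD-FW-<hash>       per-pod firewall chain
//	       -> KUBE-NWPLCY-<hash>  per-policy chain; ACCEPT on an ipset/port match
//	       -> NFLOG + REJECT      default at the end of the pod chain when no policy matched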

// NetworkPolicyController struct holds the information required by the controller.
type NetworkPolicyController struct {
	nodeIP          net.IP
	nodeHostName    string
	mu              sync.Mutex
	syncPeriod      time.Duration
	MetricsEnabled  bool
	v1NetworkPolicy bool
	readyForUpdates bool
	healthChan      chan<- *healthcheck.ControllerHeartbeat

	// list of all active network policies expressed as networkPolicyInfo
	networkPoliciesInfo *[]networkPolicyInfo
	ipSetHandler        *utils.IPSet

	podLister cache.Indexer
	npLister  cache.Indexer
	nsLister  cache.Indexer

	PodEventHandler           cache.ResourceEventHandler
	NamespaceEventHandler     cache.ResourceEventHandler
	NetworkPolicyEventHandler cache.ResourceEventHandler
}

// internal structure to represent a network policy
type networkPolicyInfo struct {
	name        string
	namespace   string
	podSelector labels.Selector

	// set of pods matching the network policy spec's podSelector label selector
	targetPods map[string]podInfo

	// whitelist ingress rules from the network policy spec
	ingressRules []ingressRule

	// whitelist egress rules from the network policy spec
	egressRules []egressRule

	// policy type "ingress", "egress" or "both" as defined by PolicyTypes in the spec
	policyType string
}

// internal structure to represent a pod
type podInfo struct {
	ip        string
	name      string
	namespace string
	labels    map[string]string
}

// internal structure to represent NetworkPolicyIngressRule in the spec
type ingressRule struct {
	matchAllPorts  bool
	ports          []protocolAndPort
	namedPorts     []endPoints
	matchAllSource bool
	srcPods        []podInfo
	srcIPBlocks    [][]string
}

// internal structure to represent NetworkPolicyEgressRule in the spec
type egressRule struct {
	matchAllPorts        bool
	ports                []protocolAndPort
	namedPorts           []endPoints
	matchAllDestinations bool
	dstPods              []podInfo
	dstIPBlocks          [][]string
}

type protocolAndPort struct {
	protocol string
	port     string
}

type endPoints struct {
	ips []string
	protocolAndPort
}

type numericPort2eps map[string]*endPoints
type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps

// Run runs forever until we receive a notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
|
t := time.NewTicker(npc.syncPeriod)
|
|
defer t.Stop()
|
|
defer wg.Done()
|
|
|
|
glog.Info("Starting network policy controller")
|
|
npc.healthChan = healthChan
|
|
|
|
// loop forever till notified to stop on stopCh
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
glog.Info("Shutting down network policies controller")
|
|
return
|
|
default:
|
|
}
|
|
|
|
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
glog.Errorf("Error during periodic sync of network policies in network policy controller. Error: " + err.Error())
|
|
glog.Errorf("Skipping sending heartbeat from network policy controller as periodic sync failed.")
|
|
} else {
|
|
healthcheck.SendHeartBeat(healthChan, "NPC")
|
|
}
|
|
npc.readyForUpdates = true
|
|
select {
|
|
case <-stopCh:
|
|
glog.Infof("Shutting down network policies controller")
|
|
return
|
|
case <-t.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnPodUpdate handles updates to pods from the Kubernetes api server
|
|
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
|
|
pod := obj.(*api.Pod)
|
|
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
|
|
|
|
if !npc.readyForUpdates {
|
|
glog.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name)
|
|
return
|
|
}
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
glog.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err)
|
|
}
|
|
}
|
|
|
|
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
|
|
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
|
|
netpol := obj.(*networking.NetworkPolicy)
|
|
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
|
|
|
|
if !npc.readyForUpdates {
|
|
glog.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
|
|
return
|
|
}
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
glog.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err)
|
|
}
|
|
}
|
|
|
|
// OnNamespaceUpdate handles updates to namespace from kubernetes api server
|
|
func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
|
|
namespace := obj.(*api.Namespace)
|
|
	// the namespace (and annotations on it) have no significance in the GA version of network policy
if npc.v1NetworkPolicy {
|
|
return
|
|
}
|
|
glog.V(2).Infof("Received update for namespace: %s", namespace.Name)
|
|
|
|
if !npc.readyForUpdates {
|
|
glog.V(3).Infof("Skipping update to namespace: %s, controller still performing bootup full-sync", namespace.Name)
|
|
return
|
|
}
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
glog.Errorf("Error syncing on namespace update: %s", err)
|
|
}
|
|
}
|
|
|
|
// Sync synchronizes iptables to desired state of network policies
|
|
func (npc *NetworkPolicyController) Sync() error {
|
|
|
|
var err error
|
|
npc.mu.Lock()
|
|
defer npc.mu.Unlock()
|
|
|
|
healthcheck.SendHeartBeat(npc.healthChan, "NPC")
|
|
start := time.Now()
|
|
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
|
|
defer func() {
|
|
endTime := time.Since(start)
|
|
if npc.MetricsEnabled {
|
|
metrics.ControllerIptablesSyncTime.Observe(endTime.Seconds())
|
|
}
|
|
glog.V(1).Infof("sync iptables took %v", endTime)
|
|
}()
|
|
|
|
glog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
|
|
if npc.v1NetworkPolicy {
|
|
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
|
|
}
|
|
} else {
|
|
// TODO remove the Beta support
|
|
npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
|
|
}
|
|
}
|
|
|
|
activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains(syncVersion)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
|
|
}
|
|
|
|
activePodFwChains, err := npc.syncPodFirewallChains(syncVersion)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
|
|
}
|
|
|
|
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIpSets)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to cleanup stale iptables rules: " + err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// syncNetworkPolicyChains configures iptables rules representing each network policy. All pods
// matched by a network policy spec's podSelector labels are grouped together in one ipset, which
// is used for matching the destination IP address. Each ingress rule in the network policy spec
// is evaluated to a set of matching pods, which are grouped into an ipset used for source IP
// address matching.
func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map[string]bool, map[string]bool, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
endTime := time.Since(start)
|
|
metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds())
|
|
glog.V(2).Infof("Syncing network policy chains took %v", endTime)
|
|
}()
|
|
activePolicyChains := make(map[string]bool)
|
|
activePolicyIpSets := make(map[string]bool)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
// run through all network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
|
|
// ensure there is a unique chain per network policy in filter table
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
err := iptablesCmdHandler.NewChain("filter", policyChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
activePolicyChains[policyChainName] = true
|
|
|
|
		currentPodIps := make([]string, 0, len(policy.targetPods))
		for ip := range policy.targetPods {
			currentPodIps = append(currentPodIps, ip)
		}

		if policy.policyType == "both" || policy.policyType == "ingress" {
			// create an ipset for all destination pod IPs matched by the policy spec PodSelector
			targetDestPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name)
			targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
			if err != nil {
				return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
			}
			err = targetDestPodIpSet.Refresh(currentPodIps, utils.OptionTimeout, "0")
			if err != nil {
				glog.Errorf("failed to refresh targetDestPodIpSet: " + err.Error())
			}
			err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets, version)
			if err != nil {
				return nil, nil, err
			}
			activePolicyIpSets[targetDestPodIpSet.Name] = true
		}

		if policy.policyType == "both" || policy.policyType == "egress" {
			// create an ipset for all source pod IPs matched by the policy spec PodSelector
			targetSourcePodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name)
			targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
			if err != nil {
				return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
			}
			err = targetSourcePodIpSet.Refresh(currentPodIps, utils.OptionTimeout, "0")
			if err != nil {
				glog.Errorf("failed to refresh targetSourcePodIpSet: " + err.Error())
			}
			err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets, version)
			if err != nil {
				return nil, nil, err
			}
			activePolicyIpSets[targetSourcePodIpSet.Name] = true
|
|
}
|
|
|
|
}
|
|
|
|
glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
|
|
|
|
return activePolicyChains, activePolicyIpSets, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
|
|
targetDestPodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
|
|
|
|
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
|
|
// so no whitelist rules to be added to the network policy
|
|
if policy.ingressRules == nil {
|
|
return nil
|
|
}
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
|
|
// run through all the ingress rules in the spec and create iptables rules
|
|
// in the chain for the network policy
|
|
for i, ingressRule := range policy.ingressRules {
|
|
|
|
if len(ingressRule.srcPods) != 0 {
|
|
srcPodIpSetName := policyIndexedSourcePodIpSetName(policy.namespace, policy.name, i)
|
|
srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIpSets[srcPodIpSet.Name] = true
|
|
|
|
ingressRuleSrcPodIps := make([]string, 0, len(ingressRule.srcPods))
|
|
for _, pod := range ingressRule.srcPods {
|
|
ingressRuleSrcPodIps = append(ingressRuleSrcPodIps, pod.ip)
|
|
}
|
|
err = srcPodIpSet.Refresh(ingressRuleSrcPodIps, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh srcPodIpSet: " + err.Error())
|
|
}
|
|
|
|
if len(ingressRule.ports) != 0 {
|
|
// case where 'ports' details and 'from' details specified in the ingress rule
|
|
// so match on specified source and destination ip's and specified port (if any) and protocol
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.namedPorts) != 0 {
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIpSets[namedPortIpSet.Name] = true
|
|
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 {
|
|
// case where no 'ports' details specified in the ingress rule but 'from' details specified
|
|
// so match on specified source and destination ip with all port and protocol
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// case where only 'ports' details specified but no 'from' details in the ingress rule
|
|
// so match on all sources, with specified port (if any) and protocol
|
|
if ingressRule.matchAllSource && !ingressRule.matchAllPorts {
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIpSets[namedPortIpSet.Name] = true
|
|
|
|
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
		// case where neither 'ports' nor 'from' details are specified in the ingress rule,
		// so match on all ports, all protocols and all source IPs
if ingressRule.matchAllSource && ingressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.srcIPBlocks) != 0 {
|
|
srcIpBlockIpSetName := policyIndexedSourceIpBlockIpSetName(policy.namespace, policy.name, i)
|
|
srcIpBlockIpSet, err := npc.ipSetHandler.Create(srcIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIpSets[srcIpBlockIpSet.Name] = true
|
|
err = srcIpBlockIpSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh srcIpBlockIpSet: " + err.Error())
|
|
}
|
|
if !ingressRule.matchAllPorts {
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIpSets[namedPortIpSet.Name] = true
|
|
|
|
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if ingressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, targetDestPodIpSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
|
|
targetSourcePodIpSetName string, activePolicyIpSets map[string]bool, version string) error {
|
|
|
|
	// From the network policy spec: if the 'Egress' field is empty then this NetworkPolicy does
	// not allow any egress traffic, so there are no whitelist rules to add to the policy chain
if policy.egressRules == nil {
|
|
return nil
|
|
}
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
|
|
// run through all the egress rules in the spec and create iptables rules
|
|
// in the chain for the network policy
|
|
for i, egressRule := range policy.egressRules {
|
|
|
|
if len(egressRule.dstPods) != 0 {
|
|
dstPodIpSetName := policyIndexedDestinationPodIpSetName(policy.namespace, policy.name, i)
|
|
dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIpSets[dstPodIpSet.Name] = true
|
|
|
|
egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods))
|
|
for _, pod := range egressRule.dstPods {
|
|
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip)
|
|
}
|
|
err = dstPodIpSet.Refresh(egressRuleDstPodIps, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh dstPodIpSet: " + err.Error())
|
|
}
|
|
if len(egressRule.ports) != 0 {
|
|
				// case where both 'ports' and 'to' details are specified in the egress rule,
				// so match on the specified source and destination IPs and the specified port (if any) and protocol
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(egressRule.namedPorts) != 0 {
|
|
for j, endPoints := range egressRule.namedPorts {
|
|
namedPortIpSetName := policyIndexedEgressNamedPortIpSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIpSets[namedPortIpSet.Name] = true
|
|
|
|
err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh namedPortIpSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if len(egressRule.ports) == 0 && len(egressRule.namedPorts) == 0 {
|
|
				// case where no 'ports' details are specified in the egress rule but 'to' details are specified,
				// so match on the specified source and destination IPs, with all ports and protocols
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
		// case where only 'ports' details are specified but no 'to' details in the egress rule,
		// so match on all destinations, with the specified port (if any) and protocol
if egressRule.matchAllDestinations && !egressRule.matchAllPorts {
|
|
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
		// case where neither 'ports' nor 'to' details are specified in the egress rule,
		// so match on all ports, all protocols and all destinations
if egressRule.matchAllDestinations && egressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if len(egressRule.dstIPBlocks) != 0 {
|
|
dstIpBlockIpSetName := policyIndexedDestinationIpBlockIpSetName(policy.namespace, policy.name, i)
|
|
dstIpBlockIpSet, err := npc.ipSetHandler.Create(dstIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIpSets[dstIpBlockIpSet.Name] = true
|
|
err = dstIpBlockIpSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
|
|
if err != nil {
|
|
glog.Errorf("failed to refresh dstIpBlockIpSet: " + err.Error())
|
|
}
|
|
if !egressRule.matchAllPorts {
|
|
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if egressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
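// appendRuleToPolicyChain appends a single whitelist rule to a policy chain, matching on the
// given source/destination ipsets, protocol and destination port when they are non-empty. For
// example, a rule for a TCP port 80 ingress whitelist ends up looking roughly like this (chain
// and set names illustrative):
//
//	-A KUBE-NWPLCY-<hash> -m comment --comment "..." -m set --match-set KUBE-SRC-<hash> src \
//	   -m set --match-set KUBE-DST-<hash> dst -p tcp --dport 80 -j ACCEPT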
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIpSetName, dstIpSetName, protocol, dPort string) error {
|
|
if iptablesCmdHandler == nil {
|
|
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
|
|
}
|
|
args := make([]string, 0)
|
|
if comment != "" {
|
|
args = append(args, "-m", "comment", "--comment", comment)
|
|
}
|
|
if srcIpSetName != "" {
|
|
args = append(args, "-m", "set", "--match-set", srcIpSetName, "src")
|
|
}
|
|
if dstIpSetName != "" {
|
|
args = append(args, "-m", "set", "--match-set", dstIpSetName, "dst")
|
|
}
|
|
if protocol != "" {
|
|
args = append(args, "-p", protocol)
|
|
}
|
|
if dPort != "" {
|
|
args = append(args, "--dport", dPort)
|
|
}
|
|
args = append(args, "-j", "ACCEPT")
|
|
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
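// syncPodFirewallChains ensures a KUBE-POD-FW-* chain exists for every pod on this node that is
// selected by an ingress or egress network policy, wires jumps to it from the filter table's
// FORWARD, OUTPUT and (for egress) INPUT chains, adds jumps to the relevant per-policy chains
// plus the stateful-firewall, NFLOG and default REJECT rules, and returns the set of pod
// firewall chains that are currently active.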
func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[string]bool, error) {
|
|
|
|
activePodFwChains := make(map[string]bool)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
glog.Fatalf("Failed to initialize iptables executor: %s", err.Error())
|
|
}
|
|
|
|
	// loop through the pods running on the node to which ingress network policies are to be applied
ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(npc.nodeIP.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range *ingressNetworkPolicyEnabledPods {
|
|
|
|
		// this condition occurs when we get a transient update while a pod is being removed or added;
		// a subsequent update will perform the correct action
		if pod.ip == "" {
|
|
continue
|
|
}
|
|
|
|
// ensure pod specific firewall chain exist for all the pods that need ingress firewall
|
|
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
|
|
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
activePodFwChains[podFwChainName] = true
|
|
|
|
// add entries in pod firewall to run through required network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if _, ok := policy.targetPods[pod.ip]; ok {
|
|
comment := "run through nw policy " + policy.name
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
comment := "rule to permit the traffic traffic to pods when source is the pod's local node"
|
|
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
		// ensure stateful firewall rules that permit return traffic for connections originated by the pod
comment = "rule for stateful firewall for pod"
|
|
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
|
|
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting routed (coming for other node pods)
|
|
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
|
|
exists, err = iptablesCmdHandler.Exists("filter", "OUTPUT", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", "OUTPUT", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting switched (coming for same node pods)
|
|
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "physdev", "--physdev-is-bridged",
|
|
"-m", "comment", "--comment", comment,
|
|
"-d", pod.ip,
|
|
"-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// add rule to log the packets that will be dropped due to network policy enforcement
|
|
comment = "rule to log dropped traffic POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
		// add default REJECT rule at the end of the chain
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
	// loop through the pods running on the node to which egress network policies are to be applied
egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(npc.nodeIP.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range *egressNetworkPolicyEnabledPods {
|
|
|
|
		// this condition occurs when we get a transient update while a pod is being removed or added;
		// a subsequent update will perform the correct action
		if pod.ip == "" {
|
|
continue
|
|
}
|
|
|
|
// ensure pod specific firewall chain exist for all the pods that need egress firewall
|
|
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
|
|
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
activePodFwChains[podFwChainName] = true
|
|
|
|
// add entries in pod firewall to run through required network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if _, ok := policy.targetPods[pod.ip]; ok {
|
|
comment := "run through nw policy " + policy.name
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
		// ensure stateful firewall rules that permit return traffic for connections originated by the pod
comment := "rule for stateful firewall for pod"
|
|
args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
egressFilterChains := []string{"FORWARD", "OUTPUT", "INPUT"}
|
|
for _, chain := range egressFilterChains {
|
|
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
|
|
			// this rule applies to traffic getting forwarded/routed (traffic from the pod destined
			// to a pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", chain, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting switched (coming for same node pods)
|
|
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "physdev", "--physdev-is-bridged",
|
|
"-m", "comment", "--comment", comment,
|
|
"-s", pod.ip,
|
|
"-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// add rule to log the packets that will be dropped due to network policy enforcement
|
|
comment = "rule to log dropped traffic POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
		// add default REJECT rule at the end of the chain
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
return activePodFwChains, nil
|
|
}
|
|
|
|
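// cleanupStaleRules deletes the pod firewall chains, network policy chains and KUBE-SRC-/KUBE-DST-
// ipsets left over from previous syncs that are no longer referenced by the active chain and ipset
// maps passed in, removing any jump rules that still point at them first.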
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error {
|
|
|
|
cleanupPodFwChains := make([]string, 0)
|
|
cleanupPolicyChains := make([]string, 0)
|
|
cleanupPolicyIPSets := make([]*utils.Set, 0)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
|
|
}
|
|
ipsets, err := utils.NewIPSet(false)
|
|
if err != nil {
|
|
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
|
|
}
|
|
err = ipsets.Save()
|
|
if err != nil {
|
|
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
|
|
}
|
|
|
|
// get the list of chains created for pod firewall and network policies
|
|
	chains, err := iptablesCmdHandler.ListChains("filter")
	if err != nil {
		return fmt.Errorf("failed to list chains from filter table due to %s", err.Error())
	}
for _, chain := range chains {
|
|
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
|
|
if _, ok := activePolicyChains[chain]; !ok {
|
|
cleanupPolicyChains = append(cleanupPolicyChains, chain)
|
|
}
|
|
}
|
|
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
|
|
if _, ok := activePodFwChains[chain]; !ok {
|
|
cleanupPodFwChains = append(cleanupPodFwChains, chain)
|
|
}
|
|
}
|
|
}
|
|
for _, set := range ipsets.Sets {
|
|
if strings.HasPrefix(set.Name, kubeSourceIpSetPrefix) ||
|
|
strings.HasPrefix(set.Name, kubeDestinationIpSetPrefix) {
|
|
if _, ok := activePolicyIPSets[set.Name]; !ok {
|
|
cleanupPolicyIPSets = append(cleanupPolicyIPSets, set)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup FORWARD chain rules to jump to pod firewall
|
|
for _, chain := range cleanupPodFwChains {
|
|
|
|
forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list rules in filter table, FORWARD chain due to %s", err.Error())
|
|
}
|
|
outputChainRules, err := iptablesCmdHandler.List("filter", "OUTPUT")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list rules in filter table, OUTPUT chain due to %s", err.Error())
|
|
}
|
|
|
|
		// TODO: delete rules by spec rather than by rule number, to avoid the extra bookkeeping loop
var realRuleNo int
|
|
for i, rule := range forwardChainRules {
|
|
if strings.Contains(rule, chain) {
|
|
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete rule: %s from the FORWARD chain of filter table due to %s", rule, err.Error())
|
|
}
|
|
realRuleNo++
|
|
}
|
|
}
|
|
realRuleNo = 0
|
|
for i, rule := range outputChainRules {
|
|
if strings.Contains(rule, chain) {
|
|
err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete rule: %s from the OUTPUT chain of filter table due to %s", rule, err.Error())
|
|
}
|
|
realRuleNo++
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup pod firewall chain
|
|
for _, chain := range cleanupPodFwChains {
|
|
glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
|
|
err = iptablesCmdHandler.ClearChain("filter", chain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", chain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
|
|
}
|
|
glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
|
|
}
|
|
|
|
// cleanup network policy chains
|
|
for _, policyChain := range cleanupPolicyChains {
|
|
glog.V(2).Infof("Found policy chain to cleanup %s", policyChain)
|
|
|
|
// first clean up any references from pod firewall chain
|
|
for podFwChain := range activePodFwChains {
|
|
			podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
			if err != nil {
				return fmt.Errorf("failed to list rules in chain %s of the filter table due to %s", podFwChain, err.Error())
			}
|
|
for i, rule := range podFwChainRules {
|
|
if strings.Contains(rule, policyChain) {
|
|
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
err = iptablesCmdHandler.ClearChain("filter", policyChain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
|
|
}
|
|
glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
|
|
}
|
|
|
|
// cleanup network policy ipsets
|
|
for _, set := range cleanupPolicyIPSets {
|
|
err = set.Destroy()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
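// getIngressNetworkPolicyEnabledPods returns the pods running on this node (matched by host IP)
// that are selected by at least one network policy with an ingress policy type.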
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) {
|
|
nodePods := make(map[string]podInfo)
|
|
|
|
for _, obj := range npc.podLister.List() {
|
|
pod := obj.(*api.Pod)
|
|
|
|
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
|
|
continue
|
|
}
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if policy.namespace != pod.ObjectMeta.Namespace {
|
|
continue
|
|
}
|
|
_, ok := policy.targetPods[pod.Status.PodIP]
|
|
if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
|
|
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
|
|
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
|
|
name: pod.ObjectMeta.Name,
|
|
namespace: pod.ObjectMeta.Namespace,
|
|
labels: pod.ObjectMeta.Labels}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return &nodePods, nil
|
|
|
|
}
|
|
|
|
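// getEgressNetworkPolicyEnabledPods returns the pods running on this node (matched by host IP)
// that are selected by at least one network policy with an egress policy type.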
func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) {
|
|
|
|
nodePods := make(map[string]podInfo)
|
|
|
|
for _, obj := range npc.podLister.List() {
|
|
pod := obj.(*api.Pod)
|
|
|
|
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
|
|
continue
|
|
}
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if policy.namespace != pod.ObjectMeta.Namespace {
|
|
continue
|
|
}
|
|
_, ok := policy.targetPods[pod.Status.PodIP]
|
|
if ok && (policy.policyType == "both" || policy.policyType == "egress") {
|
|
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
|
|
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
|
|
name: pod.ObjectMeta.Name,
|
|
namespace: pod.ObjectMeta.Namespace,
|
|
labels: pod.ObjectMeta.Labels}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return &nodePods, nil
|
|
}
|
|
|
|
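// processNetworkPolicyPorts splits the ports of a network policy rule into numeric protocol/port
// pairs and named ports, resolving named ports to the pod endpoints collected in namedPort2eps.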
func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
|
|
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
|
|
for _, npPort := range npPorts {
|
|
if npPort.Port == nil {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
|
|
} else if npPort.Port.Type == intstr.Int {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
|
|
} else {
|
|
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
|
|
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
|
|
for _, eps := range numericPort2eps {
|
|
namedPorts = append(namedPorts, *eps)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processBetaNetworkPolicyPorts(npPorts []apiextensions.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
|
|
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
|
|
for _, npPort := range npPorts {
|
|
if npPort.Port == nil {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
|
|
} else if npPort.Port.Type == intstr.Int {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
|
|
} else {
|
|
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
|
|
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
|
|
for _, eps := range numericPort2eps {
|
|
namedPorts = append(namedPorts, *eps)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
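// buildNetworkPoliciesInfo translates the v1 NetworkPolicy objects from the informer cache into
// the controller's internal networkPolicyInfo representation: the target pods selected by the
// policy's podSelector, plus the expanded ingress and egress whitelist rules (peer pods, ipBlocks
// and ports).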
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
|
|
|
|
NetworkPolicies := make([]networkPolicyInfo, 0)
|
|
|
|
for _, policyObj := range npc.npLister.List() {
|
|
|
|
		policy, ok := policyObj.(*networking.NetworkPolicy)
		if !ok {
			return nil, fmt.Errorf("failed to convert object to *networking.NetworkPolicy")
		}
		podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
|
|
newPolicy := networkPolicyInfo{
|
|
name: policy.Name,
|
|
namespace: policy.Namespace,
|
|
podSelector: podSelector,
|
|
policyType: "ingress",
|
|
}
|
|
|
|
ingressType, egressType := false, false
|
|
for _, policyType := range policy.Spec.PolicyTypes {
|
|
if policyType == networking.PolicyTypeIngress {
|
|
ingressType = true
|
|
}
|
|
if policyType == networking.PolicyTypeEgress {
|
|
egressType = true
|
|
}
|
|
}
|
|
if ingressType && egressType {
|
|
newPolicy.policyType = "both"
|
|
} else if egressType {
|
|
newPolicy.policyType = "egress"
|
|
} else if ingressType {
|
|
newPolicy.policyType = "ingress"
|
|
}
|
|
|
|
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
|
|
newPolicy.targetPods = make(map[string]podInfo)
|
|
namedPort2IngressEps := make(namedPort2eps)
|
|
if err == nil {
|
|
for _, matchingPod := range matchingPods {
|
|
if matchingPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
|
|
name: matchingPod.ObjectMeta.Name,
|
|
namespace: matchingPod.ObjectMeta.Namespace,
|
|
labels: matchingPod.ObjectMeta.Labels}
|
|
npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
|
|
}
|
|
}
|
|
|
|
if policy.Spec.Ingress == nil {
|
|
newPolicy.ingressRules = nil
|
|
} else {
|
|
newPolicy.ingressRules = make([]ingressRule, 0)
|
|
}
|
|
|
|
if policy.Spec.Egress == nil {
|
|
newPolicy.egressRules = nil
|
|
} else {
|
|
newPolicy.egressRules = make([]egressRule, 0)
|
|
}
|
|
|
|
for _, specIngressRule := range policy.Spec.Ingress {
|
|
ingressRule := ingressRule{}
|
|
ingressRule.srcPods = make([]podInfo, 0)
|
|
ingressRule.srcIPBlocks = make([][]string, 0)
|
|
|
|
// If this field is empty or missing in the spec, this rule matches all sources
|
|
if len(specIngressRule.From) == 0 {
|
|
ingressRule.matchAllSource = true
|
|
} else {
|
|
ingressRule.matchAllSource = false
|
|
for _, peer := range specIngressRule.From {
|
|
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
|
|
for _, peerPod := range peerPods {
|
|
if peerPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
ingressRule.srcPods = append(ingressRule.srcPods,
|
|
podInfo{ip: peerPod.Status.PodIP,
|
|
name: peerPod.ObjectMeta.Name,
|
|
namespace: peerPod.ObjectMeta.Namespace,
|
|
labels: peerPod.ObjectMeta.Labels})
|
|
}
|
|
}
|
|
ingressRule.srcIPBlocks = append(ingressRule.srcIPBlocks, npc.evalIPBlockPeer(peer)...)
|
|
}
|
|
}
|
|
|
|
ingressRule.ports = make([]protocolAndPort, 0)
|
|
ingressRule.namedPorts = make([]endPoints, 0)
|
|
// If this field is empty or missing in the spec, this rule matches all ports
|
|
if len(specIngressRule.Ports) == 0 {
|
|
ingressRule.matchAllPorts = true
|
|
} else {
|
|
ingressRule.matchAllPorts = false
|
|
ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
|
|
}
|
|
|
|
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
|
|
}
|
|
|
|
for _, specEgressRule := range policy.Spec.Egress {
|
|
egressRule := egressRule{}
|
|
egressRule.dstPods = make([]podInfo, 0)
|
|
egressRule.dstIPBlocks = make([][]string, 0)
|
|
namedPort2EgressEps := make(namedPort2eps)
|
|
|
|
			// If this field is empty or missing in the spec, this rule matches all destinations
if len(specEgressRule.To) == 0 {
|
|
egressRule.matchAllDestinations = true
|
|
} else {
|
|
egressRule.matchAllDestinations = false
|
|
for _, peer := range specEgressRule.To {
|
|
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
|
|
for _, peerPod := range peerPods {
|
|
if peerPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
egressRule.dstPods = append(egressRule.dstPods,
|
|
podInfo{ip: peerPod.Status.PodIP,
|
|
name: peerPod.ObjectMeta.Name,
|
|
namespace: peerPod.ObjectMeta.Namespace,
|
|
labels: peerPod.ObjectMeta.Labels})
|
|
npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps)
|
|
}
|
|
|
|
}
|
|
egressRule.dstIPBlocks = append(egressRule.dstIPBlocks, npc.evalIPBlockPeer(peer)...)
|
|
}
|
|
}
|
|
|
|
egressRule.ports = make([]protocolAndPort, 0)
|
|
egressRule.namedPorts = make([]endPoints, 0)
|
|
// If this field is empty or missing in the spec, this rule matches all ports
|
|
if len(specEgressRule.Ports) == 0 {
|
|
egressRule.matchAllPorts = true
|
|
} else {
|
|
egressRule.matchAllPorts = false
|
|
egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts(specEgressRule.Ports, namedPort2EgressEps)
|
|
}
|
|
|
|
newPolicy.egressRules = append(newPolicy.egressRules, egressRule)
|
|
}
|
|
NetworkPolicies = append(NetworkPolicies, newPolicy)
|
|
}
|
|
|
|
return &NetworkPolicies, nil
|
|
}
|
|
|
|
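// evalPodPeer returns the pods selected by a NetworkPolicyPeer: when a namespaceSelector is set,
// the podSelector (or everything, if unset) is applied across all matching namespaces; otherwise
// the podSelector is applied within the policy's own namespace.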
func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, peer networking.NetworkPolicyPeer) ([]*api.Pod, error) {
|
|
|
|
var matchingPods []*api.Pod
|
|
matchingPods = make([]*api.Pod, 0)
|
|
var err error
|
|
// spec can have both PodSelector AND NamespaceSelector
|
|
if peer.NamespaceSelector != nil {
|
|
namespaceSelector, _ := v1.LabelSelectorAsSelector(peer.NamespaceSelector)
|
|
namespaces, err := npc.ListNamespaceByLabels(namespaceSelector)
|
|
if err != nil {
|
|
return nil, errors.New("Failed to build network policies info due to " + err.Error())
|
|
}
|
|
|
|
podSelector := labels.Everything()
|
|
if peer.PodSelector != nil {
|
|
podSelector, _ = v1.LabelSelectorAsSelector(peer.PodSelector)
|
|
}
|
|
for _, namespace := range namespaces {
|
|
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, podSelector)
|
|
if err != nil {
|
|
return nil, errors.New("Failed to build network policies info due to " + err.Error())
|
|
}
|
|
matchingPods = append(matchingPods, namespacePods...)
|
|
}
|
|
} else if peer.PodSelector != nil {
|
|
podSelector, _ := v1.LabelSelectorAsSelector(peer.PodSelector)
|
|
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
|
|
}
|
|
|
|
return matchingPods, err
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, podSelector labels.Selector) (ret []*api.Pod, err error) {
|
|
podLister := listers.NewPodLister(npc.podLister)
|
|
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(podSelector)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return allMatchedNameSpacePods, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) ListNamespaceByLabels(namespaceSelector labels.Selector) ([]*api.Namespace, error) {
|
|
namespaceLister := listers.NewNamespaceLister(npc.nsLister)
|
|
matchedNamespaces, err := namespaceLister.List(namespaceSelector)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return matchedNamespaces, nil
|
|
}
|
|
|
|
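// evalIPBlockPeer converts an ipBlock peer into ipset member entries. A /0 CIDR is split into
// 0.0.0.0/1 and 128.0.0.0/1, presumably because a hash:net ipset cannot hold a zero-prefix entry,
// and each 'except' CIDR is added with the nomatch option so it is excluded from the set.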
func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) [][]string {
	ipBlock := make([][]string, 0)
	if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil {
		if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") {
			ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, []string{"128.0.0.0/1", utils.OptionTimeout, "0"})
		} else {
			ipBlock = append(ipBlock, []string{cidr, utils.OptionTimeout, "0"})
		}
		for _, except := range peer.IPBlock.Except {
			if strings.HasSuffix(except, "/0") {
				ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch})
			} else {
				ipBlock = append(ipBlock, []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch})
			}
		}
	}
	return ipBlock
}
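// Illustrative sketch (assumption: the /0 split exists because the zero-prefix network
// cannot be stored directly as an ipset member): for a hypothetical peer of
//
//	peer := networking.NetworkPolicyPeer{
//		IPBlock: &networking.IPBlock{CIDR: "0.0.0.0/0", Except: []string{"10.0.0.0/8"}},
//	}
//
// evalIPBlockPeer returns entries roughly equivalent to
//
//	[][]string{
//		{"0.0.0.0/1", utils.OptionTimeout, "0"},
//		{"128.0.0.0/1", utils.OptionTimeout, "0"},
//		{"10.0.0.0/8", utils.OptionTimeout, "0", utils.OptionNoMatch},
//	}
//
// where the two /1 networks together cover all of IPv4 and the "nomatch" option marks
// the excepted range so matching addresses are treated as not being in the set.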
func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort2eps *namedPort2eps) {
	if pod == nil || namedPort2eps == nil {
		return
	}
	for k := range pod.Spec.Containers {
		for _, port := range pod.Spec.Containers[k].Ports {
			name := port.Name
			protocol := string(port.Protocol)
			containerPort := strconv.Itoa(int(port.ContainerPort))

			if (*namedPort2eps)[name] == nil {
				(*namedPort2eps)[name] = make(protocol2eps)
			}
			if (*namedPort2eps)[name][protocol] == nil {
				(*namedPort2eps)[name][protocol] = make(numericPort2eps)
			}
			if eps, ok := (*namedPort2eps)[name][protocol][containerPort]; !ok {
				(*namedPort2eps)[name][protocol][containerPort] = &endPoints{
					ips:             []string{pod.Status.PodIP},
					protocolAndPort: protocolAndPort{port: containerPort, protocol: protocol},
				}
			} else {
				eps.ips = append(eps.ips, pod.Status.PodIP)
			}
		}
	}
}
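// Illustrative sketch (hypothetical values): the accumulator built above groups pod IPs
// by named port, then protocol, then numeric container port. A pod at 10.1.0.5 exposing
// a container port named "metrics" (TCP 9090) leaves the map looking like:
//
//	namedPort2eps{
//		"metrics": protocol2eps{
//			"TCP": numericPort2eps{
//				"9090": &endPoints{ips: []string{"10.1.0.5"}, protocolAndPort: protocolAndPort{port: "9090", protocol: "TCP"}},
//			},
//		},
//	}
//
// so a policy port written as "metrics" can later be expanded into one ipset per
// distinct (protocol, numeric port) pair.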
func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

	NetworkPolicies := make([]networkPolicyInfo, 0)

	for _, policyObj := range npc.npLister.List() {

		policy, _ := policyObj.(*apiextensions.NetworkPolicy)
		podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
		newPolicy := networkPolicyInfo{
			name:        policy.Name,
			namespace:   policy.Namespace,
			podSelector: podSelector,
		}
		matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
		newPolicy.targetPods = make(map[string]podInfo)
		newPolicy.ingressRules = make([]ingressRule, 0)
		namedPort2IngressEps := make(namedPort2eps)
		if err == nil {
			for _, matchingPod := range matchingPods {
				if matchingPod.Status.PodIP == "" {
					continue
				}
				newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
					name:      matchingPod.ObjectMeta.Name,
					namespace: matchingPod.ObjectMeta.Namespace,
					labels:    matchingPod.ObjectMeta.Labels}
				npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
			}
		}

		for _, specIngressRule := range policy.Spec.Ingress {
			ingressRule := ingressRule{}

			ingressRule.ports = make([]protocolAndPort, 0)
			ingressRule.namedPorts = make([]endPoints, 0)
			ingressRule.ports, ingressRule.namedPorts = npc.processBetaNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
			ingressRule.srcPods = make([]podInfo, 0)
			for _, peer := range specIngressRule.From {
				podSelector, _ := v1.LabelSelectorAsSelector(peer.PodSelector)
				matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, podSelector)
				if err == nil {
					for _, matchingPod := range matchingPods {
						if matchingPod.Status.PodIP == "" {
							continue
						}
						ingressRule.srcPods = append(ingressRule.srcPods,
							podInfo{ip: matchingPod.Status.PodIP,
								name:      matchingPod.ObjectMeta.Name,
								namespace: matchingPod.ObjectMeta.Namespace,
								labels:    matchingPod.ObjectMeta.Labels})
					}
				}
			}
			newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
		}
		NetworkPolicies = append(NetworkPolicies, newPolicy)
	}

	return &NetworkPolicies, nil
}
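// Illustrative sketch (hypothetical names, based only on the code above): for a beta
// (extensions/v1beta1) policy in namespace "demo" that selects app=db pods and allows
// ingress from app=web pods, the builder produces roughly:
//
//	newPolicy := networkPolicyInfo{
//		name:      "allow-web", // hypothetical policy name
//		namespace: "demo",
//		// targetPods keyed by pod IP for every running app=db pod,
//		// ingressRules[0].srcPods holding every running app=web pod in "demo".
//	}
//
// Note this beta path only translates ingress rules with same-namespace pod selectors;
// egress rules, ipBlocks and namespace selectors are handled by the v1 builder used when
// npc.v1NetworkPolicy is true.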
func podFirewallChainName(namespace, podName string, version string) string {
	hash := sha256.Sum256([]byte(namespace + podName + version))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubePodFirewallChainPrefix + encoded[:16]
}

func networkPolicyChainName(namespace, policyName string, version string) string {
	hash := sha256.Sum256([]byte(namespace + policyName + version))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeNetworkPolicyChainPrefix + encoded[:16]
}

func policySourcePodIpSetName(namespace, policyName string) string {
	hash := sha256.Sum256([]byte(namespace + policyName))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeSourceIpSetPrefix + encoded[:16]
}

func policyDestinationPodIpSetName(namespace, policyName string) string {
	hash := sha256.Sum256([]byte(namespace + policyName))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeDestinationIpSetPrefix + encoded[:16]
}

func policyIndexedSourcePodIpSetName(namespace, policyName string, ingressRuleNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeSourceIpSetPrefix + encoded[:16]
}

func policyIndexedDestinationPodIpSetName(namespace, policyName string, egressRuleNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeDestinationIpSetPrefix + encoded[:16]
}

func policyIndexedSourceIpBlockIpSetName(namespace, policyName string, ingressRuleNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeSourceIpSetPrefix + encoded[:16]
}

func policyIndexedDestinationIpBlockIpSetName(namespace, policyName string, egressRuleNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeDestinationIpSetPrefix + encoded[:16]
}

func policyIndexedIngressNamedPortIpSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeDestinationIpSetPrefix + encoded[:16]
}

func policyIndexedEgressNamedPortIpSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string {
	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubeDestinationIpSetPrefix + encoded[:16]
}
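// Illustrative sketch (hypothetical namespace/policy names): every chain and ipset name
// above is derived the same way: sha256 over the identifying strings, base32-encode,
// keep the first 16 characters, prepend a fixed prefix, e.g.
//
//	hash := sha256.Sum256([]byte("demo" + "allow-web" + "ingressrule" + strconv.Itoa(0) + "pod"))
//	name := kubeSourceIpSetPrefix + base32.StdEncoding.EncodeToString(hash[:])[:16]
//
// The result is deterministic, so repeated syncs regenerate identical names for the same
// policy and rule index, and the prefix plus 16 characters comfortably fits within the
// name-length limits that iptables chains and ipsets allow.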
// Cleanup cleans up all the iptables chains and ipsets configured on the node by kube-router
func (npc *NetworkPolicyController) Cleanup() {

	glog.Info("Cleaning up iptables configuration permanently done by kube-router")

	iptablesCmdHandler, err := iptables.New()
	if err != nil {
		glog.Errorf("Failed to initialize iptables executor: %s", err.Error())
		return
	}

	// delete jump rules in FORWARD chain to pod specific firewall chain
	forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
	if err != nil {
		glog.Errorf("Failed to delete iptables rules as part of cleanup")
		return
	}

	// TODO: need a better way to delete a rule than by its number
	var realRuleNo int
	for i, rule := range forwardChainRules {
		if strings.Contains(rule, kubePodFirewallChainPrefix) {
			err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
			realRuleNo++
		}
	}

	// delete jump rules in OUTPUT chain to pod specific firewall chain
	forwardChainRules, err = iptablesCmdHandler.List("filter", "OUTPUT")
	if err != nil {
		glog.Errorf("Failed to delete iptables rules as part of cleanup")
		return
	}

	// TODO: need a better way to delete a rule than by its number
	realRuleNo = 0
	for i, rule := range forwardChainRules {
		if strings.Contains(rule, kubePodFirewallChainPrefix) {
			err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
			realRuleNo++
		}
	}

	// flush and delete pod specific firewall chain
	chains, err := iptablesCmdHandler.ListChains("filter")
	for _, chain := range chains {
		if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
			err = iptablesCmdHandler.ClearChain("filter", chain)
			if err != nil {
				glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
				return
			}
			err = iptablesCmdHandler.DeleteChain("filter", chain)
			if err != nil {
				glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
				return
			}
		}
	}

	// flush and delete per network policy specific chain
	chains, err = iptablesCmdHandler.ListChains("filter")
	for _, chain := range chains {
		if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
			err = iptablesCmdHandler.ClearChain("filter", chain)
			if err != nil {
				glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
				return
			}
			err = iptablesCmdHandler.DeleteChain("filter", chain)
			if err != nil {
				glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
				return
			}
		}
	}

	// delete all ipsets
	ipset, err := utils.NewIPSet(false)
	if err != nil {
		glog.Errorf("Failed to clean up ipsets: " + err.Error())
		return
	}
	err = ipset.Save()
	if err != nil {
		glog.Errorf("Failed to clean up ipsets: " + err.Error())
	}
	err = ipset.DestroyAllWithin()
	if err != nil {
		glog.Errorf("Failed to clean up ipsets: " + err.Error())
	}
	glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}
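// Illustrative sketch (hypothetical rule listing): the i-realRuleNo arithmetic above
// compensates for iptables renumbering rules after every delete. List() returns the
// chain policy as the first entry, so a kube-router jump found at slice index i is rule
// number i; once a rule is deleted, the remaining matches have shifted up by one, which
// realRuleNo accounts for:
//
//	// FORWARD listing: [-P FORWARD ACCEPT, jumpA (KUBE-POD-FW-..), other, jumpB (KUBE-POD-FW-..)]
//	// i=1 matches jumpA -> delete rule "1"          (realRuleNo becomes 1)
//	// i=3 matches jumpB -> delete rule "3-1" = "2"  (jumpB moved up after jumpA was removed)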
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			npc.OnPodUpdate(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			newPoObj := newObj.(*api.Pod)
			oldPoObj := oldObj.(*api.Pod)
			if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
				// for the network policies, we are only interested in pod status phase change or IP change
				npc.OnPodUpdate(newObj)
			}
		},
		DeleteFunc: func(obj interface{}) {
			npc.handlePodDelete(obj)
		},
	}
}

func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			npc.OnNamespaceUpdate(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			npc.OnNamespaceUpdate(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			npc.handleNamespaceDelete(obj)
		},
	}
}

func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			npc.OnNetworkPolicyUpdate(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			npc.OnNetworkPolicyUpdate(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			npc.handleNetworkPolicyDelete(obj)
		},
	}
}
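// Illustrative sketch (assumes the standard client-go shared-informer pattern; the real
// start-up wiring lives outside this file): the handlers returned above are meant to be
// registered on the informers whose indexers the controller already reads from, e.g.
//
//	podInformer.AddEventHandler(npc.PodEventHandler)
//	nsInformer.AddEventHandler(npc.NamespaceEventHandler)
//	npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)
//
// so that every pod, namespace and NetworkPolicy change funnels into a resync of the
// node's iptables/ipset state.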
func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) {
	pod, ok := obj.(*api.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
		if pod, ok = tombstone.Obj.(*api.Pod); !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
	}
	glog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name)
	if !npc.readyForUpdates {
		glog.V(3).Infof("Skipping pod: %s/%s delete event, controller still performing bootup full-sync", pod.Namespace, pod.Name)
		return
	}

	err := npc.Sync()
	if err != nil {
		glog.Errorf("Error syncing network policy for pod: %s/%s delete event, Error: %s", pod.Namespace, pod.Name, err)
	}
}

func (npc *NetworkPolicyController) handleNamespaceDelete(obj interface{}) {
	namespace, ok := obj.(*api.Namespace)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
		if namespace, ok = tombstone.Obj.(*api.Namespace); !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
	}
	// namespace (and annotations on it) has no significance in GA ver of network policy
	if npc.v1NetworkPolicy {
		return
	}
	glog.V(2).Infof("Received namespace: %s delete event", namespace.Name)

	if !npc.readyForUpdates {
		glog.V(3).Infof("Skipping update to namespace: %s, controller still performing bootup full-sync", namespace.Name)
		return
	}

	err := npc.Sync()
	if err != nil {
		glog.Errorf("Error syncing network policy for namespace: %s delete event, Error: %s", namespace.Name, err)
	}
}

func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) {
	netpol, ok := obj.(*networking.NetworkPolicy)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
		if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok {
			glog.Errorf("unexpected object type: %v", obj)
			return
		}
	}
	glog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name)

	if !npc.readyForUpdates {
		glog.V(3).Infof("Skipping network policy: %s/%s delete event as controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
		return
	}

	err := npc.Sync()
	if err != nil {
		glog.Errorf("Error syncing network policy for the network policy: %s/%s delete event, Error: %s", netpol.Namespace, netpol.Name, err)
	}
}
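// Illustrative sketch (standard client-go behaviour, not kube-router specific): the
// tombstone handling above exists because a delete notification can arrive after the
// watch was disconnected, in which case the informer wraps the last known object:
//
//	// obj may be *api.Pod or cache.DeletedFinalStateUnknown{Key: "ns/name", Obj: lastKnownPod}
//	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
//		obj = tombstone.Obj // recover the last known state before type-asserting
//	}
//
// All three delete handlers then trigger a full Sync() rather than removing individual
// rules, keeping the datapath reconciliation logic in one place.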
// NewNetworkPolicyController returns a new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
	config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
	npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
	npc := NetworkPolicyController{}

	if config.MetricsEnabled {
		//Register the metrics for this controller
		prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
		prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
		npc.MetricsEnabled = true
	}

	npc.syncPeriod = config.IPTablesSyncPeriod

	npc.v1NetworkPolicy = true
	v, err := clientset.Discovery().ServerVersion()
	if err != nil {
		return nil, err
	}
	valid := regexp.MustCompile("[0-9]")
	v.Minor = strings.Join(valid.FindAllString(v.Minor, -1), "")
	minorVer, _ := strconv.Atoi(v.Minor)
	if v.Major == "1" && minorVer < 7 {
		npc.v1NetworkPolicy = false
	}

	node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
	if err != nil {
		return nil, err
	}

	npc.nodeHostName = node.Name

	nodeIP, err := utils.GetNodeIP(node)
	if err != nil {
		return nil, err
	}
	npc.nodeIP = nodeIP

	ipset, err := utils.NewIPSet(false)
	if err != nil {
		return nil, err
	}
	err = ipset.Save()
	if err != nil {
		return nil, err
	}
	npc.ipSetHandler = ipset

	npc.podLister = podInformer.GetIndexer()
	npc.PodEventHandler = npc.newPodEventHandler()

	npc.nsLister = nsInformer.GetIndexer()
	npc.NamespaceEventHandler = npc.newNamespaceEventHandler()

	npc.npLister = npInformer.GetIndexer()
	npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()

	return &npc, nil
}
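// Illustrative sketch (hypothetical wiring via the standard client-go informer factory,
// "informers" being k8s.io/client-go/informers; kube-router's real start-up code lives
// outside this file): a caller would typically build the controller from shared
// informers and register the exported handlers before starting the factory, e.g.
//
//	factory := informers.NewSharedInformerFactory(clientset, 0)
//	podInf := factory.Core().V1().Pods().Informer()
//	nsInf := factory.Core().V1().Namespaces().Informer()
//	npInf := factory.Networking().V1().NetworkPolicies().Informer()
//	npc, err := NewNetworkPolicyController(clientset, config, podInf, npInf, nsInf)
//	if err == nil {
//		podInf.AddEventHandler(npc.PodEventHandler)
//		nsInf.AddEventHandler(npc.NamespaceEventHandler)
//		npInf.AddEventHandler(npc.NetworkPolicyEventHandler)
//	}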