From b4c063ee8ae28ddc34b5b66a2766c0abfa8f58ba Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 28 Jul 2017 11:12:52 +0530 Subject: [PATCH 1/4] support for network policy GA with this refactoring support for network policy V1 (or GA) is added. Changes are backward compatible so beta network policy semantics are still available for k8s ver 1.6.* and less Fixes #16 --- app/controllers/network_policy_controller.go | 156 ++++++++++++++++--- app/watchers/network_policy_watcher.go | 59 +++---- 2 files changed, 163 insertions(+), 52 deletions(-) diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go index d7bc588a..44f5ee11 100644 --- a/app/controllers/network_policy_controller.go +++ b/app/controllers/network_policy_controller.go @@ -21,6 +21,8 @@ import ( "github.com/janeczku/go-ipset/ipset" "k8s.io/client-go/kubernetes" apiv1 "k8s.io/client-go/pkg/api/v1" + apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + networking "k8s.io/client-go/pkg/apis/networking/v1" ) // Network policy controller provides an ingress firewall for the pods as per the defined network policies. @@ -34,10 +36,11 @@ import ( // dropped by the rule in the pod chain, if there is no match. type NetworkPolicyController struct { - nodeIP net.IP - nodeHostName string - mu sync.Mutex - syncPeriod time.Duration + nodeIP net.IP + nodeHostName string + mu sync.Mutex + syncPeriod time.Duration + v1NetworkPolicy bool // list of all active network policies expressed as networkPolicyInfo networkPoliciesInfo *[]networkPolicyInfo @@ -94,7 +97,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro glog.Infof("Performing periodic syn of the iptables to reflect network policies") err := npc.Sync() if err != nil { - glog.Errorf("Error during periodic sync: ", err) + glog.Errorf("Error during periodic sync: " + err.Error()) } } else { continue @@ -122,7 +125,6 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { } func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) { - glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name) if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { err := npc.Sync() if err != nil { @@ -134,6 +136,12 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w } func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) { + + // namespace (and annotations on it) has no significance in GA ver of network policy + if npc.v1NetworkPolicy { + return + } + glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name) if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { err := npc.Sync() @@ -162,24 +170,33 @@ func (npc *NetworkPolicyController) Sync() error { glog.Infof("sync iptables took %v", time.Since(start)) }() - npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() - if err != nil { - return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error()) + if npc.v1NetworkPolicy { + npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + + } else { + npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + } activePolicyChains, err := npc.syncNetworkPolicyChains() if err != nil { - return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error()) + return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error()) } activePodFwChains, err := npc.syncPodFirewallChains() if err != nil { - return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error()) + return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error()) } err = cleanupStaleRules(activePolicyChains, activePodFwChains) if err != nil { - return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error()) + return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error()) } return nil @@ -334,11 +351,11 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er } // loop through the pods running on the node which has default ingress to be denied - podsOnNodeInfo, err := getPodsRunningOnNode(npc.nodeIP.String()) + firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String()) if err != nil { return nil, err } - for _, pod := range *podsOnNodeInfo { + for _, pod := range *firewallEnabledPods { // below condition occurs when we get trasient update while removing or adding pod // subseqent update will do the correct action @@ -346,7 +363,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er continue } - // ensure pod specfic firewall chain exist for all the pods running on this node + // ensure pod specfic firewall chain exist for all the pods that need ingress firewall podFwChainName := podFirewallChainName(pod.namespace, pod.name) err = iptablesCmdHandler.NewChain("filter", podFwChainName) if err != nil && err.(*iptables.Error).ExitStatus() != 1 { @@ -557,7 +574,7 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er return nil } -func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) { +func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) @@ -565,12 +582,39 @@ func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) { if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { continue } - default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) - if err != nil { - return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) - } - if strings.Compare(default_policy, "DefaultDeny") != 0 { - continue + if npc.v1NetworkPolicy { + podNeedsFirewall := false + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + policy, _ := policy_obj.(*networking.NetworkPolicy) + if policy.Namespace != pod.ObjectMeta.Namespace { + continue + } + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, + policy.Spec.PodSelector.MatchLabels) + if err != nil { + return nil, fmt.Errorf("Failed to get the pods %s", err.Error()) + } + for _, matchingPod := range matchingPods { + if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name { + podNeedsFirewall = true + break + } + } + if podNeedsFirewall { + break + } + } + if !podNeedsFirewall { + continue + } + } else { + default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) + if err != nil { + return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) + } + if strings.Compare(default_policy, "DefaultDeny") != 0 { + continue + } } nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, name: pod.ObjectMeta.Name, @@ -584,7 +628,66 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policy := range watchers.NetworkPolicyWatcher.List() { + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + + policy, ok := policy_obj.(*networking.NetworkPolicy) + if !ok { + return nil, fmt.Errorf("Failed to convert") + } + newPolicy := networkPolicyInfo{ + name: policy.Name, + namespace: policy.Namespace, + labels: policy.Spec.PodSelector.MatchLabels, + } + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + newPolicy.destPods = make(map[string]podInfo) + newPolicy.ingressRules = make([]ingressRule, 0) + if err == nil { + for _, matchingPod := range matchingPods { + newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels} + } + } + + for _, specIngressRule := range policy.Spec.Ingress { + ingressRule := ingressRule{} + + ingressRule.ports = make([]protocolAndPort, 0) + for _, port := range specIngressRule.Ports { + protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()} + ingressRule.ports = append(ingressRule.ports, protocolAndPort) + } + + ingressRule.srcPods = make([]podInfo, 0) + for _, peer := range specIngressRule.From { + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) + if err == nil { + for _, matchingPod := range matchingPods { + ingressRule.srcPods = append(ingressRule.srcPods, + podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels}) + } + } + } + newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) + } + NetworkPolicies = append(NetworkPolicies, newPolicy) + } + + return &NetworkPolicies, nil +} + +func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { + + NetworkPolicies := make([]networkPolicyInfo, 0) + + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + + policy, _ := policy_obj.(*apiextensions.NetworkPolicy) newPolicy := networkPolicyInfo{ name: policy.Name, namespace: policy.Namespace, @@ -781,6 +884,13 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options npc.syncPeriod = config.IPTablesSyncPeriod + npc.v1NetworkPolicy = true + v, _ := clientset.Discovery().ServerVersion() + minorVer, _ := strconv.Atoi(v.Minor) + if v.Major == "1" && minorVer < 7 { + npc.v1NetworkPolicy = false + } + node, err := utils.GetNodeObject(clientset, config.HostnameOverride) if err != nil { return nil, err diff --git a/app/watchers/network_policy_watcher.go b/app/watchers/network_policy_watcher.go index b59885cf..b0c4b1cf 100644 --- a/app/watchers/network_policy_watcher.go +++ b/app/watchers/network_policy_watcher.go @@ -2,6 +2,7 @@ package watchers import ( "reflect" + "strconv" "time" "github.com/cloudnativelabs/kube-router/utils" @@ -9,11 +10,12 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + networking "k8s.io/client-go/pkg/apis/networking/v1" cache "k8s.io/client-go/tools/cache" ) type NetworkPolicyUpdate struct { - NetworkPolicy *apiextensions.NetworkPolicy + NetworkPolicy interface{} Op Operation } @@ -33,28 +35,16 @@ type NetworkPolicyUpdatesHandler interface { } func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) { - policy, ok := obj.(*apiextensions.NetworkPolicy) - if !ok { - return - } - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj}) } func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) { - policy, ok := obj.(*apiextensions.NetworkPolicy) - if !ok { - return - } - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj}) } func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) { - policy, ok := newObj.(*apiextensions.NetworkPolicy) - if !ok { - return - } if !reflect.DeepEqual(newObj, oldObj) { - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj}) } } @@ -64,13 +54,8 @@ func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHan })) } -func (npw *networkPolicyWatcher) List() []*apiextensions.NetworkPolicy { - obj_list := npw.networkPolicyLister.List() - np_instances := make([]*apiextensions.NetworkPolicy, len(obj_list)) - for i, ins := range obj_list { - np_instances[i] = ins.(*apiextensions.NetworkPolicy) - } - return np_instances +func (npw *networkPolicyWatcher) List() []interface{} { + return npw.networkPolicyLister.List() } func (npw *networkPolicyWatcher) HasSynced() bool { @@ -91,13 +76,29 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim } npw.clientset = clientset + + v1NetworkPolicy := true + v, _ := clientset.Discovery().ServerVersion() + minorVer, _ := strconv.Atoi(v.Minor) + if v.Major == "1" && minorVer < 7 { + v1NetworkPolicy = false + } + npw.broadcaster = utils.NewBroadcaster() - lw := cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) - npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( - lw, - &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + var lw *cache.ListWatch + if v1NetworkPolicy { + lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) + npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( + lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + } else { + lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) + npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( + lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + } networkPolicyStopCh = make(chan struct{}) go npw.networkPolicyController.Run(networkPolicyStopCh) return &npw, nil From 7d2d5b4ebb12308627b7593544088c0d92b98aa2 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 28 Jul 2017 14:35:45 +0530 Subject: [PATCH 2/4] update readme to mention support for network policy GA --- Documentation/README.md | 3 +++ README.md | 3 +++ 2 files changed, 6 insertions(+) diff --git a/Documentation/README.md b/Documentation/README.md index c6d3ca92..806aec1f 100644 --- a/Documentation/README.md +++ b/Documentation/README.md @@ -29,6 +29,9 @@ Features: Network policy controller is responsible for reading the namespace, network policy and pods information from Kubernetes API server and configure iptables accordingly to provide ingress filter to the pods. +Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA +[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) + Please read blog for design details of Network Policy controller https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/ diff --git a/README.md b/README.md index 3f4227ad..3471efb6 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,9 @@ is easy with kube-router -- just add a flag to kube-router. It uses ipsets with iptables to ensure your firewall rules have as little performance impact on your cluster as possible. +Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA +[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) + Read more about kube-router's approach to Kubernetes Network Policies: - [Enforcing Kubernetes network policies with iptables](https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/) From c1a4e65a35c847e6ef4b7d7d85be133f665e611d Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 28 Jul 2017 14:37:50 +0530 Subject: [PATCH 3/4] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3471efb6..8bc76208 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ iptables to ensure your firewall rules have as little performance impact on your cluster as possible. Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA -[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) +[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) and also network policy beta semantics. Read more about kube-router's approach to Kubernetes Network Policies: - [Enforcing Kubernetes network policies with iptables](https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/) From c85e02a061c087aa41087924c9a1464ebdc1086f Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 28 Jul 2017 14:38:41 +0530 Subject: [PATCH 4/4] Update README.md --- Documentation/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Documentation/README.md b/Documentation/README.md index 806aec1f..1879677c 100644 --- a/Documentation/README.md +++ b/Documentation/README.md @@ -30,7 +30,7 @@ Features: Network policy controller is responsible for reading the namespace, network policy and pods information from Kubernetes API server and configure iptables accordingly to provide ingress filter to the pods. Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA -[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) +[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) and also network policy beta semantics. Please read blog for design details of Network Policy controller https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/