diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go index d7bc588a..44f5ee11 100644 --- a/app/controllers/network_policy_controller.go +++ b/app/controllers/network_policy_controller.go @@ -21,6 +21,8 @@ import ( "github.com/janeczku/go-ipset/ipset" "k8s.io/client-go/kubernetes" apiv1 "k8s.io/client-go/pkg/api/v1" + apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + networking "k8s.io/client-go/pkg/apis/networking/v1" ) // Network policy controller provides an ingress firewall for the pods as per the defined network policies. @@ -34,10 +36,11 @@ import ( // dropped by the rule in the pod chain, if there is no match. type NetworkPolicyController struct { - nodeIP net.IP - nodeHostName string - mu sync.Mutex - syncPeriod time.Duration + nodeIP net.IP + nodeHostName string + mu sync.Mutex + syncPeriod time.Duration + v1NetworkPolicy bool // list of all active network policies expressed as networkPolicyInfo networkPoliciesInfo *[]networkPolicyInfo @@ -94,7 +97,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro glog.Infof("Performing periodic syn of the iptables to reflect network policies") err := npc.Sync() if err != nil { - glog.Errorf("Error during periodic sync: ", err) + glog.Errorf("Error during periodic sync: " + err.Error()) } } else { continue @@ -122,7 +125,6 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { } func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) { - glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name) if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { err := npc.Sync() if err != nil { @@ -134,6 +136,12 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w } func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) { + + // namespace (and annotations on it) has no significance in GA ver of network policy + if npc.v1NetworkPolicy { + return + } + glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name) if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { err := npc.Sync() @@ -162,24 +170,33 @@ func (npc *NetworkPolicyController) Sync() error { glog.Infof("sync iptables took %v", time.Since(start)) }() - npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() - if err != nil { - return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error()) + if npc.v1NetworkPolicy { + npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + + } else { + npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + } activePolicyChains, err := npc.syncNetworkPolicyChains() if err != nil { - return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error()) + return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error()) } activePodFwChains, err := npc.syncPodFirewallChains() if err != nil { - return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error()) + return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error()) } err = cleanupStaleRules(activePolicyChains, activePodFwChains) if err != nil { - return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error()) + return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error()) } return nil @@ -334,11 +351,11 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er } // loop through the pods running on the node which has default ingress to be denied - podsOnNodeInfo, err := getPodsRunningOnNode(npc.nodeIP.String()) + firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String()) if err != nil { return nil, err } - for _, pod := range *podsOnNodeInfo { + for _, pod := range *firewallEnabledPods { // below condition occurs when we get trasient update while removing or adding pod // subseqent update will do the correct action @@ -346,7 +363,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er continue } - // ensure pod specfic firewall chain exist for all the pods running on this node + // ensure pod specfic firewall chain exist for all the pods that need ingress firewall podFwChainName := podFirewallChainName(pod.namespace, pod.name) err = iptablesCmdHandler.NewChain("filter", podFwChainName) if err != nil && err.(*iptables.Error).ExitStatus() != 1 { @@ -557,7 +574,7 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er return nil } -func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) { +func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) @@ -565,12 +582,39 @@ func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) { if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { continue } - default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) - if err != nil { - return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) - } - if strings.Compare(default_policy, "DefaultDeny") != 0 { - continue + if npc.v1NetworkPolicy { + podNeedsFirewall := false + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + policy, _ := policy_obj.(*networking.NetworkPolicy) + if policy.Namespace != pod.ObjectMeta.Namespace { + continue + } + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, + policy.Spec.PodSelector.MatchLabels) + if err != nil { + return nil, fmt.Errorf("Failed to get the pods %s", err.Error()) + } + for _, matchingPod := range matchingPods { + if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name { + podNeedsFirewall = true + break + } + } + if podNeedsFirewall { + break + } + } + if !podNeedsFirewall { + continue + } + } else { + default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) + if err != nil { + return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) + } + if strings.Compare(default_policy, "DefaultDeny") != 0 { + continue + } } nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, name: pod.ObjectMeta.Name, @@ -584,7 +628,66 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policy := range watchers.NetworkPolicyWatcher.List() { + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + + policy, ok := policy_obj.(*networking.NetworkPolicy) + if !ok { + return nil, fmt.Errorf("Failed to convert") + } + newPolicy := networkPolicyInfo{ + name: policy.Name, + namespace: policy.Namespace, + labels: policy.Spec.PodSelector.MatchLabels, + } + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + newPolicy.destPods = make(map[string]podInfo) + newPolicy.ingressRules = make([]ingressRule, 0) + if err == nil { + for _, matchingPod := range matchingPods { + newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels} + } + } + + for _, specIngressRule := range policy.Spec.Ingress { + ingressRule := ingressRule{} + + ingressRule.ports = make([]protocolAndPort, 0) + for _, port := range specIngressRule.Ports { + protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()} + ingressRule.ports = append(ingressRule.ports, protocolAndPort) + } + + ingressRule.srcPods = make([]podInfo, 0) + for _, peer := range specIngressRule.From { + matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) + if err == nil { + for _, matchingPod := range matchingPods { + ingressRule.srcPods = append(ingressRule.srcPods, + podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels}) + } + } + } + newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) + } + NetworkPolicies = append(NetworkPolicies, newPolicy) + } + + return &NetworkPolicies, nil +} + +func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { + + NetworkPolicies := make([]networkPolicyInfo, 0) + + for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + + policy, _ := policy_obj.(*apiextensions.NetworkPolicy) newPolicy := networkPolicyInfo{ name: policy.Name, namespace: policy.Namespace, @@ -781,6 +884,13 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options npc.syncPeriod = config.IPTablesSyncPeriod + npc.v1NetworkPolicy = true + v, _ := clientset.Discovery().ServerVersion() + minorVer, _ := strconv.Atoi(v.Minor) + if v.Major == "1" && minorVer < 7 { + npc.v1NetworkPolicy = false + } + node, err := utils.GetNodeObject(clientset, config.HostnameOverride) if err != nil { return nil, err diff --git a/app/watchers/network_policy_watcher.go b/app/watchers/network_policy_watcher.go index b59885cf..b0c4b1cf 100644 --- a/app/watchers/network_policy_watcher.go +++ b/app/watchers/network_policy_watcher.go @@ -2,6 +2,7 @@ package watchers import ( "reflect" + "strconv" "time" "github.com/cloudnativelabs/kube-router/utils" @@ -9,11 +10,12 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + networking "k8s.io/client-go/pkg/apis/networking/v1" cache "k8s.io/client-go/tools/cache" ) type NetworkPolicyUpdate struct { - NetworkPolicy *apiextensions.NetworkPolicy + NetworkPolicy interface{} Op Operation } @@ -33,28 +35,16 @@ type NetworkPolicyUpdatesHandler interface { } func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) { - policy, ok := obj.(*apiextensions.NetworkPolicy) - if !ok { - return - } - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj}) } func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) { - policy, ok := obj.(*apiextensions.NetworkPolicy) - if !ok { - return - } - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj}) } func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) { - policy, ok := newObj.(*apiextensions.NetworkPolicy) - if !ok { - return - } if !reflect.DeepEqual(newObj, oldObj) { - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: policy}) + npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj}) } } @@ -64,13 +54,8 @@ func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHan })) } -func (npw *networkPolicyWatcher) List() []*apiextensions.NetworkPolicy { - obj_list := npw.networkPolicyLister.List() - np_instances := make([]*apiextensions.NetworkPolicy, len(obj_list)) - for i, ins := range obj_list { - np_instances[i] = ins.(*apiextensions.NetworkPolicy) - } - return np_instances +func (npw *networkPolicyWatcher) List() []interface{} { + return npw.networkPolicyLister.List() } func (npw *networkPolicyWatcher) HasSynced() bool { @@ -91,13 +76,29 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim } npw.clientset = clientset + + v1NetworkPolicy := true + v, _ := clientset.Discovery().ServerVersion() + minorVer, _ := strconv.Atoi(v.Minor) + if v.Major == "1" && minorVer < 7 { + v1NetworkPolicy = false + } + npw.broadcaster = utils.NewBroadcaster() - lw := cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) - npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( - lw, - &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + var lw *cache.ListWatch + if v1NetworkPolicy { + lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) + npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( + lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + } else { + lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) + npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( + lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + } networkPolicyStopCh = make(chan struct{}) go npw.networkPolicyController.Run(networkPolicyStopCh) return &npw, nil