diff --git a/app/controllers/metrics_controller.go b/app/controllers/metrics_controller.go index f74f1c2a..1435c179 100644 --- a/app/controllers/metrics_controller.go +++ b/app/controllers/metrics_controller.go @@ -154,7 +154,7 @@ func (mc *MetricsController) Run(healthChan chan<- *ControllerHeartbeat, stopCh } // NewMetricsController returns new MetricController object -func NewMetricsController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*MetricsController, error) { +func NewMetricsController(clientset kubernetes.Interface, config *options.KubeRouterConfig) (*MetricsController, error) { mc := MetricsController{} mc.MetricsPath = config.MetricsPath mc.MetricsPort = config.MetricsPort diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go index 035f3b22..f45d1480 100644 --- a/app/controllers/network_policy_controller.go +++ b/app/controllers/network_policy_controller.go @@ -3,7 +3,6 @@ package controllers import ( "crypto/sha256" "encoding/base32" - "encoding/json" "errors" "fmt" "net" @@ -13,15 +12,18 @@ import ( "time" "github.com/cloudnativelabs/kube-router/app/options" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/cloudnativelabs/kube-router/utils" "github.com/coreos/go-iptables/iptables" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" + api "k8s.io/api/core/v1" apiextensions "k8s.io/api/extensions/v1beta1" networking "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" ) const ( @@ -51,6 +53,14 @@ type NetworkPolicyController struct { // list of all active network policies expressed as networkPolicyInfo networkPoliciesInfo *[]networkPolicyInfo ipSetHandler *utils.IPSet + + podLister cache.Indexer + npLister cache.Indexer + nsLister cache.Indexer + + PodEventHandler cache.ResourceEventHandler + NamespaceEventHandler cache.ResourceEventHandler + NetworkPolicyEventHandler cache.ResourceEventHandler } // internal structure to represent a network policy @@ -120,16 +130,12 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat, default: } - if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { - glog.V(1).Info("Performing periodic sync of iptables to reflect network policies") - err := npc.Sync() - if err != nil { - glog.Errorf("Error during periodic sync: " + err.Error()) - } else { - sendHeartBeat(healthChan, "NPC") - } + glog.V(1).Info("Performing periodic sync of iptables to reflect network policies") + err := npc.Sync() + if err != nil { + glog.Errorf("Error during periodic sync: " + err.Error()) } else { - continue + sendHeartBeat(healthChan, "NPC") } select { @@ -142,46 +148,37 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat, } // OnPodUpdate handles updates to pods from the Kubernetes api server -func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { - glog.V(2).Infof("Received pod update namespace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name) - if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { - err := npc.Sync() - if err != nil { - glog.Errorf("Error syncing on pod update: %s", err) - } - } else { - glog.V(2).Infof("Received pod update, but controller not in sync") +func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { + pod := obj.(*api.Pod) + glog.V(2).Infof("Received pod update namespace:%s pod name:%s", pod.Namespace, pod.Name) + + err := npc.Sync() + if err != nil { + glog.Errorf("Error syncing on pod update: %s", err) } } // OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server -func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) { - if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { - err := npc.Sync() - if err != nil { - glog.Errorf("Error syncing on network policy update: %s", err) - } - } else { - glog.V(2).Info("Received network policy update, but controller not in sync") +func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) { + err := npc.Sync() + if err != nil { + glog.Errorf("Error syncing on network policy update: %s", err) } } // OnNamespaceUpdate handles updates to namespace from kubernetes api server -func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) { - +func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) { // namespace (and annotations on it) has no significance in GA ver of network policy if npc.v1NetworkPolicy { return } - glog.V(2).Infof("Received namespace update namespace:%s", namespaceUpdate.Namespace.Name) - if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { - err := npc.Sync() - if err != nil { - glog.Errorf("Error syncing on namespace update: %s", err) - } - } else { - glog.V(2).Info("Received namespace update, but controller not in sync") + namespace := obj.(*api.Namespace) + glog.V(2).Infof("Received update for namespace: %s", namespace.Name) + + err := npc.Sync() + if err != nil { + glog.Errorf("Error syncing on namespace update: %s", err) } } @@ -204,13 +201,13 @@ func (npc *NetworkPolicyController) Sync() error { glog.V(1).Info("Starting periodic sync of iptables") if npc.v1NetworkPolicy { - npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() + npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() if err != nil { return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) } } else { // TODO remove the Beta support - npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo() + npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo() if err != nil { return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) } @@ -948,7 +945,9 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) - for _, pod := range watchers.PodWatcher.List() { + for _, obj := range npc.podLister.List() { + pod := obj.(*api.Pod) + if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { continue } @@ -975,7 +974,9 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str nodePods := make(map[string]podInfo) - for _, pod := range watchers.PodWatcher.List() { + for _, obj := range npc.podLister.List() { + pod := obj.(*api.Pod) + if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { continue } @@ -997,11 +998,11 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str return &nodePods, nil } -func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { +func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policyObj := range watchers.NetworkPolicyWatcher.List() { + for _, policyObj := range npc.npLister.List() { policy, ok := policyObj.(*networking.NetworkPolicy) if !ok { @@ -1042,7 +1043,7 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { } } - matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) newPolicy.targetPods = make(map[string]podInfo) if err == nil { for _, matchingPod := range matchingPods { @@ -1094,15 +1095,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { for _, peer := range specIngressRule.From { // spec must have either of PodSelector or NamespaceSelector if peer.PodSelector != nil { - matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, + matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) } else if peer.NamespaceSelector != nil { - namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels) + namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels) if err != nil { return nil, errors.New("Failed to build network policies info due to " + err.Error()) } for _, namespace := range namespaces { - namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil) + namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil) if err != nil { return nil, errors.New("Failed to build network policies info due to " + err.Error()) } @@ -1155,15 +1156,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { for _, peer := range specEgressRule.To { // spec must have either of PodSelector or NamespaceSelector if peer.PodSelector != nil { - matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, + matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) } else if peer.NamespaceSelector != nil { - namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels) + namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels) if err != nil { return nil, errors.New("Failed to build network policies info due to " + err.Error()) } for _, namespace := range namespaces { - namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil) + namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil) if err != nil { return nil, errors.New("Failed to build network policies info due to " + err.Error()) } @@ -1192,11 +1193,29 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { return &NetworkPolicies, nil } -func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { +func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) { + podLister := listers.NewPodLister(npc.podLister) + allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector()) + if err != nil { + return nil, err + } + return allMatchedNameSpacePods, nil +} + +func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) { + namespaceLister := listers.NewNamespaceLister(npc.npLister) + matchedNamespaces, err := namespaceLister.List(set.AsSelector()) + if err != nil { + return nil, err + } + return matchedNamespaces, nil +} + +func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policyObj := range watchers.NetworkPolicyWatcher.List() { + for _, policyObj := range npc.npLister.List() { policy, _ := policyObj.(*apiextensions.NetworkPolicy) newPolicy := networkPolicyInfo{ @@ -1204,7 +1223,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { namespace: policy.Namespace, labels: policy.Spec.PodSelector.MatchLabels, } - matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) newPolicy.targetPods = make(map[string]podInfo) newPolicy.ingressRules = make([]ingressRule, 0) if err == nil { @@ -1227,7 +1246,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { ingressRule.srcPods = make([]podInfo, 0) for _, peer := range specIngressRule.From { - matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) + matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) if err == nil { for _, matchingPod := range matchingPods { ingressRule.srcPods = append(ingressRule.srcPods, @@ -1246,25 +1265,6 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { return &NetworkPolicies, nil } -func getNameSpaceDefaultPolicy(namespace string) (string, error) { - for _, nspw := range watchers.NamespaceWatcher.List() { - if strings.Compare(namespace, nspw.Name) == 0 { - networkPolicy, ok := nspw.ObjectMeta.Annotations[networkPolicyAnnotation] - var annot map[string]map[string]string - if ok { - err := json.Unmarshal([]byte(networkPolicy), &annot) - if err == nil { - return annot["ingress"]["isolation"], nil - } - glog.Errorf("Skipping invalid network-policy for namespace \"%s\": %s", namespace, err) - return "DefaultAllow", errors.New("Invalid NetworkPolicy") - } - return "DefaultAllow", nil - } - } - return "", errors.New("Failed to get the default ingress policy for the namespace: " + namespace) -} - func podFirewallChainName(namespace, podName string) string { hash := sha256.Sum256([]byte(namespace + podName)) encoded := base32.StdEncoding.EncodeToString(hash[:]) @@ -1385,8 +1385,59 @@ func (npc *NetworkPolicyController) Cleanup() { glog.Infof("Successfully cleaned the iptables configuration done by kube-router") } +func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnPodUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + npc.OnPodUpdate(newObj) + + }, + DeleteFunc: func(obj interface{}) { + npc.OnPodUpdate(obj) + }, + } +} + +func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnNamespaceUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + npc.OnNamespaceUpdate(newObj) + + }, + DeleteFunc: func(obj interface{}) { + npc.OnNamespaceUpdate(obj) + + }, + } +} + +func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnNetworkPolicyUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + npc.OnNetworkPolicyUpdate(newObj) + }, + DeleteFunc: func(obj interface{}) { + npc.OnNetworkPolicyUpdate(obj) + + }, + } +} + // NewNetworkPolicyController returns new NetworkPolicyController object -func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) { +func NewNetworkPolicyController(clientset kubernetes.Interface, + config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer, + npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) { npc := NetworkPolicyController{} if config.MetricsEnabled { @@ -1427,9 +1478,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options } npc.ipSetHandler = ipset - watchers.PodWatcher.RegisterHandler(&npc) - watchers.NetworkPolicyWatcher.RegisterHandler(&npc) - watchers.NamespaceWatcher.RegisterHandler(&npc) + npc.podLister = podInformer.GetIndexer() + npc.PodEventHandler = npc.newPodEventHandler() + + npc.nsLister = nsInformer.GetIndexer() + npc.NamespaceEventHandler = npc.newNamespaceEventHandler() + + npc.npLister = npInformer.GetIndexer() + npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler() return &npc, nil } diff --git a/app/controllers/network_routes_controller.go b/app/controllers/network_routes_controller.go index aefe5e94..ca2f0bd4 100644 --- a/app/controllers/network_routes_controller.go +++ b/app/controllers/network_routes_controller.go @@ -20,7 +20,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/cloudnativelabs/kube-router/app/options" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/cloudnativelabs/kube-router/utils" "github.com/coreos/go-iptables/iptables" "github.com/golang/glog" @@ -31,6 +30,7 @@ import ( "github.com/osrg/gobgp/table" "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netlink" + v1core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -96,6 +96,12 @@ type NetworkRoutingController struct { cniConfFile string initSrcDstCheckDone bool ec2IamAuthorized bool + + nodeLister cache.Indexer + svcLister cache.Indexer + epLister cache.Indexer + + NodeEventHandler cache.ResourceEventHandler } // Run runs forever until we are notified on stop channel @@ -438,7 +444,9 @@ func (nrc *NetworkRoutingController) getLoadBalancerIps(svc *v1core.Service) []s func (nrc *NetworkRoutingController) getIpsToAdvertise(verifyEndpoints bool) ([]string, []string, error) { ipsToAdvertise := make([]string, 0) ipsToUnAdvertise := make([]string, 0) - for _, svc := range watchers.ServiceWatcher.List() { + for _, obj := range nrc.svcLister.List() { + svc := obj.(*v1core.Service) + ipList := make([]string, 0) var err error nodeHasEndpoints := true @@ -497,7 +505,7 @@ func (nrc *NetworkRoutingController) nodeHasEndpointsForService(svc *v1core.Serv if err != nil { return false, err } - item, exists, err := watchers.EndpointsWatcher.GetByKey(key) + item, exists, err := nrc.epLister.GetByKey(key) if err != nil { return false, err } @@ -1343,17 +1351,11 @@ func rtTablesAdd(tableNumber, tableName string) error { // OnNodeUpdate Handle updates from Node watcher. Node watcher calls this method whenever there is // new node is added or old node is deleted. So peer up with new node and drop peering // from old node -func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) { +func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) { if !nrc.bgpServerStarted { return } - node := nodeUpdate.Node - nodeIP, _ := utils.GetNodeIP(node) - if nodeUpdate.Op == watchers.ADD { - glog.V(2).Infof("Received node %s added update from watch API so peer with new node", nodeIP) - } else if nodeUpdate.Op == watchers.REMOVE { - glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP) - } + if nrc.bgpEnableInternal { nrc.syncInternalPeers() } @@ -1559,11 +1561,35 @@ func generateTunnelName(nodeIP string) string { return "tun" + hash } +func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1core.Node) + nodeIP, _ := utils.GetNodeIP(node) + + glog.V(2).Infof("Received node %s added update from watch API so peer with new node", nodeIP) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + nrc.OnNodeUpdate(newObj) + + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1core.Node) + nodeIP, _ := utils.GetNodeIP(node) + + glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP) + + }, + } +} + // func (nrc *NetworkRoutingController) getExternalNodeIPs( // NewNetworkRoutingController returns new NetworkRoutingController object -func NewNetworkRoutingController(clientset *kubernetes.Clientset, - kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) { +func NewNetworkRoutingController(clientset kubernetes.Interface, + kubeRouterConfig *options.KubeRouterConfig, + nodeInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer, + epInformer cache.SharedIndexInformer) (*NetworkRoutingController, error) { var err error @@ -1676,9 +1702,11 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, "which its configured: " + err.Error()) } - if nrc.bgpEnableInternal { - watchers.NodeWatcher.RegisterHandler(&nrc) - } + nrc.svcLister = svcInformer.GetIndexer() + nrc.epLister = epInformer.GetIndexer() + + nrc.nodeLister = nodeInformer.GetIndexer() + nrc.NodeEventHandler = nrc.newNodeEventHandler() return &nrc, nil } diff --git a/app/controllers/network_routes_controller_test.go b/app/controllers/network_routes_controller_test.go index 3f02f8aa..681720ad 100644 --- a/app/controllers/network_routes_controller_test.go +++ b/app/controllers/network_routes_controller_test.go @@ -8,15 +8,16 @@ import ( "testing" "time" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/osrg/gobgp/config" gobgp "github.com/osrg/gobgp/server" "github.com/osrg/gobgp/table" v1core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) func Test_advertiseClusterIPs(t *testing.T) { @@ -173,18 +174,15 @@ func Test_advertiseClusterIPs(t *testing.T) { w := testcase.nrc.bgpServer.Watch(gobgp.WatchBestPath(false)) clientset := fake.NewSimpleClientset() - - _, err = watchers.StartServiceWatcher(clientset, 0) - if err != nil { - t.Fatalf("failed to initialize service watcher: %v", err) - } + startInformersForRoutes(testcase.nrc, clientset) err = createServices(clientset, testcase.existingServices) if err != nil { t.Fatalf("failed to create existing services: %v", err) } - waitForListerWithTimeout(time.Second*10, t) + waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t) + // ClusterIPs testcase.nrc.advertiseClusterIp = true testcase.nrc.advertiseExternalIp = false @@ -488,18 +486,15 @@ func Test_advertiseExternalIPs(t *testing.T) { w := testcase.nrc.bgpServer.Watch(gobgp.WatchBestPath(false)) clientset := fake.NewSimpleClientset() - - _, err = watchers.StartServiceWatcher(clientset, 0) - if err != nil { - t.Fatalf("failed to initialize service watcher: %v", err) - } + startInformersForRoutes(testcase.nrc, clientset) err = createServices(clientset, testcase.existingServices) if err != nil { t.Fatalf("failed to create existing services: %v", err) } - waitForListerWithTimeout(time.Second*10, t) + waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t) + // ExternalIPs testcase.nrc.advertiseClusterIp = false testcase.nrc.advertiseExternalIp = true @@ -613,18 +608,9 @@ func Test_nodeHasEndpointsForService(t *testing.T) { for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { clientset := fake.NewSimpleClientset() + startInformersForRoutes(testcase.nrc, clientset) - _, err := watchers.StartServiceWatcher(clientset, 0) - if err != nil { - t.Fatalf("failed to initialize service watcher: %v", err) - } - - _, err = watchers.StartEndpointsWatcher(clientset, 0) - if err != nil { - t.Fatalf("failed to initialize endpoints watcher: %v", err) - } - - _, err = clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint) + _, err := clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint) if err != nil { t.Fatalf("failed to create existing endpoints: %v", err) } @@ -634,7 +620,8 @@ func Test_nodeHasEndpointsForService(t *testing.T) { t.Fatalf("failed to create existing services: %v", err) } - waitForListerWithTimeout(time.Second*10, t) + waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t) + waitForListerWithTimeout(testcase.nrc.epLister, time.Second*10, t) nodeHasEndpoints, err := testcase.nrc.nodeHasEndpointsForService(testcase.existingService) if !reflect.DeepEqual(err, testcase.err) { @@ -1400,15 +1387,7 @@ func Test_addExportPolicies(t *testing.T) { } defer testcase.nrc.bgpServer.Stop() - _, err = watchers.StartServiceWatcher(testcase.nrc.clientset, 0) - if err != nil { - t.Fatalf("failed to initialize service watcher %v", err) - } - - _, err = watchers.StartEndpointsWatcher(testcase.nrc.clientset, 0) - if err != nil { - t.Fatalf("failed to initialize endpoints watcher %v", err) - } + startInformersForRoutes(testcase.nrc, testcase.nrc.clientset) if err = createNodes(testcase.nrc.clientset, testcase.existingNodes); err != nil { t.Errorf("failed to create existing nodes: %v", err) @@ -1419,12 +1398,12 @@ func Test_addExportPolicies(t *testing.T) { } // ClusterIPs and ExternalIPs + waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t) + testcase.nrc.advertiseClusterIp = true testcase.nrc.advertiseExternalIp = true testcase.nrc.advertiseLoadBalancerIp = false - waitForListerWithTimeout(time.Second*10, t) - err = testcase.nrc.addExportPolicies() if !reflect.DeepEqual(err, testcase.err) { t.Logf("expected err %v", testcase.err) @@ -1565,7 +1544,19 @@ func createNodes(clientset kubernetes.Interface, nodes []*v1core.Node) error { return nil } -func waitForListerWithTimeout(timeout time.Duration, t *testing.T) { +func startInformersForRoutes(nrc *NetworkRoutingController, clientset kubernetes.Interface) { + informerFactory := informers.NewSharedInformerFactory(clientset, 0) + svcInformer := informerFactory.Core().V1().Services().Informer() + epInformer := informerFactory.Core().V1().Endpoints().Informer() + + go informerFactory.Start(nil) + informerFactory.WaitForCacheSync(nil) + + nrc.svcLister = svcInformer.GetIndexer() + nrc.epLister = epInformer.GetIndexer() +} + +func waitForListerWithTimeout(lister cache.Indexer, timeout time.Duration, t *testing.T) { tick := time.Tick(100 * time.Millisecond) timeoutCh := time.After(timeout) for { @@ -1573,7 +1564,7 @@ func waitForListerWithTimeout(timeout time.Duration, t *testing.T) { case <-timeoutCh: t.Fatal("timeout exceeded waiting for service lister to fill cache") case <-tick: - if len(watchers.ServiceWatcher.List()) != 0 { + if len(lister.List()) != 0 { return } } diff --git a/app/controllers/network_services_controller.go b/app/controllers/network_services_controller.go index 51127658..94eec967 100644 --- a/app/controllers/network_services_controller.go +++ b/app/controllers/network_services_controller.go @@ -19,7 +19,6 @@ import ( "time" "github.com/cloudnativelabs/kube-router/app/options" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/cloudnativelabs/kube-router/utils" "github.com/coreos/go-iptables/iptables" "github.com/docker/docker/client" @@ -29,9 +28,11 @@ import ( "github.com/vishvananda/netlink" "github.com/vishvananda/netns" "golang.org/x/net/context" + api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) const ( @@ -162,10 +163,17 @@ type NetworkServicesController struct { podCidr string masqueradeAll bool globalHairpin bool - client *kubernetes.Clientset + client kubernetes.Interface nodeportBindOnAllIp bool MetricsEnabled bool ln LinuxNetworking + + svcLister cache.Indexer + epLister cache.Indexer + podLister cache.Indexer + + ServiceEventHandler cache.ResourceEventHandler + EndpointsEventHandler cache.ResourceEventHandler } // internal representation of kubernetes service @@ -229,16 +237,12 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat default: } - if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { - glog.V(1).Info("Performing periodic sync of ipvs services") - err := nsc.sync() - if err != nil { - glog.Errorf("Error during periodic ipvs sync: " + err.Error()) - } else { - sendHeartBeat(healthChan, "NSC") - } + glog.V(1).Info("Performing periodic sync of ipvs services") + err := nsc.sync() + if err != nil { + glog.Errorf("Error during periodic ipvs sync: " + err.Error()) } else { - continue + sendHeartBeat(healthChan, "NSC") } select { @@ -255,8 +259,8 @@ func (nsc *NetworkServicesController) sync() error { nsc.mu.Lock() defer nsc.mu.Unlock() - nsc.serviceMap = buildServicesInfo() - nsc.endpointsMap = buildEndpointsInfo() + nsc.serviceMap = nsc.buildServicesInfo() + nsc.endpointsMap = nsc.buildEndpointsInfo() err = nsc.syncHairpinIptablesRules() if err != nil { glog.Errorf("Error syncing hairpin iptable rules: %s", err.Error()) @@ -343,18 +347,14 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM } // OnEndpointsUpdate handle change in endpoints update from the API server -func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watchers.EndpointsUpdate) { - +func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) { nsc.mu.Lock() defer nsc.mu.Unlock() glog.V(1).Info("Received endpoints update from watch API") - if !(watchers.ServiceWatcher.HasSynced() && watchers.EndpointsWatcher.HasSynced()) { - glog.V(1).Info("Skipping ipvs server sync as local cache is not synced yet") - } // build new endpoints map to reflect the change - newEndpointsMap := buildEndpointsInfo() + newEndpointsMap := nsc.buildEndpointsInfo() if len(newEndpointsMap) != len(nsc.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, nsc.endpointsMap) { nsc.endpointsMap = newEndpointsMap @@ -365,18 +365,14 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watcher } // OnServiceUpdate handle change in service update from the API server -func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.ServiceUpdate) { - +func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) { nsc.mu.Lock() defer nsc.mu.Unlock() glog.V(1).Info("Received service update from watch API") - if !(watchers.ServiceWatcher.HasSynced() && watchers.EndpointsWatcher.HasSynced()) { - glog.V(1).Info("Skipping ipvs server sync as local cache is not synced yet") - } // build new services map to reflect the change - newServiceMap := buildServicesInfo() + newServiceMap := nsc.buildServicesInfo() if len(newServiceMap) != len(nsc.serviceMap) || !reflect.DeepEqual(newServiceMap, nsc.serviceMap) { nsc.serviceMap = newServiceMap @@ -636,7 +632,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf // For now just support IPVS tunnel mode, we can add other ways of DSR in future if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" { - podObj, err := getPodObjectForEndpoint(endpoint.ip) + podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip) if err != nil { glog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR") continue @@ -746,8 +742,9 @@ func isLocalEndpoint(ip, podCidr string) (bool, error) { return false, nil } -func getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) { - for _, pod := range watchers.PodWatcher.List() { +func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) { + for _, obj := range nsc.podLister.List() { + pod := obj.(*api.Pod) if strings.Compare(pod.Status.PodIP, endpointIP) == 0 { return pod, nil } @@ -919,9 +916,10 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP return nil } -func buildServicesInfo() serviceInfoMap { +func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { serviceMap := make(serviceInfoMap) - for _, svc := range watchers.ServiceWatcher.List() { + for _, obj := range nsc.svcLister.List() { + svc := obj.(*api.Service) if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" { glog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", svc.Name, svc.Namespace) @@ -991,9 +989,11 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo { return endPoints } -func buildEndpointsInfo() endpointsInfoMap { +func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap { endpointsMap := make(endpointsInfoMap) - for _, ep := range watchers.EndpointsWatcher.List() { + for _, obj := range nsc.epLister.List() { + ep := obj.(*api.Endpoints) + for _, epSubset := range ep.Subsets { for _, port := range epSubset.Ports { svcId := generateServiceId(ep.Namespace, ep.Name, port.Name) @@ -1718,8 +1718,41 @@ func (nsc *NetworkServicesController) Cleanup() { glog.Infof("Successfully cleaned the ipvs configuration done by kube-router") } +func (nsc *NetworkServicesController) newEndpointsEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + nsc.OnEndpointsUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + nsc.OnEndpointsUpdate(newObj) + }, + DeleteFunc: func(obj interface{}) { + nsc.OnEndpointsUpdate(obj) + + }, + } +} + +func (nsc *NetworkServicesController) newSvcEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + nsc.OnServiceUpdate(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + nsc.OnServiceUpdate(newObj) + }, + DeleteFunc: func(obj interface{}) { + nsc.OnServiceUpdate(obj) + }, + } + +} + // NewNetworkServicesController returns NetworkServicesController object -func NewNetworkServicesController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkServicesController, error) { +func NewNetworkServicesController(clientset kubernetes.Interface, + config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer, + epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer) (*NetworkServicesController, error) { var err error ln, err := newLinuxNetworking() @@ -1781,8 +1814,13 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio } nsc.nodeIP = nodeIP - watchers.EndpointsWatcher.RegisterHandler(&nsc) - watchers.ServiceWatcher.RegisterHandler(&nsc) + nsc.podLister = podInformer.GetIndexer() + + nsc.svcLister = svcInformer.GetIndexer() + nsc.ServiceEventHandler = nsc.newSvcEventHandler() + + nsc.epLister = epInformer.GetIndexer() + nsc.EndpointsEventHandler = nsc.newEndpointsEventHandler() rand.Seed(time.Now().UnixNano()) diff --git a/app/controllers/network_services_controller_test.go b/app/controllers/network_services_controller_test.go index 72a1684c..6cdfe471 100644 --- a/app/controllers/network_services_controller_test.go +++ b/app/controllers/network_services_controller_test.go @@ -5,14 +5,16 @@ import ( "net" "time" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/docker/libnetwork/ipvs" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vishvananda/netlink" v1core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) type LinuxNetworkingMockImpl struct { @@ -87,7 +89,7 @@ func fatalf(format string, a ...interface{}) { // that receives a 2nd *testing argument - mixing testing and ginkgo // is discouraged (latter uses own GinkgoWriter), so need to create // our own here. -func waitForListerWithTimeoutG(timeout time.Duration) { +func waitForListerWithTimeoutG(lister cache.Indexer, timeout time.Duration) { tick := time.Tick(100 * time.Millisecond) timeoutCh := time.After(timeout) for { @@ -95,7 +97,7 @@ func waitForListerWithTimeoutG(timeout time.Duration) { case <-timeoutCh: fatalf("timeout exceeded waiting for service lister to fill cache") case <-tick: - if len(watchers.ServiceWatcher.List()) != 0 { + if len(lister.List()) != 0 { return } } @@ -132,17 +134,7 @@ var _ = Describe("NetworkServicesController", func() { JustBeforeEach(func() { clientset := fake.NewSimpleClientset() - _, err := watchers.StartServiceWatcher(clientset, 0) - if err != nil { - fatalf("failed to initialize service watcher: %v", err) - } - - _, err = watchers.StartEndpointsWatcher(clientset, 0) - if err != nil { - fatalf("failed to initialize endpoints watcher: %v", err) - } - - _, err = clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint) + _, err := clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint) if err != nil { fatalf("failed to create existing endpoints: %v", err) } @@ -152,15 +144,18 @@ var _ = Describe("NetworkServicesController", func() { fatalf("failed to create existing services: %v", err) } - waitForListerWithTimeoutG(time.Second * 10) - nsc = &NetworkServicesController{ nodeIP: net.ParseIP("10.0.0.0"), nodeHostName: "node-1", ln: mockedLinuxNetworking, - serviceMap: buildServicesInfo(), - endpointsMap: buildEndpointsInfo(), } + + startInformersForServiceProxy(nsc, clientset) + waitForListerWithTimeoutG(nsc.svcLister, time.Second*10) + waitForListerWithTimeoutG(nsc.epLister, time.Second*10) + + nsc.serviceMap = nsc.buildServicesInfo() + nsc.endpointsMap = nsc.buildEndpointsInfo() }) Context("service no endpoints with externalIPs", func() { var fooSvc1, fooSvc2 *ipvs.Service @@ -507,3 +502,17 @@ var _ = Describe("NetworkServicesController", func() { }) }) }) + +func startInformersForServiceProxy(nsc *NetworkServicesController, clientset kubernetes.Interface) { + informerFactory := informers.NewSharedInformerFactory(clientset, 0) + svcInformer := informerFactory.Core().V1().Services().Informer() + epInformer := informerFactory.Core().V1().Endpoints().Informer() + podInformer := informerFactory.Core().V1().Pods().Informer() + + go informerFactory.Start(nil) + informerFactory.WaitForCacheSync(nil) + + nsc.svcLister = svcInformer.GetIndexer() + nsc.epLister = epInformer.GetIndexer() + nsc.podLister = podInformer.GetIndexer() +} diff --git a/app/server.go b/app/server.go index d72c7f0a..50797691 100644 --- a/app/server.go +++ b/app/server.go @@ -11,8 +11,9 @@ import ( "github.com/cloudnativelabs/kube-router/app/controllers" "github.com/cloudnativelabs/kube-router/app/options" - "github.com/cloudnativelabs/kube-router/app/watchers" "github.com/golang/glog" + + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -24,7 +25,7 @@ var buildDate string // KubeRouter holds the information needed to run server type KubeRouter struct { - Client *kubernetes.Clientset + Client kubernetes.Interface Config *options.KubeRouterConfig } @@ -67,53 +68,6 @@ func CleanupConfigAndExit() { nrc.Cleanup() } -// start API watchers to get notification on changes -func (kr *KubeRouter) startApiWatchers() error { - - var err error - - _, err = watchers.StartPodWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch pod api watcher: " + err.Error()) - } - - _, err = watchers.StartEndpointsWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch endpoint api watcher: " + err.Error()) - } - - _, err = watchers.StartNetworkPolicyWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch network policy api watcher: " + err.Error()) - } - - _, err = watchers.StartNamespaceWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch namespace api watcher: " + err.Error()) - } - - _, err = watchers.StartServiceWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch service api watcher: " + err.Error()) - } - - _, err = watchers.StartNodeWatcher(kr.Client, kr.Config.ConfigSyncPeriod) - if err != nil { - return errors.New("Failed to launch nodes api watcher: " + err.Error()) - } - - return nil -} - -func (kr *KubeRouter) stopApiWatchers() { - watchers.StopPodWatcher() - watchers.StopEndpointsWatcher() - watchers.StopNetworkPolicyWatcher() - watchers.StopNamespaceWatcher() - watchers.StopServiceWatcher() - watchers.StopNodeWatcher() -} - // Run starts the controllers and waits forever till we get SIGINT or SIGTERM func (kr *KubeRouter) Run() error { var err error @@ -124,11 +78,6 @@ func (kr *KubeRouter) Run() error { stopCh := make(chan struct{}) - err = kr.startApiWatchers() - if err != nil { - return errors.New("Failed to start API watchers: " + err.Error()) - } - if !(kr.Config.RunFirewall || kr.Config.RunServiceProxy || kr.Config.RunRouter) { glog.Info("Router, Firewall or Service proxy functionality must be specified. Exiting!") os.Exit(0) @@ -157,32 +106,55 @@ func (kr *KubeRouter) Run() error { kr.Config.MetricsEnabled = false } + informerFactory := informers.NewSharedInformerFactory(kr.Client, kr.Config.ConfigSyncPeriod) + + svcInformer := informerFactory.Core().V1().Services().Informer() + epInformer := informerFactory.Core().V1().Endpoints().Informer() + podInformer := informerFactory.Core().V1().Pods().Informer() + nodeInformer := informerFactory.Core().V1().Nodes().Informer() + nsInformer := informerFactory.Core().V1().Namespaces().Informer() + npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer() + + go informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + if kr.Config.RunFirewall { - npc, err := controllers.NewNetworkPolicyController(kr.Client, kr.Config) + npc, err := controllers.NewNetworkPolicyController(kr.Client, + kr.Config, podInformer, npInformer, nsInformer) if err != nil { return errors.New("Failed to create network policy controller: " + err.Error()) } + podInformer.AddEventHandler(npc.PodEventHandler) + nsInformer.AddEventHandler(npc.NamespaceEventHandler) + npInformer.AddEventHandler(npc.NetworkPolicyEventHandler) + wg.Add(1) go npc.Run(healthChan, stopCh, &wg) } if kr.Config.RunRouter { - nrc, err := controllers.NewNetworkRoutingController(kr.Client, kr.Config) + nrc, err := controllers.NewNetworkRoutingController(kr.Client, kr.Config, nodeInformer, svcInformer, epInformer) if err != nil { return errors.New("Failed to create network routing controller: " + err.Error()) } + nodeInformer.AddEventHandler(nrc.NodeEventHandler) + wg.Add(1) go nrc.Run(healthChan, stopCh, &wg) } if kr.Config.RunServiceProxy { - nsc, err := controllers.NewNetworkServicesController(kr.Client, kr.Config) + nsc, err := controllers.NewNetworkServicesController(kr.Client, kr.Config, + svcInformer, epInformer, podInformer) if err != nil { return errors.New("Failed to create network services controller: " + err.Error()) } + svcInformer.AddEventHandler(nsc.ServiceEventHandler) + epInformer.AddEventHandler(nsc.EndpointsEventHandler) + wg.Add(1) go nsc.Run(healthChan, stopCh, &wg) } @@ -195,8 +167,6 @@ func (kr *KubeRouter) Run() error { glog.Infof("Shutting down the controllers") close(stopCh) - kr.stopApiWatchers() - wg.Wait() return nil } diff --git a/app/watchers/endpoints_watcher.go b/app/watchers/endpoints_watcher.go deleted file mode 100644 index 1f0ff2a1..00000000 --- a/app/watchers/endpoints_watcher.go +++ /dev/null @@ -1,131 +0,0 @@ -package watchers - -import ( - "reflect" - "time" - - "github.com/cloudnativelabs/kube-router/utils" - api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type Operation int - -const ( - ADD Operation = iota - UPDATE - REMOVE - SYNCED -) - -type EndpointsUpdate struct { - Endpoints *api.Endpoints - Op Operation -} - -var ( - EndpointsWatcher *endpointsWatcher -) - -type endpointsWatcher struct { - clientset kubernetes.Interface - endpointsController cache.Controller - endpointsLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type EndpointsUpdatesHandler interface { - OnEndpointsUpdate(endpointsUpdate *EndpointsUpdate) -} - -func (ew *endpointsWatcher) endpointsAddEventHandler(obj interface{}) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - return - } - ew.broadcaster.Notify(&EndpointsUpdate{Op: ADD, Endpoints: endpoints}) -} - -func (ew *endpointsWatcher) endpointsDeleteEventHandler(obj interface{}) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - return - } - ew.broadcaster.Notify(&EndpointsUpdate{Op: REMOVE, Endpoints: endpoints}) -} - -func (ew *endpointsWatcher) endpointsUpdateEventHandler(oldObj, newObj interface{}) { - endpoints, ok := newObj.(*api.Endpoints) - if !ok { - return - } - if !reflect.DeepEqual(newObj, oldObj) { - if endpoints.Name != "kube-scheduler" && endpoints.Name != "kube-controller-manager" { - ew.broadcaster.Notify(&EndpointsUpdate{Op: UPDATE, Endpoints: endpoints}) - } - } -} - -func (ew *endpointsWatcher) RegisterHandler(handler EndpointsUpdatesHandler) { - ew.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnEndpointsUpdate(instance.(*EndpointsUpdate)) - })) -} - -func (ew *endpointsWatcher) List() []*api.Endpoints { - objList := ew.endpointsLister.List() - epInstances := make([]*api.Endpoints, len(objList)) - for i, ins := range objList { - epInstances[i] = ins.(*api.Endpoints) - } - return epInstances -} - -func (ew *endpointsWatcher) GetByKey(key string) (item interface{}, exists bool, err error) { - return ew.endpointsLister.GetByKey(key) -} - -func (ew *endpointsWatcher) HasSynced() bool { - return ew.endpointsController.HasSynced() -} - -var endpointsStopCh chan struct{} - -func StartEndpointsWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration) (*endpointsWatcher, error) { - - ew := endpointsWatcher{} - EndpointsWatcher = &ew - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: ew.endpointsAddEventHandler, - DeleteFunc: ew.endpointsDeleteEventHandler, - UpdateFunc: ew.endpointsUpdateEventHandler, - } - - ew.clientset = clientset - ew.broadcaster = utils.NewBroadcaster() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.CoreV1().Endpoints(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.CoreV1().Endpoints(metav1.NamespaceAll).Watch(options) - }, - } - ew.endpointsLister, ew.endpointsController = cache.NewIndexerInformer( - lw, - &api.Endpoints{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - endpointsStopCh = make(chan struct{}) - go ew.endpointsController.Run(endpointsStopCh) - return &ew, nil -} - -func StopEndpointsWatcher() { - endpointsStopCh <- struct{}{} -} diff --git a/app/watchers/namespace_watcher.go b/app/watchers/namespace_watcher.go deleted file mode 100644 index 91e30805..00000000 --- a/app/watchers/namespace_watcher.go +++ /dev/null @@ -1,113 +0,0 @@ -package watchers - -import ( - "reflect" - "time" - - "github.com/cloudnativelabs/kube-router/utils" - api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes" - listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" -) - -type NamespaceUpdate struct { - Namespace *api.Namespace - Op Operation -} - -var ( - NamespaceWatcher *namespaceWatcher - namespaceStopCh chan struct{} -) - -type namespaceWatcher struct { - clientset *kubernetes.Clientset - namespaceController cache.Controller - namespaceLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type NamespaceUpdatesHandler interface { - OnNamespaceUpdate(namespaceUpdate *NamespaceUpdate) -} - -func (nsw *namespaceWatcher) namespaceAddEventHandler(obj interface{}) { - namespace, ok := obj.(*api.Namespace) - if !ok { - return - } - nsw.broadcaster.Notify(&NamespaceUpdate{Op: ADD, Namespace: namespace}) -} - -func (nsw *namespaceWatcher) namespaceDeleteEventHandler(obj interface{}) { - namespace, ok := obj.(*api.Namespace) - if !ok { - return - } - nsw.broadcaster.Notify(&NamespaceUpdate{Op: REMOVE, Namespace: namespace}) -} - -func (nsw *namespaceWatcher) namespaceUpdateEventHandler(oldObj, newObj interface{}) { - namespace, ok := newObj.(*api.Namespace) - if !ok { - return - } - if !reflect.DeepEqual(newObj, oldObj) { - nsw.broadcaster.Notify(&NamespaceUpdate{Op: UPDATE, Namespace: namespace}) - } -} - -func (nsw *namespaceWatcher) List() []*api.Namespace { - objList := nsw.namespaceLister.List() - namespaceInstances := make([]*api.Namespace, len(objList)) - for i, ins := range objList { - namespaceInstances[i] = ins.(*api.Namespace) - } - return namespaceInstances -} - -func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, error) { - namespaceLister := listers.NewNamespaceLister(nsw.namespaceLister) - matchedNamespaces, err := namespaceLister.List(set.AsSelector()) - if err != nil { - return nil, err - } - return matchedNamespaces, nil -} - -func (nsw *namespaceWatcher) RegisterHandler(handler NamespaceUpdatesHandler) { - nsw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnNamespaceUpdate(instance.(*NamespaceUpdate)) - })) -} - -func StartNamespaceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*namespaceWatcher, error) { - - nsw := namespaceWatcher{} - NamespaceWatcher = &nsw - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: nsw.namespaceAddEventHandler, - DeleteFunc: nsw.namespaceDeleteEventHandler, - UpdateFunc: nsw.namespaceUpdateEventHandler, - } - - nsw.clientset = clientset - nsw.broadcaster = utils.NewBroadcaster() - lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "namespaces", metav1.NamespaceAll, fields.Everything()) - nsw.namespaceLister, nsw.namespaceController = cache.NewIndexerInformer( - lw, - &api.Namespace{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - namespaceStopCh = make(chan struct{}) - go nsw.namespaceController.Run(namespaceStopCh) - return &nsw, nil -} - -func StopNamespaceWatcher() { - namespaceStopCh <- struct{}{} -} diff --git a/app/watchers/network_policy_watcher.go b/app/watchers/network_policy_watcher.go deleted file mode 100644 index 456d1346..00000000 --- a/app/watchers/network_policy_watcher.go +++ /dev/null @@ -1,114 +0,0 @@ -package watchers - -import ( - "errors" - "reflect" - "strconv" - "time" - - "github.com/cloudnativelabs/kube-router/utils" - apiextensions "k8s.io/api/extensions/v1beta1" - networking "k8s.io/api/networking/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type NetworkPolicyUpdate struct { - NetworkPolicy interface{} - Op Operation -} - -var ( - NetworkPolicyWatcher *networkPolicyWatcher -) - -type networkPolicyWatcher struct { - clientset *kubernetes.Clientset - networkPolicyController cache.Controller - networkPolicyLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type NetworkPolicyUpdatesHandler interface { - OnNetworkPolicyUpdate(networkPolicyUpdate *NetworkPolicyUpdate) -} - -func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) { - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj}) -} - -func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) { - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj}) -} - -func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) { - if !reflect.DeepEqual(newObj, oldObj) { - npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj}) - } -} - -func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHandler) { - npw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnNetworkPolicyUpdate(instance.(*NetworkPolicyUpdate)) - })) -} - -func (npw *networkPolicyWatcher) List() []interface{} { - return npw.networkPolicyLister.List() -} - -func (npw *networkPolicyWatcher) HasSynced() bool { - return npw.networkPolicyController.HasSynced() -} - -var networkPolicyStopCh chan struct{} - -func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*networkPolicyWatcher, error) { - - npw := networkPolicyWatcher{} - NetworkPolicyWatcher = &npw - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: npw.networkPolicyAddEventHandler, - DeleteFunc: npw.networkPolicyDeleteEventHandler, - UpdateFunc: npw.networkPolicyUpdateEventHandler, - } - - npw.clientset = clientset - - v1NetworkPolicy := true - v, err := clientset.Discovery().ServerVersion() - if err != nil { - return nil, errors.New("Failed to get API server version due to " + err.Error()) - } - - minorVer, _ := strconv.Atoi(v.Minor) - if v.Major == "1" && minorVer < 7 { - v1NetworkPolicy = false - } - - npw.broadcaster = utils.NewBroadcaster() - var lw *cache.ListWatch - if v1NetworkPolicy { - lw = cache.NewListWatchFromClient(clientset.NetworkingV1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) - npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( - lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - } else { - lw = cache.NewListWatchFromClient(clientset.ExtensionsV1beta1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything()) - npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer( - lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - } - networkPolicyStopCh = make(chan struct{}) - go npw.networkPolicyController.Run(networkPolicyStopCh) - return &npw, nil -} - -func StopNetworkPolicyWatcher() { - networkPolicyStopCh <- struct{}{} -} diff --git a/app/watchers/node_watcher.go b/app/watchers/node_watcher.go deleted file mode 100644 index 7dc3ae80..00000000 --- a/app/watchers/node_watcher.go +++ /dev/null @@ -1,101 +0,0 @@ -package watchers - -import ( - "time" - - "github.com/cloudnativelabs/kube-router/utils" - api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type NodeUpdate struct { - Node *api.Node - Op Operation -} - -var ( - NodeWatcher *nodeWatcher -) - -type nodeWatcher struct { - clientset *kubernetes.Clientset - nodeController cache.Controller - nodeLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type NodeUpdatesHandler interface { - OnNodeUpdate(nodeUpdate *NodeUpdate) -} - -func (nw *nodeWatcher) nodeAddEventHandler(obj interface{}) { - node, ok := obj.(*api.Node) - if !ok { - return - } - nw.broadcaster.Notify(&NodeUpdate{Op: ADD, Node: node}) -} - -func (nw *nodeWatcher) nodeDeleteEventHandler(obj interface{}) { - node, ok := obj.(*api.Node) - if !ok { - return - } - nw.broadcaster.Notify(&NodeUpdate{Op: REMOVE, Node: node}) -} - -func (nw *nodeWatcher) nodeUpdateEventHandler(oldObj, newObj interface{}) { - // we are interested only node add/delete, so skip update - return -} - -func (nw *nodeWatcher) RegisterHandler(handler NodeUpdatesHandler) { - nw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnNodeUpdate(instance.(*NodeUpdate)) - })) -} - -func (nw *nodeWatcher) List() []*api.Node { - objList := nw.nodeLister.List() - nodeInstances := make([]*api.Node, len(objList)) - for i, ins := range objList { - nodeInstances[i] = ins.(*api.Node) - } - return nodeInstances -} - -func (nw *nodeWatcher) HasSynced() bool { - return nw.nodeController.HasSynced() -} - -var nodewatchStopCh chan struct{} - -func StartNodeWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*nodeWatcher, error) { - - nw := nodeWatcher{} - NodeWatcher = &nw - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: nw.nodeAddEventHandler, - DeleteFunc: nw.nodeDeleteEventHandler, - UpdateFunc: nw.nodeUpdateEventHandler, - } - - nw.clientset = clientset - nw.broadcaster = utils.NewBroadcaster() - lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything()) - nw.nodeLister, nw.nodeController = cache.NewIndexerInformer( - lw, - &api.Node{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - nodewatchStopCh = make(chan struct{}) - go nw.nodeController.Run(nodewatchStopCh) - return &nw, nil -} - -func StopNodeWatcher() { - nodewatchStopCh <- struct{}{} -} diff --git a/app/watchers/pods_watcher.go b/app/watchers/pods_watcher.go deleted file mode 100644 index 1c912b06..00000000 --- a/app/watchers/pods_watcher.go +++ /dev/null @@ -1,118 +0,0 @@ -package watchers - -import ( - "reflect" - "time" - - "github.com/cloudnativelabs/kube-router/utils" - api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes" - listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" -) - -type PodUpdate struct { - Pod *api.Pod - Op Operation -} - -var ( - PodWatcher *podWatcher -) - -type podWatcher struct { - clientset *kubernetes.Clientset - podController cache.Controller - podLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type PodUpdatesHandler interface { - OnPodUpdate(podUpdate *PodUpdate) -} - -func (pw *podWatcher) podAddEventHandler(obj interface{}) { - pod, ok := obj.(*api.Pod) - if !ok { - return - } - pw.broadcaster.Notify(&PodUpdate{Op: ADD, Pod: pod}) -} - -func (pw *podWatcher) podDeleteEventHandler(obj interface{}) { - pod, ok := obj.(*api.Pod) - if !ok { - return - } - pw.broadcaster.Notify(&PodUpdate{Op: REMOVE, Pod: pod}) -} - -func (pw *podWatcher) podAUpdateEventHandler(oldObj, newObj interface{}) { - pod, ok := newObj.(*api.Pod) - if !ok { - return - } - if !reflect.DeepEqual(newObj, oldObj) { - pw.broadcaster.Notify(&PodUpdate{Op: UPDATE, Pod: pod}) - } -} - -func (pw *podWatcher) RegisterHandler(handler PodUpdatesHandler) { - pw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnPodUpdate(instance.(*PodUpdate)) - })) -} - -func (pw *podWatcher) List() []*api.Pod { - objList := pw.podLister.List() - podInstances := make([]*api.Pod, len(objList)) - for i, ins := range objList { - podInstances[i] = ins.(*api.Pod) - } - return podInstances -} - -func (pw *podWatcher) ListByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) { - podLister := listers.NewPodLister(pw.podLister) - allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector()) - if err != nil { - return nil, err - } - return allMatchedNameSpacePods, nil -} - -func (pw *podWatcher) HasSynced() bool { - return pw.podController.HasSynced() -} - -var podwatchStopCh chan struct{} - -func StartPodWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*podWatcher, error) { - - pw := podWatcher{} - PodWatcher = &pw - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: pw.podAddEventHandler, - DeleteFunc: pw.podDeleteEventHandler, - UpdateFunc: pw.podAUpdateEventHandler, - } - - pw.clientset = clientset - pw.broadcaster = utils.NewBroadcaster() - lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything()) - pw.podLister, pw.podController = cache.NewIndexerInformer( - lw, - &api.Pod{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - podwatchStopCh = make(chan struct{}) - go pw.podController.Run(podwatchStopCh) - return &pw, nil -} - -func StopPodWatcher() { - podwatchStopCh <- struct{}{} -} diff --git a/app/watchers/services_watcher.go b/app/watchers/services_watcher.go deleted file mode 100644 index 2374091a..00000000 --- a/app/watchers/services_watcher.go +++ /dev/null @@ -1,118 +0,0 @@ -package watchers - -import ( - "reflect" - "time" - - "github.com/cloudnativelabs/kube-router/utils" - - api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type ServiceUpdate struct { - Service *api.Service - Op Operation -} - -var ( - ServiceWatcher *serviceWatcher -) - -type serviceWatcher struct { - clientset kubernetes.Interface - serviceController cache.Controller - serviceLister cache.Indexer - broadcaster *utils.Broadcaster -} - -type ServiceUpdatesHandler interface { - OnServiceUpdate(serviceUpdate *ServiceUpdate) -} - -func (svcw *serviceWatcher) serviceAddEventHandler(obj interface{}) { - service, ok := obj.(*api.Service) - if !ok { - return - } - svcw.broadcaster.Notify(&ServiceUpdate{Op: ADD, Service: service}) -} - -func (svcw *serviceWatcher) serviceDeleteEventHandler(obj interface{}) { - service, ok := obj.(*api.Service) - if !ok { - return - } - svcw.broadcaster.Notify(&ServiceUpdate{Op: REMOVE, Service: service}) -} - -func (svcw *serviceWatcher) serviceAUpdateEventHandler(oldObj, newObj interface{}) { - service, ok := newObj.(*api.Service) - if !ok { - return - } - if !reflect.DeepEqual(newObj, oldObj) { - svcw.broadcaster.Notify(&ServiceUpdate{Op: UPDATE, Service: service}) - } -} - -func (svcw *serviceWatcher) RegisterHandler(handler ServiceUpdatesHandler) { - svcw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) { - handler.OnServiceUpdate(instance.(*ServiceUpdate)) - })) -} - -func (svcw *serviceWatcher) List() []*api.Service { - objList := svcw.serviceLister.List() - svcInstances := make([]*api.Service, len(objList)) - for i, ins := range objList { - svcInstances[i] = ins.(*api.Service) - } - return svcInstances -} - -func (svcw *serviceWatcher) HasSynced() bool { - return svcw.serviceController.HasSynced() -} - -var servicesStopCh chan struct{} - -// StartServiceWatcher: start watching updates for services from Kuberentes API server -func StartServiceWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration) (*serviceWatcher, error) { - - svcw := serviceWatcher{} - ServiceWatcher = &svcw - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: svcw.serviceAddEventHandler, - DeleteFunc: svcw.serviceDeleteEventHandler, - UpdateFunc: svcw.serviceAUpdateEventHandler, - } - - svcw.clientset = clientset - svcw.broadcaster = utils.NewBroadcaster() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.CoreV1().Services(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.CoreV1().Services(metav1.NamespaceAll).Watch(options) - }, - } - - svcw.serviceLister, svcw.serviceController = cache.NewIndexerInformer( - lw, - &api.Service{}, resyncPeriod, eventHandler, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - servicesStopCh = make(chan struct{}) - go svcw.serviceController.Run(servicesStopCh) - return &svcw, nil -} -func StopServiceWatcher() { - servicesStopCh <- struct{}{} -} diff --git a/build/travis-test.sh b/build/travis-test.sh index 4f80cfe7..427228ff 100755 --- a/build/travis-test.sh +++ b/build/travis-test.sh @@ -2,5 +2,8 @@ set -o errexit set -o pipefail +echo "install moq" +go get github.com/matryer/moq + echo "Running tests on Travis" make test