refactor to use shared informers (#373)

* delete package app/watchers since we're now using shared informers

* use shared informers for events and listing resources

* install moq in travis test script
Andrew Sy Kim 2018-04-10 09:35:03 -04:00 committed by GitHub
parent ed0dc39098
commit 3763b200a8
14 changed files with 340 additions and 940 deletions
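The app/server.go changes below show the pattern that replaces the per-resource watcher goroutines: build one shared informer factory per process, pull per-resource informers from it, hand those informers to the controllers (which keep the GetIndexer() caches for listing), register each controller's event handlers, then start the factory and wait for the caches to sync. A minimal sketch of that wiring, condensed from the diff (the helper name is the editor's):

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startSharedInformers condenses the wiring from app/server.go: one factory,
// per-resource informers, start, then block until the local caches are synced.
func startSharedInformers(client kubernetes.Interface, stopCh chan struct{}) (podInformer, npInformer cache.SharedIndexInformer) {
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	podInformer = informerFactory.Core().V1().Pods().Informer()
	npInformer = informerFactory.Networking().V1().NetworkPolicies().Informer()
	go informerFactory.Start(stopCh)
	informerFactory.WaitForCacheSync(stopCh)
	return podInformer, npInformer
}

Controllers built against these informers then attach their handlers, e.g. podInformer.AddEventHandler(npc.PodEventHandler), as the server.go hunk shows.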


@@ -154,7 +154,7 @@ func (mc *MetricsController) Run(healthChan chan<- *ControllerHeartbeat, stopCh
}
// NewMetricsController returns a new MetricsController object
func NewMetricsController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*MetricsController, error) {
func NewMetricsController(clientset kubernetes.Interface, config *options.KubeRouterConfig) (*MetricsController, error) {
mc := MetricsController{}
mc.MetricsPath = config.MetricsPath
mc.MetricsPort = config.MetricsPort


@@ -3,7 +3,6 @@ package controllers
import (
"crypto/sha256"
"encoding/base32"
"encoding/json"
"errors"
"fmt"
"net"
@@ -13,15 +12,18 @@ import (
"time"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
api "k8s.io/api/core/v1"
apiextensions "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
@@ -51,6 +53,14 @@ type NetworkPolicyController struct {
// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
ipSetHandler *utils.IPSet
podLister cache.Indexer
npLister cache.Indexer
nsLister cache.Indexer
PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler
}
// internal structure to represent a network policy
@@ -120,16 +130,12 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
default:
}
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: " + err.Error())
} else {
sendHeartBeat(healthChan, "NPC")
}
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: " + err.Error())
} else {
continue
sendHeartBeat(healthChan, "NPC")
}
select {
@@ -142,46 +148,37 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
}
// OnPodUpdate handles updates to pods from the Kubernetes API server
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: %s", err)
}
} else {
glog.V(2).Infof("Received pod update, but controller not in sync")
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", pod.Namespace, pod.Name)
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: %s", err)
}
}
// OnNetworkPolicyUpdate handles updates to network policies from the Kubernetes API server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: %s", err)
}
} else {
glog.V(2).Info("Received network policy update, but controller not in sync")
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: %s", err)
}
}
// OnNamespaceUpdate handles updates to namespaces from the Kubernetes API server
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
// namespaces (and annotations on them) have no significance in the GA version of network policy
if npc.v1NetworkPolicy {
return
}
glog.V(2).Infof("Received namespace update namespace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: %s", err)
}
} else {
glog.V(2).Info("Received namespace update, but controller not in sync")
namespace := obj.(*api.Namespace)
glog.V(2).Infof("Received update for namespace: %s", namespace.Name)
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: %s", err)
}
}
@@ -204,13 +201,13 @@ func (npc *NetworkPolicyController) Sync() error {
glog.V(1).Info("Starting periodic sync of iptables")
if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
} else {
// TODO remove the Beta support
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
@@ -948,7 +945,9 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)
for _, pod := range watchers.PodWatcher.List() {
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
@@ -975,7 +974,9 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str
nodePods := make(map[string]podInfo)
for _, pod := range watchers.PodWatcher.List() {
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
@@ -997,11 +998,11 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str
return &nodePods, nil
}
func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range npc.npLister.List() {
policy, ok := policyObj.(*networking.NetworkPolicy)
if !ok {
@@ -1042,7 +1043,7 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
}
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.targetPods = make(map[string]podInfo)
if err == nil {
for _, matchingPod := range matchingPods {
@@ -1094,15 +1095,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
for _, peer := range specIngressRule.From {
// spec must have either of PodSelector or NamespaceSelector
if peer.PodSelector != nil {
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
peer.PodSelector.MatchLabels)
} else if peer.NamespaceSelector != nil {
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
for _, namespace := range namespaces {
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
@@ -1155,15 +1156,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
for _, peer := range specEgressRule.To {
// spec must have either of PodSelector or NamespaceSelector
if peer.PodSelector != nil {
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
peer.PodSelector.MatchLabels)
} else if peer.NamespaceSelector != nil {
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
for _, namespace := range namespaces {
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
@@ -1192,11 +1193,29 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
return &NetworkPolicies, nil
}
func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
podLister := listers.NewPodLister(npc.podLister)
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
if err != nil {
return nil, err
}
return allMatchedNameSpacePods, nil
}
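// Editor's note, illustrative only (not part of this commit): because the
// typed lister wraps the informer's indexer, a label-filtered list like
//
//	matchingPods, err := npc.ListPodsByNamespaceAndLabels("default", labels.Set{"app": "web"})
//
// is served entirely from the local cache, with no API-server round trip.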
func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) {
namespaceLister := listers.NewNamespaceLister(npc.nsLister)
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
if err != nil {
return nil, err
}
return matchedNamespaces, nil
}
func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range npc.npLister.List() {
policy, _ := policyObj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
@@ -1204,7 +1223,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
namespace: policy.Namespace,
labels: policy.Spec.PodSelector.MatchLabels,
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.targetPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
if err == nil {
@@ -1227,7 +1246,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
if err == nil {
for _, matchingPod := range matchingPods {
ingressRule.srcPods = append(ingressRule.srcPods,
@@ -1246,25 +1265,6 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
return &NetworkPolicies, nil
}
func getNameSpaceDefaultPolicy(namespace string) (string, error) {
for _, nspw := range watchers.NamespaceWatcher.List() {
if strings.Compare(namespace, nspw.Name) == 0 {
networkPolicy, ok := nspw.ObjectMeta.Annotations[networkPolicyAnnotation]
var annot map[string]map[string]string
if ok {
err := json.Unmarshal([]byte(networkPolicy), &annot)
if err == nil {
return annot["ingress"]["isolation"], nil
}
glog.Errorf("Skipping invalid network-policy for namespace \"%s\": %s", namespace, err)
return "DefaultAllow", errors.New("Invalid NetworkPolicy")
}
return "DefaultAllow", nil
}
}
return "", errors.New("Failed to get the default ingress policy for the namespace: " + namespace)
}
func podFirewallChainName(namespace, podName string) string {
hash := sha256.Sum256([]byte(namespace + podName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
@@ -1385,8 +1385,59 @@ func (npc *NetworkPolicyController) Cleanup() {
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnPodUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
},
}
}
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNamespaceUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNamespaceUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.OnNamespaceUpdate(obj)
},
}
}
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNetworkPolicyUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNetworkPolicyUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.OnNetworkPolicyUpdate(obj)
},
}
}
// NewNetworkPolicyController returns a new NetworkPolicyController object
func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}
if config.MetricsEnabled {
@@ -1427,9 +1478,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
}
npc.ipSetHandler = ipset
watchers.PodWatcher.RegisterHandler(&npc)
watchers.NetworkPolicyWatcher.RegisterHandler(&npc)
watchers.NamespaceWatcher.RegisterHandler(&npc)
npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler()
npc.nsLister = nsInformer.GetIndexer()
npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
npc.npLister = npInformer.GetIndexer()
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
return &npc, nil
}


@@ -20,7 +20,6 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
@@ -31,6 +30,7 @@ import (
"github.com/osrg/gobgp/table"
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netlink"
v1core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -96,6 +96,12 @@ type NetworkRoutingController struct {
cniConfFile string
initSrcDstCheckDone bool
ec2IamAuthorized bool
nodeLister cache.Indexer
svcLister cache.Indexer
epLister cache.Indexer
NodeEventHandler cache.ResourceEventHandler
}
// Run runs forever until we are notified on stop channel
@@ -438,7 +444,9 @@ func (nrc *NetworkRoutingController) getLoadBalancerIps(svc *v1core.Service) []s
func (nrc *NetworkRoutingController) getIpsToAdvertise(verifyEndpoints bool) ([]string, []string, error) {
ipsToAdvertise := make([]string, 0)
ipsToUnAdvertise := make([]string, 0)
for _, svc := range watchers.ServiceWatcher.List() {
for _, obj := range nrc.svcLister.List() {
svc := obj.(*v1core.Service)
ipList := make([]string, 0)
var err error
nodeHasEndpoints := true
@@ -497,7 +505,7 @@ func (nrc *NetworkRoutingController) nodeHasEndpointsForService(svc *v1core.Serv
if err != nil {
return false, err
}
item, exists, err := watchers.EndpointsWatcher.GetByKey(key)
item, exists, err := nrc.epLister.GetByKey(key)
if err != nil {
return false, err
}
@@ -1343,17 +1351,11 @@ func rtTablesAdd(tableNumber, tableName string) error {
// OnNodeUpdate handles updates from the node informer. It is called whenever a
// node is added or deleted, so we peer with newly added nodes and drop peering
// with removed ones
func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) {
func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
if !nrc.bgpServerStarted {
return
}
node := nodeUpdate.Node
nodeIP, _ := utils.GetNodeIP(node)
if nodeUpdate.Op == watchers.ADD {
glog.V(2).Infof("Received node %s added update from watch API so peer with new node", nodeIP)
} else if nodeUpdate.Op == watchers.REMOVE {
glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
}
if nrc.bgpEnableInternal {
nrc.syncInternalPeers()
}
@@ -1559,11 +1561,35 @@ func generateTunnelName(nodeIP string) string {
return "tun" + hash
}
func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1core.Node)
nodeIP, _ := utils.GetNodeIP(node)
glog.V(2).Infof("Received node %s added update from watch API so peer with new node", nodeIP)
},
UpdateFunc: func(oldObj, newObj interface{}) {
nrc.OnNodeUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1core.Node)
nodeIP, _ := utils.GetNodeIP(node)
glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
},
}
}
// func (nrc *NetworkRoutingController) getExternalNodeIPs(
// NewNetworkRoutingController returns a new NetworkRoutingController object
func NewNetworkRoutingController(clientset *kubernetes.Clientset,
kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
func NewNetworkRoutingController(clientset kubernetes.Interface,
kubeRouterConfig *options.KubeRouterConfig,
nodeInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer,
epInformer cache.SharedIndexInformer) (*NetworkRoutingController, error) {
var err error
@@ -1676,9 +1702,11 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset,
"which its configured: " + err.Error())
}
if nrc.bgpEnableInternal {
watchers.NodeWatcher.RegisterHandler(&nrc)
}
nrc.svcLister = svcInformer.GetIndexer()
nrc.epLister = epInformer.GetIndexer()
nrc.nodeLister = nodeInformer.GetIndexer()
nrc.NodeEventHandler = nrc.newNodeEventHandler()
return &nrc, nil
}
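A note on the epLister.GetByKey lookup in nodeHasEndpointsForService above: the underlying store keys objects as "namespace/name" (cache.MetaNamespaceKeyFunc), so the key is presumably computed from the service being checked. A minimal sketch under that assumption:

	// svc is the *v1core.Service whose endpoints we want from the cache.
	key, err := cache.MetaNamespaceKeyFunc(svc) // e.g. "default/my-service"
	if err != nil {
		return false, err
	}
	item, exists, err := nrc.epLister.GetByKey(key)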


@@ -8,15 +8,16 @@ import (
"testing"
"time"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/osrg/gobgp/config"
gobgp "github.com/osrg/gobgp/server"
"github.com/osrg/gobgp/table"
v1core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)
func Test_advertiseClusterIPs(t *testing.T) {
@@ -173,18 +174,15 @@ func Test_advertiseClusterIPs(t *testing.T) {
w := testcase.nrc.bgpServer.Watch(gobgp.WatchBestPath(false))
clientset := fake.NewSimpleClientset()
_, err = watchers.StartServiceWatcher(clientset, 0)
if err != nil {
t.Fatalf("failed to initialize service watcher: %v", err)
}
startInformersForRoutes(testcase.nrc, clientset)
err = createServices(clientset, testcase.existingServices)
if err != nil {
t.Fatalf("failed to create existing services: %v", err)
}
waitForListerWithTimeout(time.Second*10, t)
waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t)
// ClusterIPs
testcase.nrc.advertiseClusterIp = true
testcase.nrc.advertiseExternalIp = false
@@ -488,18 +486,15 @@ func Test_advertiseExternalIPs(t *testing.T) {
w := testcase.nrc.bgpServer.Watch(gobgp.WatchBestPath(false))
clientset := fake.NewSimpleClientset()
_, err = watchers.StartServiceWatcher(clientset, 0)
if err != nil {
t.Fatalf("failed to initialize service watcher: %v", err)
}
startInformersForRoutes(testcase.nrc, clientset)
err = createServices(clientset, testcase.existingServices)
if err != nil {
t.Fatalf("failed to create existing services: %v", err)
}
waitForListerWithTimeout(time.Second*10, t)
waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t)
// ExternalIPs
testcase.nrc.advertiseClusterIp = false
testcase.nrc.advertiseExternalIp = true
@@ -613,18 +608,9 @@ func Test_nodeHasEndpointsForService(t *testing.T) {
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
clientset := fake.NewSimpleClientset()
startInformersForRoutes(testcase.nrc, clientset)
_, err := watchers.StartServiceWatcher(clientset, 0)
if err != nil {
t.Fatalf("failed to initialize service watcher: %v", err)
}
_, err = watchers.StartEndpointsWatcher(clientset, 0)
if err != nil {
t.Fatalf("failed to initialize endpoints watcher: %v", err)
}
_, err = clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint)
_, err := clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint)
if err != nil {
t.Fatalf("failed to create existing endpoints: %v", err)
}
@@ -634,7 +620,8 @@ func Test_nodeHasEndpointsForService(t *testing.T) {
t.Fatalf("failed to create existing services: %v", err)
}
waitForListerWithTimeout(time.Second*10, t)
waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t)
waitForListerWithTimeout(testcase.nrc.epLister, time.Second*10, t)
nodeHasEndpoints, err := testcase.nrc.nodeHasEndpointsForService(testcase.existingService)
if !reflect.DeepEqual(err, testcase.err) {
@@ -1400,15 +1387,7 @@ func Test_addExportPolicies(t *testing.T) {
}
defer testcase.nrc.bgpServer.Stop()
_, err = watchers.StartServiceWatcher(testcase.nrc.clientset, 0)
if err != nil {
t.Fatalf("failed to initialize service watcher %v", err)
}
_, err = watchers.StartEndpointsWatcher(testcase.nrc.clientset, 0)
if err != nil {
t.Fatalf("failed to initialize endpoints watcher %v", err)
}
startInformersForRoutes(testcase.nrc, testcase.nrc.clientset)
if err = createNodes(testcase.nrc.clientset, testcase.existingNodes); err != nil {
t.Errorf("failed to create existing nodes: %v", err)
@@ -1419,12 +1398,12 @@ func Test_addExportPolicies(t *testing.T) {
}
// ClusterIPs and ExternalIPs
waitForListerWithTimeout(testcase.nrc.svcLister, time.Second*10, t)
testcase.nrc.advertiseClusterIp = true
testcase.nrc.advertiseExternalIp = true
testcase.nrc.advertiseLoadBalancerIp = false
waitForListerWithTimeout(time.Second*10, t)
err = testcase.nrc.addExportPolicies()
if !reflect.DeepEqual(err, testcase.err) {
t.Logf("expected err %v", testcase.err)
@@ -1565,7 +1544,19 @@ func createNodes(clientset kubernetes.Interface, nodes []*v1core.Node) error {
return nil
}
func waitForListerWithTimeout(timeout time.Duration, t *testing.T) {
func startInformersForRoutes(nrc *NetworkRoutingController, clientset kubernetes.Interface) {
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
svcInformer := informerFactory.Core().V1().Services().Informer()
epInformer := informerFactory.Core().V1().Endpoints().Informer()
go informerFactory.Start(nil)
informerFactory.WaitForCacheSync(nil)
nrc.svcLister = svcInformer.GetIndexer()
nrc.epLister = epInformer.GetIndexer()
}
func waitForListerWithTimeout(lister cache.Indexer, timeout time.Duration, t *testing.T) {
tick := time.Tick(100 * time.Millisecond)
timeoutCh := time.After(timeout)
for {
@@ -1573,7 +1564,7 @@ func waitForListerWithTimeout(timeout time.Duration, t *testing.T) {
case <-timeoutCh:
t.Fatal("timeout exceeded waiting for service lister to fill cache")
case <-tick:
if len(watchers.ServiceWatcher.List()) != 0 {
if len(lister.List()) != 0 {
return
}
}
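The tests above poll the indexer until it is non-empty because the fixture objects are created only after the informers start, so a bare cache sync is not enough. When sync alone matters, client-go's cache.WaitForCacheSync is the stock alternative; a minimal sketch, reusing the informers from startInformersForRoutes:

	stopCh := make(chan struct{})
	defer close(stopCh)
	if !cache.WaitForCacheSync(stopCh, svcInformer.HasSynced, epInformer.HasSynced) {
		t.Fatal("timed out waiting for informer caches to sync")
	}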


@@ -19,7 +19,6 @@ import (
"time"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/docker/docker/client"
@@ -29,9 +28,11 @@ import (
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"golang.org/x/net/context"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
const (
@@ -162,10 +163,17 @@ type NetworkServicesController struct {
podCidr string
masqueradeAll bool
globalHairpin bool
client *kubernetes.Clientset
client kubernetes.Interface
nodeportBindOnAllIp bool
MetricsEnabled bool
ln LinuxNetworking
svcLister cache.Indexer
epLister cache.Indexer
podLister cache.Indexer
ServiceEventHandler cache.ResourceEventHandler
EndpointsEventHandler cache.ResourceEventHandler
}
// internal representation of kubernetes service
@@ -229,16 +237,12 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat
default:
}
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
glog.V(1).Info("Performing periodic sync of ipvs services")
err := nsc.sync()
if err != nil {
glog.Errorf("Error during periodic ipvs sync: " + err.Error())
} else {
sendHeartBeat(healthChan, "NSC")
}
glog.V(1).Info("Performing periodic sync of ipvs services")
err := nsc.sync()
if err != nil {
glog.Errorf("Error during periodic ipvs sync: " + err.Error())
} else {
continue
sendHeartBeat(healthChan, "NSC")
}
select {
@@ -255,8 +259,8 @@ func (nsc *NetworkServicesController) sync() error {
nsc.mu.Lock()
defer nsc.mu.Unlock()
nsc.serviceMap = buildServicesInfo()
nsc.endpointsMap = buildEndpointsInfo()
nsc.serviceMap = nsc.buildServicesInfo()
nsc.endpointsMap = nsc.buildEndpointsInfo()
err = nsc.syncHairpinIptablesRules()
if err != nil {
glog.Errorf("Error syncing hairpin iptable rules: %s", err.Error())
@@ -343,18 +347,14 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
}
// OnEndpointsUpdate handles changes to endpoints from the API server
func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watchers.EndpointsUpdate) {
func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()
glog.V(1).Info("Received endpoints update from watch API")
if !(watchers.ServiceWatcher.HasSynced() && watchers.EndpointsWatcher.HasSynced()) {
glog.V(1).Info("Skipping ipvs server sync as local cache is not synced yet")
}
// build new endpoints map to reflect the change
newEndpointsMap := buildEndpointsInfo()
newEndpointsMap := nsc.buildEndpointsInfo()
if len(newEndpointsMap) != len(nsc.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, nsc.endpointsMap) {
nsc.endpointsMap = newEndpointsMap
@@ -365,18 +365,14 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watcher
}
// OnServiceUpdate handles changes to services from the API server
func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.ServiceUpdate) {
func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()
glog.V(1).Info("Received service update from watch API")
if !(watchers.ServiceWatcher.HasSynced() && watchers.EndpointsWatcher.HasSynced()) {
glog.V(1).Info("Skipping ipvs server sync as local cache is not synced yet")
}
// build new services map to reflect the change
newServiceMap := buildServicesInfo()
newServiceMap := nsc.buildServicesInfo()
if len(newServiceMap) != len(nsc.serviceMap) || !reflect.DeepEqual(newServiceMap, nsc.serviceMap) {
nsc.serviceMap = newServiceMap
@@ -636,7 +632,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
// For now just support IPVS tunnel mode, we can add other ways of DSR in future
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
podObj, err := getPodObjectForEndpoint(endpoint.ip)
podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip)
if err != nil {
glog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR")
continue
@@ -746,8 +742,9 @@ func isLocalEndpoint(ip, podCidr string) (bool, error) {
return false, nil
}
func getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) {
for _, pod := range watchers.PodWatcher.List() {
func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) {
for _, obj := range nsc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.PodIP, endpointIP) == 0 {
return pod, nil
}
@@ -919,9 +916,10 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP
return nil
}
func buildServicesInfo() serviceInfoMap {
func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
serviceMap := make(serviceInfoMap)
for _, svc := range watchers.ServiceWatcher.List() {
for _, obj := range nsc.svcLister.List() {
svc := obj.(*api.Service)
if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" {
glog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", svc.Name, svc.Namespace)
@@ -991,9 +989,11 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo {
return endPoints
}
func buildEndpointsInfo() endpointsInfoMap {
func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap {
endpointsMap := make(endpointsInfoMap)
for _, ep := range watchers.EndpointsWatcher.List() {
for _, obj := range nsc.epLister.List() {
ep := obj.(*api.Endpoints)
for _, epSubset := range ep.Subsets {
for _, port := range epSubset.Ports {
svcId := generateServiceId(ep.Namespace, ep.Name, port.Name)
@@ -1718,8 +1718,41 @@ func (nsc *NetworkServicesController) Cleanup() {
glog.Infof("Successfully cleaned the ipvs configuration done by kube-router")
}
func (nsc *NetworkServicesController) newEndpointsEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
nsc.OnEndpointsUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
nsc.OnEndpointsUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
nsc.OnEndpointsUpdate(obj)
},
}
}
func (nsc *NetworkServicesController) newSvcEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
nsc.OnServiceUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
nsc.OnServiceUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
nsc.OnServiceUpdate(obj)
},
}
}
// NewNetworkServicesController returns a new NetworkServicesController object
func NewNetworkServicesController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkServicesController, error) {
func NewNetworkServicesController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer,
epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer) (*NetworkServicesController, error) {
var err error
ln, err := newLinuxNetworking()
@@ -1781,8 +1814,13 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio
}
nsc.nodeIP = nodeIP
watchers.EndpointsWatcher.RegisterHandler(&nsc)
watchers.ServiceWatcher.RegisterHandler(&nsc)
nsc.podLister = podInformer.GetIndexer()
nsc.svcLister = svcInformer.GetIndexer()
nsc.ServiceEventHandler = nsc.newSvcEventHandler()
nsc.epLister = epInformer.GetIndexer()
nsc.EndpointsEventHandler = nsc.newEndpointsEventHandler()
rand.Seed(time.Now().UnixNano())


@@ -5,14 +5,16 @@ import (
"net"
"time"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/docker/libnetwork/ipvs"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vishvananda/netlink"
v1core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)
type LinuxNetworkingMockImpl struct {
@@ -87,7 +89,7 @@ func fatalf(format string, a ...interface{}) {
// that receives a 2nd *testing argument - mixing testing and ginkgo
// is discouraged (latter uses own GinkgoWriter), so need to create
// our own here.
func waitForListerWithTimeoutG(timeout time.Duration) {
func waitForListerWithTimeoutG(lister cache.Indexer, timeout time.Duration) {
tick := time.Tick(100 * time.Millisecond)
timeoutCh := time.After(timeout)
for {
@@ -95,7 +97,7 @@ func waitForListerWithTimeoutG(timeout time.Duration) {
case <-timeoutCh:
fatalf("timeout exceeded waiting for service lister to fill cache")
case <-tick:
if len(watchers.ServiceWatcher.List()) != 0 {
if len(lister.List()) != 0 {
return
}
}
@@ -132,17 +134,7 @@ var _ = Describe("NetworkServicesController", func() {
JustBeforeEach(func() {
clientset := fake.NewSimpleClientset()
_, err := watchers.StartServiceWatcher(clientset, 0)
if err != nil {
fatalf("failed to initialize service watcher: %v", err)
}
_, err = watchers.StartEndpointsWatcher(clientset, 0)
if err != nil {
fatalf("failed to initialize endpoints watcher: %v", err)
}
_, err = clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint)
_, err := clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint)
if err != nil {
fatalf("failed to create existing endpoints: %v", err)
}
@@ -152,15 +144,18 @@ var _ = Describe("NetworkServicesController", func() {
fatalf("failed to create existing services: %v", err)
}
waitForListerWithTimeoutG(time.Second * 10)
nsc = &NetworkServicesController{
nodeIP: net.ParseIP("10.0.0.0"),
nodeHostName: "node-1",
ln: mockedLinuxNetworking,
serviceMap: buildServicesInfo(),
endpointsMap: buildEndpointsInfo(),
}
startInformersForServiceProxy(nsc, clientset)
waitForListerWithTimeoutG(nsc.svcLister, time.Second*10)
waitForListerWithTimeoutG(nsc.epLister, time.Second*10)
nsc.serviceMap = nsc.buildServicesInfo()
nsc.endpointsMap = nsc.buildEndpointsInfo()
})
Context("service no endpoints with externalIPs", func() {
var fooSvc1, fooSvc2 *ipvs.Service
@@ -507,3 +502,17 @@ var _ = Describe("NetworkServicesController", func() {
})
})
})
func startInformersForServiceProxy(nsc *NetworkServicesController, clientset kubernetes.Interface) {
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
svcInformer := informerFactory.Core().V1().Services().Informer()
epInformer := informerFactory.Core().V1().Endpoints().Informer()
podInformer := informerFactory.Core().V1().Pods().Informer()
go informerFactory.Start(nil)
informerFactory.WaitForCacheSync(nil)
nsc.svcLister = svcInformer.GetIndexer()
nsc.epLister = epInformer.GetIndexer()
nsc.podLister = podInformer.GetIndexer()
}


@@ -11,8 +11,9 @@ import (
"github.com/cloudnativelabs/kube-router/app/controllers"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/golang/glog"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -24,7 +25,7 @@ var buildDate string
// KubeRouter holds the information needed to run server
type KubeRouter struct {
Client *kubernetes.Clientset
Client kubernetes.Interface
Config *options.KubeRouterConfig
}
@@ -67,53 +68,6 @@ func CleanupConfigAndExit() {
nrc.Cleanup()
}
// start API watchers to get notified of changes
func (kr *KubeRouter) startApiWatchers() error {
var err error
_, err = watchers.StartPodWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch pod api watcher: " + err.Error())
}
_, err = watchers.StartEndpointsWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch endpoint api watcher: " + err.Error())
}
_, err = watchers.StartNetworkPolicyWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch network policy api watcher: " + err.Error())
}
_, err = watchers.StartNamespaceWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch namespace api watcher: " + err.Error())
}
_, err = watchers.StartServiceWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch service api watcher: " + err.Error())
}
_, err = watchers.StartNodeWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
if err != nil {
return errors.New("Failed to launch nodes api watcher: " + err.Error())
}
return nil
}
func (kr *KubeRouter) stopApiWatchers() {
watchers.StopPodWatcher()
watchers.StopEndpointsWatcher()
watchers.StopNetworkPolicyWatcher()
watchers.StopNamespaceWatcher()
watchers.StopServiceWatcher()
watchers.StopNodeWatcher()
}
// Run starts the controllers and waits forever till we get SIGINT or SIGTERM
func (kr *KubeRouter) Run() error {
var err error
@@ -124,11 +78,6 @@ func (kr *KubeRouter) Run() error {
stopCh := make(chan struct{})
err = kr.startApiWatchers()
if err != nil {
return errors.New("Failed to start API watchers: " + err.Error())
}
if !(kr.Config.RunFirewall || kr.Config.RunServiceProxy || kr.Config.RunRouter) {
glog.Info("Router, Firewall or Service proxy functionality must be specified. Exiting!")
os.Exit(0)
@@ -157,32 +106,55 @@ func (kr *KubeRouter) Run() error {
kr.Config.MetricsEnabled = false
}
informerFactory := informers.NewSharedInformerFactory(kr.Client, kr.Config.ConfigSyncPeriod)
svcInformer := informerFactory.Core().V1().Services().Informer()
epInformer := informerFactory.Core().V1().Endpoints().Informer()
podInformer := informerFactory.Core().V1().Pods().Informer()
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
go informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)
if kr.Config.RunFirewall {
npc, err := controllers.NewNetworkPolicyController(kr.Client, kr.Config)
npc, err := controllers.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}
podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)
wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
}
if kr.Config.RunRouter {
nrc, err := controllers.NewNetworkRoutingController(kr.Client, kr.Config)
nrc, err := controllers.NewNetworkRoutingController(kr.Client, kr.Config, nodeInformer, svcInformer, epInformer)
if err != nil {
return errors.New("Failed to create network routing controller: " + err.Error())
}
nodeInformer.AddEventHandler(nrc.NodeEventHandler)
wg.Add(1)
go nrc.Run(healthChan, stopCh, &wg)
}
if kr.Config.RunServiceProxy {
nsc, err := controllers.NewNetworkServicesController(kr.Client, kr.Config)
nsc, err := controllers.NewNetworkServicesController(kr.Client, kr.Config,
svcInformer, epInformer, podInformer)
if err != nil {
return errors.New("Failed to create network services controller: " + err.Error())
}
svcInformer.AddEventHandler(nsc.ServiceEventHandler)
epInformer.AddEventHandler(nsc.EndpointsEventHandler)
wg.Add(1)
go nsc.Run(healthChan, stopCh, &wg)
}
@@ -195,8 +167,6 @@ func (kr *KubeRouter) Run() error {
glog.Infof("Shutting down the controllers")
close(stopCh)
kr.stopApiWatchers()
wg.Wait()
return nil
}


@@ -1,131 +0,0 @@
package watchers
import (
"reflect"
"time"
"github.com/cloudnativelabs/kube-router/utils"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type Operation int
const (
ADD Operation = iota
UPDATE
REMOVE
SYNCED
)
type EndpointsUpdate struct {
Endpoints *api.Endpoints
Op Operation
}
var (
EndpointsWatcher *endpointsWatcher
)
type endpointsWatcher struct {
clientset kubernetes.Interface
endpointsController cache.Controller
endpointsLister cache.Indexer
broadcaster *utils.Broadcaster
}
type EndpointsUpdatesHandler interface {
OnEndpointsUpdate(endpointsUpdate *EndpointsUpdate)
}
func (ew *endpointsWatcher) endpointsAddEventHandler(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return
}
ew.broadcaster.Notify(&EndpointsUpdate{Op: ADD, Endpoints: endpoints})
}
func (ew *endpointsWatcher) endpointsDeleteEventHandler(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return
}
ew.broadcaster.Notify(&EndpointsUpdate{Op: REMOVE, Endpoints: endpoints})
}
func (ew *endpointsWatcher) endpointsUpdateEventHandler(oldObj, newObj interface{}) {
endpoints, ok := newObj.(*api.Endpoints)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
if endpoints.Name != "kube-scheduler" && endpoints.Name != "kube-controller-manager" {
ew.broadcaster.Notify(&EndpointsUpdate{Op: UPDATE, Endpoints: endpoints})
}
}
}
func (ew *endpointsWatcher) RegisterHandler(handler EndpointsUpdatesHandler) {
ew.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnEndpointsUpdate(instance.(*EndpointsUpdate))
}))
}
func (ew *endpointsWatcher) List() []*api.Endpoints {
objList := ew.endpointsLister.List()
epInstances := make([]*api.Endpoints, len(objList))
for i, ins := range objList {
epInstances[i] = ins.(*api.Endpoints)
}
return epInstances
}
func (ew *endpointsWatcher) GetByKey(key string) (item interface{}, exists bool, err error) {
return ew.endpointsLister.GetByKey(key)
}
func (ew *endpointsWatcher) HasSynced() bool {
return ew.endpointsController.HasSynced()
}
var endpointsStopCh chan struct{}
func StartEndpointsWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration) (*endpointsWatcher, error) {
ew := endpointsWatcher{}
EndpointsWatcher = &ew
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: ew.endpointsAddEventHandler,
DeleteFunc: ew.endpointsDeleteEventHandler,
UpdateFunc: ew.endpointsUpdateEventHandler,
}
ew.clientset = clientset
ew.broadcaster = utils.NewBroadcaster()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Endpoints(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Endpoints(metav1.NamespaceAll).Watch(options)
},
}
ew.endpointsLister, ew.endpointsController = cache.NewIndexerInformer(
lw,
&api.Endpoints{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
endpointsStopCh = make(chan struct{})
go ew.endpointsController.Run(endpointsStopCh)
return &ew, nil
}
func StopEndpointsWatcher() {
endpointsStopCh <- struct{}{}
}


@@ -1,113 +0,0 @@
package watchers
import (
"reflect"
"time"
"github.com/cloudnativelabs/kube-router/utils"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
type NamespaceUpdate struct {
Namespace *api.Namespace
Op Operation
}
var (
NamespaceWatcher *namespaceWatcher
namespaceStopCh chan struct{}
)
type namespaceWatcher struct {
clientset *kubernetes.Clientset
namespaceController cache.Controller
namespaceLister cache.Indexer
broadcaster *utils.Broadcaster
}
type NamespaceUpdatesHandler interface {
OnNamespaceUpdate(namespaceUpdate *NamespaceUpdate)
}
func (nsw *namespaceWatcher) namespaceAddEventHandler(obj interface{}) {
namespace, ok := obj.(*api.Namespace)
if !ok {
return
}
nsw.broadcaster.Notify(&NamespaceUpdate{Op: ADD, Namespace: namespace})
}
func (nsw *namespaceWatcher) namespaceDeleteEventHandler(obj interface{}) {
namespace, ok := obj.(*api.Namespace)
if !ok {
return
}
nsw.broadcaster.Notify(&NamespaceUpdate{Op: REMOVE, Namespace: namespace})
}
func (nsw *namespaceWatcher) namespaceUpdateEventHandler(oldObj, newObj interface{}) {
namespace, ok := newObj.(*api.Namespace)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
nsw.broadcaster.Notify(&NamespaceUpdate{Op: UPDATE, Namespace: namespace})
}
}
func (nsw *namespaceWatcher) List() []*api.Namespace {
objList := nsw.namespaceLister.List()
namespaceInstances := make([]*api.Namespace, len(objList))
for i, ins := range objList {
namespaceInstances[i] = ins.(*api.Namespace)
}
return namespaceInstances
}
func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, error) {
namespaceLister := listers.NewNamespaceLister(nsw.namespaceLister)
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
if err != nil {
return nil, err
}
return matchedNamespaces, nil
}
func (nsw *namespaceWatcher) RegisterHandler(handler NamespaceUpdatesHandler) {
nsw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnNamespaceUpdate(instance.(*NamespaceUpdate))
}))
}
func StartNamespaceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*namespaceWatcher, error) {
nsw := namespaceWatcher{}
NamespaceWatcher = &nsw
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: nsw.namespaceAddEventHandler,
DeleteFunc: nsw.namespaceDeleteEventHandler,
UpdateFunc: nsw.namespaceUpdateEventHandler,
}
nsw.clientset = clientset
nsw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "namespaces", metav1.NamespaceAll, fields.Everything())
nsw.namespaceLister, nsw.namespaceController = cache.NewIndexerInformer(
lw,
&api.Namespace{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
namespaceStopCh = make(chan struct{})
go nsw.namespaceController.Run(namespaceStopCh)
return &nsw, nil
}
func StopNamespaceWatcher() {
namespaceStopCh <- struct{}{}
}


@@ -1,114 +0,0 @@
package watchers
import (
"errors"
"reflect"
"strconv"
"time"
"github.com/cloudnativelabs/kube-router/utils"
apiextensions "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type NetworkPolicyUpdate struct {
NetworkPolicy interface{}
Op Operation
}
var (
NetworkPolicyWatcher *networkPolicyWatcher
)
type networkPolicyWatcher struct {
clientset *kubernetes.Clientset
networkPolicyController cache.Controller
networkPolicyLister cache.Indexer
broadcaster *utils.Broadcaster
}
type NetworkPolicyUpdatesHandler interface {
OnNetworkPolicyUpdate(networkPolicyUpdate *NetworkPolicyUpdate)
}
func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj})
}
func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj})
}
func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) {
if !reflect.DeepEqual(newObj, oldObj) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj})
}
}
func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHandler) {
npw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnNetworkPolicyUpdate(instance.(*NetworkPolicyUpdate))
}))
}
func (npw *networkPolicyWatcher) List() []interface{} {
return npw.networkPolicyLister.List()
}
func (npw *networkPolicyWatcher) HasSynced() bool {
return npw.networkPolicyController.HasSynced()
}
var networkPolicyStopCh chan struct{}
func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*networkPolicyWatcher, error) {
npw := networkPolicyWatcher{}
NetworkPolicyWatcher = &npw
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: npw.networkPolicyAddEventHandler,
DeleteFunc: npw.networkPolicyDeleteEventHandler,
UpdateFunc: npw.networkPolicyUpdateEventHandler,
}
npw.clientset = clientset
v1NetworkPolicy := true
v, err := clientset.Discovery().ServerVersion()
if err != nil {
return nil, errors.New("Failed to get API server version due to " + err.Error())
}
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
v1NetworkPolicy = false
}
npw.broadcaster = utils.NewBroadcaster()
var lw *cache.ListWatch
if v1NetworkPolicy {
lw = cache.NewListWatchFromClient(clientset.NetworkingV1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
} else {
lw = cache.NewListWatchFromClient(clientset.ExtensionsV1beta1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
}
networkPolicyStopCh = make(chan struct{})
go npw.networkPolicyController.Run(networkPolicyStopCh)
return &npw, nil
}
func StopNetworkPolicyWatcher() {
networkPolicyStopCh <- struct{}{}
}


@@ -1,101 +0,0 @@
package watchers
import (
"time"
"github.com/cloudnativelabs/kube-router/utils"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type NodeUpdate struct {
Node *api.Node
Op Operation
}
var (
NodeWatcher *nodeWatcher
)
type nodeWatcher struct {
clientset *kubernetes.Clientset
nodeController cache.Controller
nodeLister cache.Indexer
broadcaster *utils.Broadcaster
}
type NodeUpdatesHandler interface {
OnNodeUpdate(nodeUpdate *NodeUpdate)
}
func (nw *nodeWatcher) nodeAddEventHandler(obj interface{}) {
node, ok := obj.(*api.Node)
if !ok {
return
}
nw.broadcaster.Notify(&NodeUpdate{Op: ADD, Node: node})
}
func (nw *nodeWatcher) nodeDeleteEventHandler(obj interface{}) {
node, ok := obj.(*api.Node)
if !ok {
return
}
nw.broadcaster.Notify(&NodeUpdate{Op: REMOVE, Node: node})
}
func (nw *nodeWatcher) nodeUpdateEventHandler(oldObj, newObj interface{}) {
// we are only interested in node add/delete, so skip updates
return
}
func (nw *nodeWatcher) RegisterHandler(handler NodeUpdatesHandler) {
nw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnNodeUpdate(instance.(*NodeUpdate))
}))
}
func (nw *nodeWatcher) List() []*api.Node {
objList := nw.nodeLister.List()
nodeInstances := make([]*api.Node, len(objList))
for i, ins := range objList {
nodeInstances[i] = ins.(*api.Node)
}
return nodeInstances
}
func (nw *nodeWatcher) HasSynced() bool {
return nw.nodeController.HasSynced()
}
var nodewatchStopCh chan struct{}
func StartNodeWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*nodeWatcher, error) {
nw := nodeWatcher{}
NodeWatcher = &nw
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: nw.nodeAddEventHandler,
DeleteFunc: nw.nodeDeleteEventHandler,
UpdateFunc: nw.nodeUpdateEventHandler,
}
nw.clientset = clientset
nw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
nw.nodeLister, nw.nodeController = cache.NewIndexerInformer(
lw,
&api.Node{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodewatchStopCh = make(chan struct{})
go nw.nodeController.Run(nodewatchStopCh)
return &nw, nil
}
func StopNodeWatcher() {
nodewatchStopCh <- struct{}{}
}


@@ -1,118 +0,0 @@
package watchers
import (
"reflect"
"time"
"github.com/cloudnativelabs/kube-router/utils"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
type PodUpdate struct {
Pod *api.Pod
Op Operation
}
var (
PodWatcher *podWatcher
)
type podWatcher struct {
clientset *kubernetes.Clientset
podController cache.Controller
podLister cache.Indexer
broadcaster *utils.Broadcaster
}
type PodUpdatesHandler interface {
OnPodUpdate(podUpdate *PodUpdate)
}
func (pw *podWatcher) podAddEventHandler(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
pw.broadcaster.Notify(&PodUpdate{Op: ADD, Pod: pod})
}
func (pw *podWatcher) podDeleteEventHandler(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
pw.broadcaster.Notify(&PodUpdate{Op: REMOVE, Pod: pod})
}
func (pw *podWatcher) podAUpdateEventHandler(oldObj, newObj interface{}) {
pod, ok := newObj.(*api.Pod)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
pw.broadcaster.Notify(&PodUpdate{Op: UPDATE, Pod: pod})
}
}
func (pw *podWatcher) RegisterHandler(handler PodUpdatesHandler) {
pw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnPodUpdate(instance.(*PodUpdate))
}))
}
func (pw *podWatcher) List() []*api.Pod {
objList := pw.podLister.List()
podInstances := make([]*api.Pod, len(objList))
for i, ins := range objList {
podInstances[i] = ins.(*api.Pod)
}
return podInstances
}
func (pw *podWatcher) ListByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
podLister := listers.NewPodLister(pw.podLister)
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
if err != nil {
return nil, err
}
return allMatchedNameSpacePods, nil
}
func (pw *podWatcher) HasSynced() bool {
return pw.podController.HasSynced()
}
var podwatchStopCh chan struct{}
func StartPodWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*podWatcher, error) {
pw := podWatcher{}
PodWatcher = &pw
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: pw.podAddEventHandler,
DeleteFunc: pw.podDeleteEventHandler,
UpdateFunc: pw.podAUpdateEventHandler,
}
pw.clientset = clientset
pw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
pw.podLister, pw.podController = cache.NewIndexerInformer(
lw,
&api.Pod{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podwatchStopCh = make(chan struct{})
go pw.podController.Run(podwatchStopCh)
return &pw, nil
}
func StopPodWatcher() {
podwatchStopCh <- struct{}{}
}


@@ -1,118 +0,0 @@
package watchers
import (
"reflect"
"time"
"github.com/cloudnativelabs/kube-router/utils"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type ServiceUpdate struct {
Service *api.Service
Op Operation
}
var (
ServiceWatcher *serviceWatcher
)
type serviceWatcher struct {
clientset kubernetes.Interface
serviceController cache.Controller
serviceLister cache.Indexer
broadcaster *utils.Broadcaster
}
type ServiceUpdatesHandler interface {
OnServiceUpdate(serviceUpdate *ServiceUpdate)
}
func (svcw *serviceWatcher) serviceAddEventHandler(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
return
}
svcw.broadcaster.Notify(&ServiceUpdate{Op: ADD, Service: service})
}
func (svcw *serviceWatcher) serviceDeleteEventHandler(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
return
}
svcw.broadcaster.Notify(&ServiceUpdate{Op: REMOVE, Service: service})
}
func (svcw *serviceWatcher) serviceAUpdateEventHandler(oldObj, newObj interface{}) {
service, ok := newObj.(*api.Service)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
svcw.broadcaster.Notify(&ServiceUpdate{Op: UPDATE, Service: service})
}
}
func (svcw *serviceWatcher) RegisterHandler(handler ServiceUpdatesHandler) {
svcw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
handler.OnServiceUpdate(instance.(*ServiceUpdate))
}))
}
func (svcw *serviceWatcher) List() []*api.Service {
objList := svcw.serviceLister.List()
svcInstances := make([]*api.Service, len(objList))
for i, ins := range objList {
svcInstances[i] = ins.(*api.Service)
}
return svcInstances
}
func (svcw *serviceWatcher) HasSynced() bool {
return svcw.serviceController.HasSynced()
}
var servicesStopCh chan struct{}
// StartServiceWatcher: start watching updates for services from the Kubernetes API server
func StartServiceWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration) (*serviceWatcher, error) {
svcw := serviceWatcher{}
ServiceWatcher = &svcw
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: svcw.serviceAddEventHandler,
DeleteFunc: svcw.serviceDeleteEventHandler,
UpdateFunc: svcw.serviceAUpdateEventHandler,
}
svcw.clientset = clientset
svcw.broadcaster = utils.NewBroadcaster()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Services(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Services(metav1.NamespaceAll).Watch(options)
},
}
svcw.serviceLister, svcw.serviceController = cache.NewIndexerInformer(
lw,
&api.Service{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
servicesStopCh = make(chan struct{})
go svcw.serviceController.Run(servicesStopCh)
return &svcw, nil
}
func StopServiceWatcher() {
servicesStopCh <- struct{}{}
}


@@ -2,5 +2,8 @@
set -o errexit
set -o pipefail
echo "install moq"
go get github.com/matryer/moq
echo "Running tests on Travis"
make test
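For context, moq (github.com/matryer/moq) generates type-safe mocks from Go interfaces, with one overridable func field per method on the generated struct. A plausible invocation against the LinuxNetworking interface used by the services controller (the output file name and target here are the editor's assumption):

	//go:generate moq -out network_services_controller_moq.go . LinuxNetworking

Running go generate then produces a LinuxNetworkingMock whose per-method Func fields tests can stub case by case.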