diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go index 1a9a2369..cea4c39b 100644 --- a/app/controllers/network_policy_controller.go +++ b/app/controllers/network_policy_controller.go @@ -36,6 +36,7 @@ import ( // by one or more network policy chains, till there is a match which will accept the packet, or gets // dropped by the rule in the pod chain, if there is no match. +// strcut to hold information required by NetworkPolicyController type NetworkPolicyController struct { nodeIP net.IP nodeHostName string @@ -80,6 +81,7 @@ type protocolAndPort struct { port string } +// Run: runs forver till we recive notification on stopCh func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() @@ -115,6 +117,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro } } +// OnPodUpdate: handles updates to pods from the Kubernetes api server func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name) if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { @@ -127,6 +130,7 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { } } +// OnNetworkPolicyUpdate: handles updates to network policy from the kubernetes api server func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) { if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { err := npc.Sync() @@ -138,6 +142,7 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w } } +// OnNamespaceUpdate: handles updates to namespace from kubernetes api server func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) { // namespace (and annotations on it) has no significance in GA ver of network policy @@ -511,7 +516,7 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er } // TODO delete rule by spec, than rule number to avoid extra loop - var realRuleNo int = 0 + var realRuleNo int for i, rule := range forwardChainRules { if strings.Contains(rule, chain) { err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo)) @@ -593,8 +598,8 @@ func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[ } if npc.v1NetworkPolicy { podNeedsFirewall := false - for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { - policy, _ := policy_obj.(*networking.NetworkPolicy) + for _, policyObj:= range watchers.NetworkPolicyWatcher.List() { + policy, _ := policyObj.(*networking.NetworkPolicy) // we are only interested in the network policies in same namespace that of pod if policy.Namespace != pod.ObjectMeta.Namespace { @@ -627,11 +632,11 @@ func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[ continue } } else { - default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) + defaultPolicy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace) if err != nil { return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error()) } - if strings.Compare(default_policy, "DefaultDeny") != 0 { + if strings.Compare(defaultPolicy, "DefaultDeny") != 0 { continue } } @@ -647,9 +652,9 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + for _, policyObj:= range watchers.NetworkPolicyWatcher.List() { - policy, ok := policy_obj.(*networking.NetworkPolicy) + policy, ok := policyObj.(*networking.NetworkPolicy) if !ok { return nil, fmt.Errorf("Failed to convert") } @@ -742,9 +747,9 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { NetworkPolicies := make([]networkPolicyInfo, 0) - for _, policy_obj := range watchers.NetworkPolicyWatcher.List() { + for _, policyObj:= range watchers.NetworkPolicyWatcher.List() { - policy, _ := policy_obj.(*apiextensions.NetworkPolicy) + policy, _ := policyObj.(*apiextensions.NetworkPolicy) newPolicy := networkPolicyInfo{ name: policy.Name, namespace: policy.Namespace, @@ -852,6 +857,7 @@ func getNodeIP(node *apiv1.Node) (net.IP, error) { return nil, errors.New("host IP unknown") } +// Cleanup: cleanup configurations done func (npc *NetworkPolicyController) Cleanup() { glog.Infof("Cleaning up iptables configuration permanently done by kube-router") @@ -869,7 +875,7 @@ func (npc *NetworkPolicyController) Cleanup() { } // TODO: need a better way to delte rule with out using number - var realRuleNo int = 0 + var realRuleNo int for i, rule := range forwardChainRules { if strings.Contains(rule, "KUBE-POD-FW-") { err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo)) diff --git a/app/controllers/network_routes_controller.go b/app/controllers/network_routes_controller.go index c1f9a905..e1d9a625 100644 --- a/app/controllers/network_routes_controller.go +++ b/app/controllers/network_routes_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes" ) +// struct to hold necessary information required by controller type NetworkRoutingController struct { nodeIP net.IP nodeHostName string @@ -67,6 +68,7 @@ const ( podSubnetIpSetName = "kube-router-pod-subnets" ) +// Run: run forever till until we are notified on stop channel func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { cidr, err := utils.GetPodCidrFromCniSpec("/etc/cni/net.d/10-kuberouter.conf") if err != nil { @@ -308,6 +310,7 @@ func (nrc *NetworkRoutingController) getClusterIps() ([]string, error) { return clusterIpList, nil } +// AdvertiseClusterIp: advertises the service cluster ip the configured peers func (nrc *NetworkRoutingController) AdvertiseClusterIp(clusterIp string) error { attrs := []bgp.PathAttributeInterface{ @@ -520,6 +523,7 @@ func (nrc *NetworkRoutingController) injectRoute(path *table.Path) error { return netlink.RouteReplace(route) } +// Cleanup: performs the cleanup of configurations done func (nrc *NetworkRoutingController) Cleanup() { err := deletePodEgressRule() if err != nil { @@ -793,7 +797,7 @@ func (nrc *NetworkRoutingController) enablePolicyBasedRouting() error { return nil } -// Handle updates from Node watcher. Node watcher calls this method whenever there is +// OnNodeUpdate: Handle updates from Node watcher. Node watcher calls this method whenever there is // new node is added or old node is deleted. So peer up with new node and drop peering // from old node func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) { diff --git a/app/controllers/network_services_controller.go b/app/controllers/network_services_controller.go index 087ca910..c0d5bef8 100644 --- a/app/controllers/network_services_controller.go +++ b/app/controllers/network_services_controller.go @@ -27,6 +27,7 @@ import ( ) const ( + // name of the dummy interface to which cluster ip are assigned KUBE_DUMMY_IF = "kube-dummy-if" IFACE_NOT_FOUND = "Link not found" IFACE_HAS_ADDR = "file exists" @@ -58,13 +59,14 @@ var ( }, []string{"namespace", "service_name", "backend"}) ) -// Network services controller enables local node as network service proxy through IPVS/LVS. +// NetworkServicesController enables local node as network service proxy through IPVS/LVS. // Support only Kubernetes network services of type NodePort, ClusterIP, and LoadBalancer. For each service a // IPVS service is created and for each service endpoint a server is added to the IPVS service. // As services and endpoints are updated, network service controller gets the updates from // the kubernetes api server and syncs the ipvs configuration to reflect state of services // and endpoints +// struct for storing information needed by the controller type NetworkServicesController struct { nodeIP net.IP nodeHostName string @@ -102,7 +104,7 @@ type endpointsInfo struct { // map of all endpoints, with unique service id(namespace name, service name, port) as key type endpointsInfoMap map[string][]endpointsInfo -// periodically sync ipvs configuration to reflect desired state of services and endpoints +// Run: periodically sync ipvs configuration to reflect desired state of services and endpoints func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) error { t := time.NewTicker(nsc.syncPeriod) @@ -170,7 +172,7 @@ func (nsc *NetworkServicesController) sync() { nsc.publishMetrics(nsc.serviceMap) } -// handle change in endpoints update from the API server +// OnEndpointsUpdate: handle change in endpoints update from the API server func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watchers.EndpointsUpdate) { nsc.mu.Lock() @@ -192,7 +194,7 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watcher } } -// handle change in service update from the API server +// OnServiceUpdate: handle change in service update from the API server func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.ServiceUpdate) { nsc.mu.Lock() @@ -241,7 +243,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf } // assign cluster IP of the service to the dummy interface so that its routable from the pod's on the node - vip := &netlink.Addr{IPNet: &net.IPNet{svc.clusterIP, net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} + vip := &netlink.Addr{IPNet: &net.IPNet{IP: svc.clusterIP, Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} err := netlink.AddrAdd(dummyVipInterface, vip) if err != nil && err.Error() != IFACE_HAS_ADDR { glog.Errorf("Failed to assign cluster ip to dummy interface %s", err) @@ -249,7 +251,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf } // create IPVS service for the service to be exposed through the cluster ip - ipvs_cluster_vip_svc, err := ipvsAddService(svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity) + ipvsClusterVipSvc, err := ipvsAddService(svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity) if err != nil { glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error()) continue @@ -258,10 +260,10 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf activeServiceEndpointMap[clusterServiceId] = make([]string, 0) // create IPVS service for the service to be exposed through the nodeport - var ipvs_nodeport_svc *ipvs.Service + var ipvsNodeportSvc*ipvs.Service var nodeServiceId string if svc.nodePort != 0 { - ipvs_nodeport_svc, err = ipvsAddService(nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity) + ipvsNodeportSvc, err = ipvsAddService(nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity) if err != nil { glog.Errorf("Failed to create ipvs service for node port") continue @@ -280,7 +282,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf Weight: 1, } - err := ipvsAddServer(ipvs_cluster_vip_svc, &dst) + err := ipvsAddServer(ipvsClusterVipSvc, &dst) if err != nil { glog.Errorf(err.Error()) } @@ -289,7 +291,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf append(activeServiceEndpointMap[clusterServiceId], endpoint.ip) if svc.nodePort != 0 { - err := ipvsAddServer(ipvs_nodeport_svc, &dst) + err := ipvsAddServer(ipvsNodeportSvc, &dst) if err != nil { glog.Errorf(err.Error()) } @@ -434,11 +436,11 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo { func buildEndpointsInfo() endpointsInfoMap { endpointsMap := make(endpointsInfoMap) for _, ep := range watchers.EndpointsWatcher.List() { - for _, ep_subset := range ep.Subsets { - for _, port := range ep_subset.Ports { + for _, epSubset:= range ep.Subsets { + for _, port := range epSubset.Ports { svcId := generateServiceId(ep.Namespace, ep.Name, port.Name) endpoints := make([]endpointsInfo, 0) - for _, addr := range ep_subset.Addresses { + for _, addr := range epSubset.Addresses { endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port)}) } endpointsMap[svcId] = shuffle(endpoints) @@ -726,7 +728,7 @@ func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*ipvs.S svc.Timeout = 180 * 60 } if err := h.NewService(&svc); err != nil { - return nil, fmt.Errorf("Failed to create service: %s:%s:%s", vip.String(), protocol, strconv.Itoa(int(port))) + return nil, fmt.Errorf("Failed to create service: %s:%s:%s", vip.String(), strconv.Itoa(int(protocol)), strconv.Itoa(int(port))) } glog.Infof("Successfully added service: %s:%s:%s", vip.String(), protocol, strconv.Itoa(int(port))) return &svc, nil @@ -743,10 +745,10 @@ func ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination) error { if strings.Contains(err.Error(), IPVS_SERVER_EXISTS) { glog.Infof("ipvs destination %s:%s already exists in the ipvs service %s:%s:%s so not adding destination", dest.Address, - strconv.Itoa(int(dest.Port)), service.Address, service.Protocol, strconv.Itoa(int(service.Port))) + strconv.Itoa(int(dest.Port)), service.Address, strconv.Itoa(int(service.Protocol)), strconv.Itoa(int(service.Port))) } else { return fmt.Errorf("Failed to add ipvs destination %s:%s to the ipvs service %s:%s:%s due to : %s", dest.Address, - strconv.Itoa(int(dest.Port)), service.Address, service.Protocol, strconv.Itoa(int(service.Port)), err.Error()) + strconv.Itoa(int(dest.Port)), service.Address, strconv.Itoa(int(service.Protocol)), strconv.Itoa(int(service.Port)), err.Error()) } return nil } @@ -779,7 +781,7 @@ func getKubeDummyInterface() (netlink.Link, error) { return dummyVipInterface, nil } -// clean up all the configurations (IPVS, iptables, links) +// Cleanup: clean all the configurations (IPVS, iptables, links) done func (nsc *NetworkServicesController) Cleanup() { // cleanup ipvs rules by flush glog.Infof("Cleaning up IPVS configuration permanently") diff --git a/app/watchers/endpoints_watcher.go b/app/watchers/endpoints_watcher.go index 235e72cd..b535a84c 100644 --- a/app/watchers/endpoints_watcher.go +++ b/app/watchers/endpoints_watcher.go @@ -76,12 +76,12 @@ func (ew *endpointsWatcher) RegisterHandler(handler EndpointsUpdatesHandler) { } func (ew *endpointsWatcher) List() []*api.Endpoints { - obj_list := ew.endpointsLister.List() - ep_instances := make([]*api.Endpoints, len(obj_list)) - for i, ins := range obj_list { - ep_instances[i] = ins.(*api.Endpoints) + objList := ew.endpointsLister.List() + epInstances := make([]*api.Endpoints, len(objList)) + for i, ins := range objList { + epInstances[i] = ins.(*api.Endpoints) } - return ep_instances + return epInstances } func (ew *endpointsWatcher) HasSynced() bool { diff --git a/app/watchers/namespace_watcher.go b/app/watchers/namespace_watcher.go index eff3982b..90d0b559 100644 --- a/app/watchers/namespace_watcher.go +++ b/app/watchers/namespace_watcher.go @@ -7,11 +7,11 @@ import ( "github.com/cloudnativelabs/kube-router/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" api "k8s.io/client-go/pkg/api/v1" cache "k8s.io/client-go/tools/cache" - listers "k8s.io/client-go/listers/core/v1" - "k8s.io/apimachinery/pkg/labels" ) type NamespaceUpdate struct { @@ -62,12 +62,12 @@ func (nsw *namespaceWatcher) namespaceUpdateEventHandler(oldObj, newObj interfac } func (nsw *namespaceWatcher) List() []*api.Namespace { - obj_list := nsw.namespaceLister.List() - namespace_instances := make([]*api.Namespace, len(obj_list)) - for i, ins := range obj_list { - namespace_instances[i] = ins.(*api.Namespace) + objList := nsw.namespaceLister.List() + namespaceInstances := make([]*api.Namespace, len(objList)) + for i, ins := range objList { + namespaceInstances[i] = ins.(*api.Namespace) } - return namespace_instances + return namespaceInstances } func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, error) { @@ -75,9 +75,8 @@ func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, err matchedNamespaces, err := namespaceLister.List(set.AsSelector()) if err != nil { return nil, err - } else { - return matchedNamespaces, nil } + return matchedNamespaces, nil } func (nsw *namespaceWatcher) RegisterHandler(handler NamespaceUpdatesHandler) { diff --git a/app/watchers/node_watcher.go b/app/watchers/node_watcher.go index e46ac19b..c3c52d48 100644 --- a/app/watchers/node_watcher.go +++ b/app/watchers/node_watcher.go @@ -59,12 +59,12 @@ func (nw *nodeWatcher) RegisterHandler(handler NodeUpdatesHandler) { } func (nw *nodeWatcher) List() []*api.Node { - obj_list := nw.nodeLister.List() - node_instances := make([]*api.Node, len(obj_list)) - for i, ins := range obj_list { - node_instances[i] = ins.(*api.Node) + objList := nw.nodeLister.List() + nodeInstances := make([]*api.Node, len(objList)) + for i, ins := range objList { + nodeInstances[i] = ins.(*api.Node) } - return node_instances + return nodeInstances } func (nw *nodeWatcher) HasSynced() bool { diff --git a/app/watchers/pods_watcher.go b/app/watchers/pods_watcher.go index 93f57ba8..1db901a9 100644 --- a/app/watchers/pods_watcher.go +++ b/app/watchers/pods_watcher.go @@ -67,12 +67,12 @@ func (pw *podWatcher) RegisterHandler(handler PodUpdatesHandler) { } func (pw *podWatcher) List() []*api.Pod { - obj_list := pw.podLister.List() - pod_instances := make([]*api.Pod, len(obj_list)) - for i, ins := range obj_list { - pod_instances[i] = ins.(*api.Pod) + objList := pw.podLister.List() + podInstances := make([]*api.Pod, len(objList)) + for i, ins := range objList { + podInstances[i] = ins.(*api.Pod) } - return pod_instances + return podInstances } func (pw *podWatcher) ListByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) { diff --git a/app/watchers/services_watcher.go b/app/watchers/services_watcher.go index eb08d836..0df58560 100644 --- a/app/watchers/services_watcher.go +++ b/app/watchers/services_watcher.go @@ -65,12 +65,12 @@ func (svcw *serviceWatcher) RegisterHandler(handler ServiceUpdatesHandler) { } func (svcw *serviceWatcher) List() []*api.Service { - obj_list := svcw.serviceLister.List() - svc_instances := make([]*api.Service, len(obj_list)) - for i, ins := range obj_list { - svc_instances[i] = ins.(*api.Service) + objList := svcw.serviceLister.List() + svcInstances := make([]*api.Service, len(objList)) + for i, ins := range objList{ + svcInstances[i] = ins.(*api.Service) } - return svc_instances + return svcInstances } func (svcw *serviceWatcher) HasSynced() bool { @@ -79,6 +79,7 @@ func (svcw *serviceWatcher) HasSynced() bool { var servicesStopCh chan struct{} +// StartServiceWatcher: start watching updates for services from Kuberentes API server func StartServiceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*serviceWatcher, error) { svcw := serviceWatcher{}