From cb661f871ca0076e700fd9d2d1473bc7d3134f5c Mon Sep 17 00:00:00 2001
From: bzub
Date: Wed, 5 Jul 2017 21:57:29 -0500
Subject: [PATCH] controller: - Replace panics with errors - Add context to
 errors for debugging - Refactor init() code so ipset isn't required to run
 "kube-router --help" for example

---
Review notes (text between the "---" marker and the diff is not part of the
commit message):
 * glog.Errorf calls added by this patch now carry a %s verb so that err is
   formatted into the message instead of being passed as an extra operand.
 * errors.New messages no longer contain a literal "%s" left over from the
   glog.Errorf / panic strings they replace.
 * The two "Failed to parse ASN number" errors now include the underlying
   strconv error.
 * Line breaks, indentation and blank context lines were reconstructed so
   that every hunk matches its @@ line counts; run `git apply --check`
   against the target tree before applying.

 app/controllers/network_policy_controller.go  | 52 ++++++++++--------
 app/controllers/network_routes_controller.go  | 55 ++++++++++---------
 .../network_services_controller.go            | 36 ++++++------
 3 files changed, 78 insertions(+), 65 deletions(-)

diff --git a/app/controllers/network_policy_controller.go b/app/controllers/network_policy_controller.go
index f129ebd3..1fd14ba5 100644
--- a/app/controllers/network_policy_controller.go
+++ b/app/controllers/network_policy_controller.go
@@ -92,7 +92,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
 
 		if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
 			glog.Infof("Performing periodic syn of the iptables to reflect network policies")
-			npc.Sync()
+			err := npc.Sync()
+			if err != nil {
+				glog.Errorf("Error during periodic sync: %s", err)
+			}
 		} else {
 			continue
 		}
@@ -109,7 +112,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
 func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
 	glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
 	if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
-		npc.Sync()
+		err := npc.Sync()
+		if err != nil {
+			glog.Errorf("Error syncing on pod update: %s", err)
+		}
 	} else {
 		glog.Infof("Received pod update, but controller not in sync")
 	}
@@ -118,7 +124,10 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
 func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
 	glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
 	if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
-		npc.Sync()
+		err := npc.Sync()
+		if err != nil {
+			glog.Errorf("Error syncing on network policy update: %s", err)
+		}
 	} else {
 		glog.Infof("Received network policy update, but controller not in sync")
 	}
@@ -127,19 +136,27 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
 func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
 	glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
 	if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
-		npc.Sync()
+		err := npc.Sync()
+		if err != nil {
+			glog.Errorf("Error syncing on namespace update: %s", err)
+		}
 	} else {
 		glog.Infof("Received namspace update, but controller not in sync")
 	}
 }
 
 // Sync synchronizes iptables to desired state of network policies
-func (npc *NetworkPolicyController) Sync() {
+func (npc *NetworkPolicyController) Sync() error {
 
 	var err error
 	npc.mu.Lock()
 	defer npc.mu.Unlock()
 
+	_, err = exec.LookPath("ipset")
+	if err != nil {
+		return errors.New("Ensure ipset package is installed: " + err.Error())
+	}
+
 	start := time.Now()
 	defer func() {
 		glog.Infof("sync iptables took %v", time.Since(start))
@@ -147,27 +164,25 @@ func (npc *NetworkPolicyController) Sync() {
 
 	npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
 	if err != nil {
-		glog.Errorf("Aborting sync. Failed to build network policies: %s", err.Error())
-		return
+		return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
 	}
 
 	activePolicyChains, err := npc.syncNetworkPolicyChains()
 	if err != nil {
-		glog.Errorf("Aborting sync. Failed to sync network policy chains: %s", err.Error())
-		return
+		return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
 	}
 
 	activePodFwChains, err := npc.syncPodFirewallChains()
 	if err != nil {
-		glog.Errorf("Aborting sync. Failed to sync pod firewalls: %s", err.Error())
-		return
+		return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
 	}
 
 	err = cleanupStaleRules(activePolicyChains, activePodFwChains)
 	if err != nil {
-		glog.Errorf("Aborting sync. Failed to cleanup stale iptable rules: %s", err.Error())
-		return
+		return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error())
 	}
+
+	return nil
 }
 
 // Configure iptable rules representing each network policy. All pod's matched by
@@ -763,13 +778,6 @@ func (npc *NetworkPolicyController) Cleanup() {
 	glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
 }
 
-func init() {
-	_, err := exec.LookPath("ipset")
-	if err != nil {
-		panic("ipset command not found ensure ipset package is installed")
-	}
-}
-
 func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {
 
 	npc := NetworkPolicyController{}
@@ -778,14 +786,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
 
 	node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
 	if err != nil {
-		panic(err.Error())
+		return nil, err
 	}
 
 	npc.nodeHostName = node.Name
 
 	nodeIP, err := getNodeIP(node)
 	if err != nil {
-		panic(err.Error())
+		return nil, err
 	}
 	npc.nodeIP = nodeIP
 
diff --git a/app/controllers/network_routes_controller.go b/app/controllers/network_routes_controller.go
index c5f6e9b7..d0b1fa3f 100644
--- a/app/controllers/network_routes_controller.go
+++ b/app/controllers/network_routes_controller.go
@@ -1,6 +1,7 @@
 package controllers
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"strconv"
@@ -85,15 +86,17 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr
 
 	// Wait till we are ready to launch BGP server
 	for {
-		ok := nrc.startBgpServer()
-		if !ok {
+		err := nrc.startBgpServer()
+		if err != nil {
+			glog.Errorf("Failed to start node BGP server: %s", err)
 			select {
 			case <-stopCh:
 				glog.Infof("Shutting down network routes controller")
 				return
 			case <-t.C:
+				glog.Infof("Retrying start of node BGP server")
+				continue
 			}
-			continue
 		} else {
 			break
 		}
@@ -348,12 +351,12 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdat
 	}
 }
 
-func (nrc *NetworkRoutingController) startBgpServer() bool {
+func (nrc *NetworkRoutingController) startBgpServer() error {
 
 	var nodeAsnNumber uint32
 	node, err := utils.GetNodeObject(nrc.clientset, nrc.hostnameOverride)
 	if err != nil {
-		panic("Failed to get node object from api server due to " + err.Error())
+		return errors.New("Failed to get node object from api server: " + err.Error())
 	}
 
 	if nrc.bgpFullMeshMode {
@@ -361,14 +364,12 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
 	} else {
 		nodeasn, ok := node.ObjectMeta.Annotations["net.kuberouter.nodeasn"]
 		if !ok {
-			glog.Infof("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.")
-			return false
+			return errors.New("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.")
 		} else {
 			glog.Infof("Found ASN for the node to be %s from the node annotations", nodeasn)
 			asnNo, err := strconv.ParseUint(nodeasn, 0, 32)
 			if err != nil {
-				glog.Errorf("Failed to parse ASN number specified for the the node")
-				return false
+				return errors.New("Failed to parse ASN number specified for the the node: " + err.Error())
 			}
 			nodeAsnNumber = uint32(asnNo)
 		}
@@ -389,7 +390,7 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
 	}
 
 	if err := nrc.bgpServer.Start(global); err != nil {
-		panic("Failed to start BGP server due to : " + err.Error())
+		return errors.New("Failed to start BGP server due to : " + err.Error())
 	}
 
 	go nrc.watchBgpUpdates()
@@ -405,38 +406,38 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
 				},
 			}
 			if err := nrc.bgpServer.AddNeighbor(n); err != nil {
-				panic("Failed to peer with global peer router due to: " + peer)
+				return errors.New("Failed to peer with global peer router \"" + peer + "\" due to: " + err.Error())
 			}
 		}
 	} else {
 		nodeBgpPeerAsn, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.asn"]
 		if !ok {
 			glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
-			return true
+			return nil
 		}
 		asnNo, err := strconv.ParseUint(nodeBgpPeerAsn, 0, 32)
 		if err != nil {
-			panic("Failed to parse ASN number specified for the the node in the annotations")
+			return errors.New("Failed to parse ASN number specified for the the node in the annotations: " + err.Error())
 		}
 		peerAsnNo := uint32(asnNo)
 
 		nodeBgpPeersAnnotation, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.address"]
 		if !ok {
 			glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
-			return true
+			return nil
 		}
 		nodePeerRouters := make([]string, 0)
 		if strings.Contains(nodeBgpPeersAnnotation, ",") {
 			ips := strings.Split(nodeBgpPeersAnnotation, ",")
 			for _, ip := range ips {
 				if net.ParseIP(ip) == nil {
-					panic("Invalid node BGP peer router ip in the annotation: " + ip)
+					return errors.New("Invalid node BGP peer router ip in the annotation: " + ip)
 				}
 			}
 			nodePeerRouters = append(nodePeerRouters, ips...)
 		} else {
 			if net.ParseIP(nodeBgpPeersAnnotation) == nil {
-				panic("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation)
+				return errors.New("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation)
 			}
 			nodePeerRouters = append(nodePeerRouters, nodeBgpPeersAnnotation)
 		}
@@ -449,14 +450,14 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
 				},
 			}
 			if err := nrc.bgpServer.AddNeighbor(n); err != nil {
-				panic("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error())
+				return errors.New("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error())
 			}
 		}
 
 		glog.Infof("Successfully configured %s in ASN %v as BGP peer to the node", nodeBgpPeersAnnotation, peerAsnNo)
 	}
 
-	return true
+	return nil
 }
 
 func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
@@ -471,10 +472,10 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
 	if len(kubeRouterConfig.ClusterAsn) != 0 {
 		asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32)
 		if err != nil {
-			panic("Invalid cluster ASN: " + err.Error())
+			return nil, errors.New("Invalid cluster ASN: " + err.Error())
 		}
 		if asn > 65534 || asn < 64512 {
-			panic("Invalid ASN number for cluster ASN")
+			return nil, errors.New("Invalid ASN number for cluster ASN")
 		}
 		nrc.defaultNodeAsnNumber = uint32(asn)
 	} else {
@@ -485,7 +486,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
 
 	if (len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) == 0) ||
 		(len(kubeRouterConfig.PeerRouter) == 0 && len(kubeRouterConfig.PeerAsn) != 0) {
-		panic("Either both or none of the params --peer-asn, --peer-router must be specified")
+		return nil, errors.New("Either both or none of the params --peer-asn, --peer-router must be specified")
 	}
 
 	if len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) != 0 {
@@ -494,24 +495,24 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
 			ips := strings.Split(kubeRouterConfig.PeerRouter, ",")
 			for _, ip := range ips {
 				if net.ParseIP(ip) == nil {
-					panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
+					return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
 				}
 			}
 			nrc.globalPeerRouters = append(nrc.globalPeerRouters, ips...)
 		} else {
 			if net.ParseIP(kubeRouterConfig.PeerRouter) == nil {
-				panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
+				return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
 			}
 			nrc.globalPeerRouters = append(nrc.globalPeerRouters, kubeRouterConfig.PeerRouter)
 		}
 
 		asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32)
 		if err != nil {
-			panic("Invalid global BGP peer ASN: " + err.Error())
+			return nil, errors.New("Invalid global BGP peer ASN: " + err.Error())
 		}
 
 		if asn > 65534 {
-			panic("Invalid ASN number for global BGP peer")
+			return nil, errors.New("Invalid ASN number for global BGP peer")
 		}
 		nrc.globalPeerAsnNumber = uint32(asn)
 	}
@@ -519,14 +520,14 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
 	nrc.hostnameOverride = kubeRouterConfig.HostnameOverride
 	node, err := utils.GetNodeObject(clientset, nrc.hostnameOverride)
 	if err != nil {
-		panic(err.Error())
+		return nil, errors.New("Failed getting node object from API server: " + err.Error())
 	}
 
 	nrc.nodeHostName = node.Name
 
 	nodeIP, err := getNodeIP(node)
 	if err != nil {
-		panic(err.Error())
+		return nil, errors.New("Failed getting IP address from node object: " + err.Error())
 	}
 	nrc.nodeIP = nodeIP
 
diff --git a/app/controllers/network_services_controller.go b/app/controllers/network_services_controller.go
index 7af1fda5..795b788f 100644
--- a/app/controllers/network_services_controller.go
+++ b/app/controllers/network_services_controller.go
@@ -74,7 +74,7 @@ type endpointsInfo struct {
 type endpointsInfoMap map[string][]endpointsInfo
 
 // periodically sync ipvs configuration to reflect desired state of services and endpoints
-func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
+func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) error {
 
 	t := time.NewTicker(nsc.syncPeriod)
 	defer t.Stop()
@@ -85,13 +85,13 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
 	// enable masquerade rule
 	err := ensureMasqueradeIptablesRule(nsc.masqueradeAll, nsc.podCidr)
 	if err != nil {
-		panic("Failed to do add masqurade rule in POSTROUTING chain of nat table due to: %s" + err.Error())
+		return errors.New("Failed to do add masqurade rule in POSTROUTING chain of nat table due to: " + err.Error())
 	}
 
 	// enable ipvs connection tracking
 	err = ensureIpvsConntrack()
 	if err != nil {
-		panic("Failed to do sysctl net.ipv4.vs.conntrack=1 due to: %s" + err.Error())
+		return errors.New("Failed to do sysctl net.ipv4.vs.conntrack=1 due to: " + err.Error())
 	}
 
 	// loop forever unitl notified to stop on stopCh
@@ -99,7 +99,7 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
 		select {
 		case <-stopCh:
 			glog.Infof("Shutting down network services controller")
-			return
+			return nil
 		default:
 		}
 
@@ -113,7 +113,7 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
 		select {
 		case <-stopCh:
 			glog.Infof("Shutting down network services controller")
-			return
+			return nil
 		case <-t.C:
 		}
 	}
@@ -174,14 +174,17 @@ func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.Se
 
 // sync the ipvs service and server details configured to reflect the desired state of services and endpoint
 // as learned from services and endpoints information from the api server
-func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) {
+func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
 
 	start := time.Now()
 	defer func() {
 		glog.Infof("sync ipvs servers took %v", time.Since(start))
 	}()
 
-	dummyVipInterface := getKubeDummyInterface()
+	dummyVipInterface, err := getKubeDummyInterface()
+	if err != nil {
+		return errors.New("Failed creating dummy interface: " + err.Error())
+	}
 
 	// map of active services and service endpoints
 	activeServiceEndpointMap := make(map[string][]string)
@@ -253,7 +256,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
 	glog.Infof("Cleaning up if any, old ipvs service and servers which are no longer needed")
 	ipvsSvcs, err := h.ListServices()
 	if err != nil {
-		panic(err)
+		return errors.New("Failed to list IPVS services: " + err.Error())
 	}
 	for _, ipvsSvc := range ipvsSvcs {
 		key := generateIpPortId(ipvsSvc.Address.String(), ipvsSvc.Protocol.String(), strconv.Itoa(int(ipvsSvc.Port)))
@@ -290,6 +293,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
 		}
 	}
 	glog.Infof("IPVS servers and services are synced to desired state!!")
+	return nil
 }
 
 func buildServicesInfo() serviceInfoMap {
@@ -394,7 +398,7 @@ func deleteMasqueradeIptablesRule() error {
 func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*libipvs.Service, error) {
 	svcs, err := h.ListServices()
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	for _, svc := range svcs {
 		if strings.Compare(vip.String(), svc.Address.String()) == 0 &&
@@ -455,22 +459,22 @@ func generateIpPortId(ip, protocol, port string) string {
 	return ip + "-" + protocol + "-" + port
 }
 
-func getKubeDummyInterface() netlink.Link {
+func getKubeDummyInterface() (netlink.Link, error) {
 	var dummyVipInterface netlink.Link
 	dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF)
 	if err != nil && err.Error() == IFACE_NOT_FOUND {
 		glog.Infof("Could not find dummy interface: " + KUBE_DUMMY_IF + " to assign cluster ip's, so creating one")
 		err = netlink.LinkAdd(&netlink.Dummy{netlink.LinkAttrs{Name: KUBE_DUMMY_IF}})
 		if err != nil {
-			panic("Failed to add dummy interface: " + err.Error())
+			return nil, errors.New("Failed to add dummy interface: " + err.Error())
 		}
 		dummyVipInterface, err = netlink.LinkByName(KUBE_DUMMY_IF)
 		err = netlink.LinkSetUp(dummyVipInterface)
 		if err != nil {
-			panic("Failed to bring dummy interface up: " + err.Error())
+			return nil, errors.New("Failed to bring dummy interface up: " + err.Error())
 		}
 	}
-	return dummyVipInterface
+	return dummyVipInterface, nil
 }
 
 // clean up all the configurations (IPVS, iptables, links)
@@ -511,7 +515,7 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio
 
 	handle, err := libipvs.New()
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	h = handle
 
@@ -537,13 +541,13 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio
 
 	node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
 	if err != nil {
-		panic(err.Error())
+		return nil, err
 	}
 
 	nsc.nodeHostName = node.Name
 	nodeIP, err := getNodeIP(node)
 	if err != nil {
-		panic(err.Error())
+		return nil, err
 	}
 	nsc.nodeIP = nodeIP
 