controller: - Replace panics with errors

- Add context to errors for debugging
            - Refactor init() code so ipset isn't required to run
              "kube-router --help" for example
This commit is contained in:
bzub 2017-07-05 21:57:29 -05:00
parent a757ea3203
commit cb661f871c
3 changed files with 78 additions and 65 deletions

View File

@ -92,7 +92,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
glog.Infof("Performing periodic syn of the iptables to reflect network policies") glog.Infof("Performing periodic syn of the iptables to reflect network policies")
npc.Sync() err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: ", err)
}
} else { } else {
continue continue
} }
@ -109,7 +112,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) { func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name) glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync() err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: ", err)
}
} else { } else {
glog.Infof("Received pod update, but controller not in sync") glog.Infof("Received pod update, but controller not in sync")
} }
@ -118,7 +124,10 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) { func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name) glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync() err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: ", err)
}
} else { } else {
glog.Infof("Received network policy update, but controller not in sync") glog.Infof("Received network policy update, but controller not in sync")
} }
@ -127,19 +136,27 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) { func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name) glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() { if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync() err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: ", err)
}
} else { } else {
glog.Infof("Received namspace update, but controller not in sync") glog.Infof("Received namspace update, but controller not in sync")
} }
} }
// Sync synchronizes iptables to desired state of network policies // Sync synchronizes iptables to desired state of network policies
func (npc *NetworkPolicyController) Sync() { func (npc *NetworkPolicyController) Sync() error {
var err error var err error
npc.mu.Lock() npc.mu.Lock()
defer npc.mu.Unlock() defer npc.mu.Unlock()
_, err = exec.LookPath("ipset")
if err != nil {
return errors.New("Ensure ipset package is installed: " + err.Error())
}
start := time.Now() start := time.Now()
defer func() { defer func() {
glog.Infof("sync iptables took %v", time.Since(start)) glog.Infof("sync iptables took %v", time.Since(start))
@ -147,27 +164,25 @@ func (npc *NetworkPolicyController) Sync() {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo() npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %s", err.Error()) return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error())
return
} }
activePolicyChains, err := npc.syncNetworkPolicyChains() activePolicyChains, err := npc.syncNetworkPolicyChains()
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %s", err.Error()) return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error())
return
} }
activePodFwChains, err := npc.syncPodFirewallChains() activePodFwChains, err := npc.syncPodFirewallChains()
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to sync pod firewalls: %s", err.Error()) return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error())
return
} }
err = cleanupStaleRules(activePolicyChains, activePodFwChains) err = cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptable rules: %s", err.Error()) return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error())
return
} }
return nil
} }
// Configure iptable rules representing each network policy. All pod's matched by // Configure iptable rules representing each network policy. All pod's matched by
@ -763,13 +778,6 @@ func (npc *NetworkPolicyController) Cleanup() {
glog.Infof("Successfully cleaned the iptables configuration done by kube-router") glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
} }
func init() {
_, err := exec.LookPath("ipset")
if err != nil {
panic("ipset command not found ensure ipset package is installed")
}
}
func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) { func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{} npc := NetworkPolicyController{}
@ -778,14 +786,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
node, err := utils.GetNodeObject(clientset, config.HostnameOverride) node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil { if err != nil {
panic(err.Error()) return nil, err
} }
npc.nodeHostName = node.Name npc.nodeHostName = node.Name
nodeIP, err := getNodeIP(node) nodeIP, err := getNodeIP(node)
if err != nil { if err != nil {
panic(err.Error()) return nil, err
} }
npc.nodeIP = nodeIP npc.nodeIP = nodeIP

View File

@ -1,6 +1,7 @@
package controllers package controllers
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -85,15 +86,17 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr
// Wait till we are ready to launch BGP server // Wait till we are ready to launch BGP server
for { for {
ok := nrc.startBgpServer() err := nrc.startBgpServer()
if !ok { if err != nil {
glog.Errorf("Failed to start node BGP server: %s", err)
select { select {
case <-stopCh: case <-stopCh:
glog.Infof("Shutting down network routes controller") glog.Infof("Shutting down network routes controller")
return return
case <-t.C: case <-t.C:
glog.Infof("Retrying start of node BGP server")
continue
} }
continue
} else { } else {
break break
} }
@ -348,12 +351,12 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdat
} }
} }
func (nrc *NetworkRoutingController) startBgpServer() bool { func (nrc *NetworkRoutingController) startBgpServer() error {
var nodeAsnNumber uint32 var nodeAsnNumber uint32
node, err := utils.GetNodeObject(nrc.clientset, nrc.hostnameOverride) node, err := utils.GetNodeObject(nrc.clientset, nrc.hostnameOverride)
if err != nil { if err != nil {
panic("Failed to get node object from api server due to " + err.Error()) return errors.New("Failed to get node object from api server: " + err.Error())
} }
if nrc.bgpFullMeshMode { if nrc.bgpFullMeshMode {
@ -361,14 +364,12 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
} else { } else {
nodeasn, ok := node.ObjectMeta.Annotations["net.kuberouter.nodeasn"] nodeasn, ok := node.ObjectMeta.Annotations["net.kuberouter.nodeasn"]
if !ok { if !ok {
glog.Infof("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.") return errors.New("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.")
return false
} else { } else {
glog.Infof("Found ASN for the node to be %s from the node annotations", nodeasn) glog.Infof("Found ASN for the node to be %s from the node annotations", nodeasn)
asnNo, err := strconv.ParseUint(nodeasn, 0, 32) asnNo, err := strconv.ParseUint(nodeasn, 0, 32)
if err != nil { if err != nil {
glog.Errorf("Failed to parse ASN number specified for the the node") return errors.New("Failed to parse ASN number specified for the the node")
return false
} }
nodeAsnNumber = uint32(asnNo) nodeAsnNumber = uint32(asnNo)
} }
@ -389,7 +390,7 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
} }
if err := nrc.bgpServer.Start(global); err != nil { if err := nrc.bgpServer.Start(global); err != nil {
panic("Failed to start BGP server due to : " + err.Error()) return errors.New("Failed to start BGP server due to : " + err.Error())
} }
go nrc.watchBgpUpdates() go nrc.watchBgpUpdates()
@ -405,38 +406,38 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
}, },
} }
if err := nrc.bgpServer.AddNeighbor(n); err != nil { if err := nrc.bgpServer.AddNeighbor(n); err != nil {
panic("Failed to peer with global peer router due to: " + peer) return errors.New("Failed to peer with global peer router \"" + peer + "\" due to: " + err.Error())
} }
} }
} else { } else {
nodeBgpPeerAsn, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.asn"] nodeBgpPeerAsn, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.asn"]
if !ok { if !ok {
glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.") glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
return true return nil
} }
asnNo, err := strconv.ParseUint(nodeBgpPeerAsn, 0, 32) asnNo, err := strconv.ParseUint(nodeBgpPeerAsn, 0, 32)
if err != nil { if err != nil {
panic("Failed to parse ASN number specified for the the node in the annotations") return errors.New("Failed to parse ASN number specified for the the node in the annotations")
} }
peerAsnNo := uint32(asnNo) peerAsnNo := uint32(asnNo)
nodeBgpPeersAnnotation, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.address"] nodeBgpPeersAnnotation, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.address"]
if !ok { if !ok {
glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.") glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
return true return nil
} }
nodePeerRouters := make([]string, 0) nodePeerRouters := make([]string, 0)
if strings.Contains(nodeBgpPeersAnnotation, ",") { if strings.Contains(nodeBgpPeersAnnotation, ",") {
ips := strings.Split(nodeBgpPeersAnnotation, ",") ips := strings.Split(nodeBgpPeersAnnotation, ",")
for _, ip := range ips { for _, ip := range ips {
if net.ParseIP(ip) == nil { if net.ParseIP(ip) == nil {
panic("Invalid node BGP peer router ip in the annotation: " + ip) return errors.New("Invalid node BGP peer router ip in the annotation: " + ip)
} }
} }
nodePeerRouters = append(nodePeerRouters, ips...) nodePeerRouters = append(nodePeerRouters, ips...)
} else { } else {
if net.ParseIP(nodeBgpPeersAnnotation) == nil { if net.ParseIP(nodeBgpPeersAnnotation) == nil {
panic("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation) return errors.New("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation)
} }
nodePeerRouters = append(nodePeerRouters, nodeBgpPeersAnnotation) nodePeerRouters = append(nodePeerRouters, nodeBgpPeersAnnotation)
} }
@ -449,14 +450,14 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
}, },
} }
if err := nrc.bgpServer.AddNeighbor(n); err != nil { if err := nrc.bgpServer.AddNeighbor(n); err != nil {
panic("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error()) return errors.New("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error())
} }
} }
glog.Infof("Successfully configured %s in ASN %v as BGP peer to the node", nodeBgpPeersAnnotation, peerAsnNo) glog.Infof("Successfully configured %s in ASN %v as BGP peer to the node", nodeBgpPeersAnnotation, peerAsnNo)
} }
return true return nil
} }
func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) { func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
@ -471,10 +472,10 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
if len(kubeRouterConfig.ClusterAsn) != 0 { if len(kubeRouterConfig.ClusterAsn) != 0 {
asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32) asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32)
if err != nil { if err != nil {
panic("Invalid cluster ASN: " + err.Error()) return nil, errors.New("Invalid cluster ASN: " + err.Error())
} }
if asn > 65534 || asn < 64512 { if asn > 65534 || asn < 64512 {
panic("Invalid ASN number for cluster ASN") return nil, errors.New("Invalid ASN number for cluster ASN")
} }
nrc.defaultNodeAsnNumber = uint32(asn) nrc.defaultNodeAsnNumber = uint32(asn)
} else { } else {
@ -485,7 +486,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
if (len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) == 0) || if (len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) == 0) ||
(len(kubeRouterConfig.PeerRouter) == 0 && len(kubeRouterConfig.PeerAsn) != 0) { (len(kubeRouterConfig.PeerRouter) == 0 && len(kubeRouterConfig.PeerAsn) != 0) {
panic("Either both or none of the params --peer-asn, --peer-router must be specified") return nil, errors.New("Either both or none of the params --peer-asn, --peer-router must be specified")
} }
if len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) != 0 { if len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) != 0 {
@ -494,24 +495,24 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
ips := strings.Split(kubeRouterConfig.PeerRouter, ",") ips := strings.Split(kubeRouterConfig.PeerRouter, ",")
for _, ip := range ips { for _, ip := range ips {
if net.ParseIP(ip) == nil { if net.ParseIP(ip) == nil {
panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter) return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
} }
} }
nrc.globalPeerRouters = append(nrc.globalPeerRouters, ips...) nrc.globalPeerRouters = append(nrc.globalPeerRouters, ips...)
} else { } else {
if net.ParseIP(kubeRouterConfig.PeerRouter) == nil { if net.ParseIP(kubeRouterConfig.PeerRouter) == nil {
panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter) return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
} }
nrc.globalPeerRouters = append(nrc.globalPeerRouters, kubeRouterConfig.PeerRouter) nrc.globalPeerRouters = append(nrc.globalPeerRouters, kubeRouterConfig.PeerRouter)
} }
asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32) asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32)
if err != nil { if err != nil {
panic("Invalid global BGP peer ASN: " + err.Error()) return nil, errors.New("Invalid global BGP peer ASN: " + err.Error())
} }
if asn > 65534 { if asn > 65534 {
panic("Invalid ASN number for global BGP peer") return nil, errors.New("Invalid ASN number for global BGP peer")
} }
nrc.globalPeerAsnNumber = uint32(asn) nrc.globalPeerAsnNumber = uint32(asn)
} }
@ -519,14 +520,14 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
nrc.hostnameOverride = kubeRouterConfig.HostnameOverride nrc.hostnameOverride = kubeRouterConfig.HostnameOverride
node, err := utils.GetNodeObject(clientset, nrc.hostnameOverride) node, err := utils.GetNodeObject(clientset, nrc.hostnameOverride)
if err != nil { if err != nil {
panic(err.Error()) return nil, errors.New("Failed getting node object from API server: " + err.Error())
} }
nrc.nodeHostName = node.Name nrc.nodeHostName = node.Name
nodeIP, err := getNodeIP(node) nodeIP, err := getNodeIP(node)
if err != nil { if err != nil {
panic(err.Error()) return nil, errors.New("Failed getting IP address from node object: " + err.Error())
} }
nrc.nodeIP = nodeIP nrc.nodeIP = nodeIP

View File

@ -74,7 +74,7 @@ type endpointsInfo struct {
type endpointsInfoMap map[string][]endpointsInfo type endpointsInfoMap map[string][]endpointsInfo
// periodically sync ipvs configuration to reflect desired state of services and endpoints // periodically sync ipvs configuration to reflect desired state of services and endpoints
func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) error {
t := time.NewTicker(nsc.syncPeriod) t := time.NewTicker(nsc.syncPeriod)
defer t.Stop() defer t.Stop()
@ -85,13 +85,13 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
// enable masquerade rule // enable masquerade rule
err := ensureMasqueradeIptablesRule(nsc.masqueradeAll, nsc.podCidr) err := ensureMasqueradeIptablesRule(nsc.masqueradeAll, nsc.podCidr)
if err != nil { if err != nil {
panic("Failed to do add masqurade rule in POSTROUTING chain of nat table due to: %s" + err.Error()) return errors.New("Failed to do add masqurade rule in POSTROUTING chain of nat table due to: %s" + err.Error())
} }
// enable ipvs connection tracking // enable ipvs connection tracking
err = ensureIpvsConntrack() err = ensureIpvsConntrack()
if err != nil { if err != nil {
panic("Failed to do sysctl net.ipv4.vs.conntrack=1 due to: %s" + err.Error()) return errors.New("Failed to do sysctl net.ipv4.vs.conntrack=1 due to: %s" + err.Error())
} }
// loop forever unitl notified to stop on stopCh // loop forever unitl notified to stop on stopCh
@ -99,7 +99,7 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
select { select {
case <-stopCh: case <-stopCh:
glog.Infof("Shutting down network services controller") glog.Infof("Shutting down network services controller")
return return nil
default: default:
} }
@ -113,7 +113,7 @@ func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitG
select { select {
case <-stopCh: case <-stopCh:
glog.Infof("Shutting down network services controller") glog.Infof("Shutting down network services controller")
return return nil
case <-t.C: case <-t.C:
} }
} }
@ -174,14 +174,17 @@ func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.Se
// sync the ipvs service and server details configured to reflect the desired state of services and endpoint // sync the ipvs service and server details configured to reflect the desired state of services and endpoint
// as learned from services and endpoints information from the api server // as learned from services and endpoints information from the api server
func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) { func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
glog.Infof("sync ipvs servers took %v", time.Since(start)) glog.Infof("sync ipvs servers took %v", time.Since(start))
}() }()
dummyVipInterface := getKubeDummyInterface() dummyVipInterface, err := getKubeDummyInterface()
if err != nil {
return errors.New("Failed creating dummy interface: " + err.Error())
}
// map of active services and service endpoints // map of active services and service endpoints
activeServiceEndpointMap := make(map[string][]string) activeServiceEndpointMap := make(map[string][]string)
@ -253,7 +256,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
glog.Infof("Cleaning up if any, old ipvs service and servers which are no longer needed") glog.Infof("Cleaning up if any, old ipvs service and servers which are no longer needed")
ipvsSvcs, err := h.ListServices() ipvsSvcs, err := h.ListServices()
if err != nil { if err != nil {
panic(err) return errors.New("Failed to list IPVS services: " + err.Error())
} }
for _, ipvsSvc := range ipvsSvcs { for _, ipvsSvc := range ipvsSvcs {
key := generateIpPortId(ipvsSvc.Address.String(), ipvsSvc.Protocol.String(), strconv.Itoa(int(ipvsSvc.Port))) key := generateIpPortId(ipvsSvc.Address.String(), ipvsSvc.Protocol.String(), strconv.Itoa(int(ipvsSvc.Port)))
@ -290,6 +293,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
} }
} }
glog.Infof("IPVS servers and services are synced to desired state!!") glog.Infof("IPVS servers and services are synced to desired state!!")
return nil
} }
func buildServicesInfo() serviceInfoMap { func buildServicesInfo() serviceInfoMap {
@ -394,7 +398,7 @@ func deleteMasqueradeIptablesRule() error {
func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*libipvs.Service, error) { func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*libipvs.Service, error) {
svcs, err := h.ListServices() svcs, err := h.ListServices()
if err != nil { if err != nil {
panic(err) return nil, err
} }
for _, svc := range svcs { for _, svc := range svcs {
if strings.Compare(vip.String(), svc.Address.String()) == 0 && if strings.Compare(vip.String(), svc.Address.String()) == 0 &&
@ -455,22 +459,22 @@ func generateIpPortId(ip, protocol, port string) string {
return ip + "-" + protocol + "-" + port return ip + "-" + protocol + "-" + port
} }
func getKubeDummyInterface() netlink.Link { func getKubeDummyInterface() (netlink.Link, error) {
var dummyVipInterface netlink.Link var dummyVipInterface netlink.Link
dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF) dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF)
if err != nil && err.Error() == IFACE_NOT_FOUND { if err != nil && err.Error() == IFACE_NOT_FOUND {
glog.Infof("Could not find dummy interface: " + KUBE_DUMMY_IF + " to assign cluster ip's, so creating one") glog.Infof("Could not find dummy interface: " + KUBE_DUMMY_IF + " to assign cluster ip's, so creating one")
err = netlink.LinkAdd(&netlink.Dummy{netlink.LinkAttrs{Name: KUBE_DUMMY_IF}}) err = netlink.LinkAdd(&netlink.Dummy{netlink.LinkAttrs{Name: KUBE_DUMMY_IF}})
if err != nil { if err != nil {
panic("Failed to add dummy interface: " + err.Error()) return nil, errors.New("Failed to add dummy interface: " + err.Error())
} }
dummyVipInterface, err = netlink.LinkByName(KUBE_DUMMY_IF) dummyVipInterface, err = netlink.LinkByName(KUBE_DUMMY_IF)
err = netlink.LinkSetUp(dummyVipInterface) err = netlink.LinkSetUp(dummyVipInterface)
if err != nil { if err != nil {
panic("Failed to bring dummy interface up: " + err.Error()) return nil, errors.New("Failed to bring dummy interface up: " + err.Error())
} }
} }
return dummyVipInterface return dummyVipInterface, nil
} }
// clean up all the configurations (IPVS, iptables, links) // clean up all the configurations (IPVS, iptables, links)
@ -511,7 +515,7 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio
handle, err := libipvs.New() handle, err := libipvs.New()
if err != nil { if err != nil {
panic(err) return nil, err
} }
h = handle h = handle
@ -537,13 +541,13 @@ func NewNetworkServicesController(clientset *kubernetes.Clientset, config *optio
node, err := utils.GetNodeObject(clientset, config.HostnameOverride) node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil { if err != nil {
panic(err.Error()) return nil, err
} }
nsc.nodeHostName = node.Name nsc.nodeHostName = node.Name
nodeIP, err := getNodeIP(node) nodeIP, err := getNodeIP(node)
if err != nil { if err != nil {
panic(err.Error()) return nil, err
} }
nsc.nodeIP = nodeIP nsc.nodeIP = nodeIP