diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index cb679aea..eef5f32d 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -808,6 +808,20 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *api.Endpoints) { return } + // If the service is headless and the previous version of the service is either non-existent or also headless, + // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't + // need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a + // ClusterIP before. + svc, err := utils.ServiceForEndpoints(&nsc.svcLister, ep) + if err != nil { + glog.Errorf("failed to convert endpoints resource to service: %s", err) + return + } + if utils.ServiceIsHeadless(svc) { + glog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...", ep.Namespace, ep.Name) + return + } + // build new service and endpoints map to reflect the change newServiceMap := nsc.buildServicesInfo() newEndpointsMap := nsc.buildEndpointsInfo() @@ -834,6 +848,15 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) { return } + // If the service is headless and the previous version of the service is either non-existent or also headless, + // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't + // need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a + // ClusterIP before. + if utils.ServiceIsHeadless(svc) { + glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) + return + } + // build new service and endpoints map to reflect the change newServiceMap := nsc.buildServicesInfo() newEndpointsMap := nsc.buildEndpointsInfo() @@ -1310,7 +1333,7 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { for _, obj := range nsc.svcLister.List() { svc := obj.(*api.Service) - if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" { + if utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) { glog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", svc.Name, svc.Namespace) continue } diff --git a/pkg/controllers/routing/ecmp_vip.go b/pkg/controllers/routing/ecmp_vip.go index 55e00477..febf51c7 100644 --- a/pkg/controllers/routing/ecmp_vip.go +++ b/pkg/controllers/routing/ecmp_vip.go @@ -6,6 +6,8 @@ import ( "fmt" "strconv" + "github.com/cloudnativelabs/kube-router/pkg/utils" + "strings" "github.com/golang/glog" @@ -168,6 +170,16 @@ func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) { func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) { if svc := getServiceObject(obj); svc != nil { glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name) + + // If the service is headless and the previous version of the service is either non-existent or also headless, + // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't + // need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a + // ClusterIP before. + if utils.ServiceIsHeadless(obj) { + glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) + return + } + nrc.handleServiceUpdate(svc) } } @@ -185,6 +197,14 @@ func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, log return } } + glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name) + + // If the service is headless skip processing as we only work with VIPs in the next section. + if utils.ServiceIsHeadless(obj) { + glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) + return + } + nrc.handleServiceDelete(svc) } @@ -197,10 +217,20 @@ func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) { func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) { nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API") - nrc.withdrawVIPs(nrc.getWithdraw(getServiceObject(objOld), getServiceObject(objNew))) + // This extra call needs to be here, because during the update the list of externalIPs may have changed and + // externalIPs is the only service VIP field that is: + // a) mutable after first creation + // b) an array + // + // This means that while we only need to withdraw ClusterIP VIPs and LoadBalancer VIPs on delete, we may need + // to withdraw ExternalIPs on update. + // + // As such, it needs to be handled differently as nrc.handleServiceUpdate only withdraws VIPs if the service + // endpoint is no longer scheduled on this node and its a local type service. + nrc.withdrawVIPs(nrc.getExternalIPsToWithdraw(getServiceObject(objOld), getServiceObject(objNew))) } -func (nrc *NetworkRoutingController) getWithdraw(svcOld, svcNew *v1core.Service) (out []string) { +func (nrc *NetworkRoutingController) getExternalIPsToWithdraw(svcOld, svcNew *v1core.Service) (out []string) { if svcOld != nil && svcNew != nil { out = getMissingPrevGen(nrc.getExternalIPs(svcOld), nrc.getExternalIPs(svcNew)) } @@ -248,11 +278,6 @@ func (nrc *NetworkRoutingController) OnEndpointsAdd(obj interface{}) { return } - err := nrc.AddPolicies() - if err != nil { - glog.Errorf("error adding BGP policies: %s", err) - } - nrc.OnEndpointsUpdate(obj) } @@ -274,7 +299,7 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { return } - svc, err := nrc.serviceForEndpoints(ep) + svc, err := utils.ServiceForEndpoints(&nrc.svcLister, ep) if err != nil { glog.Errorf("failed to convert endpoints resource to service: %s", err) return @@ -283,30 +308,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event") } -func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (interface{}, error) { - key, err := cache.MetaNamespaceKeyFunc(ep) - if err != nil { - return nil, err - } - - item, exists, err := nrc.svcLister.GetByKey(key) - if err != nil { - return nil, err - } - - if !exists { - return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name) - } - - return item, nil -} - func (nrc *NetworkRoutingController) getClusterIP(svc *v1core.Service) string { clusterIP := "" if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" { // skip headless services - if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { + if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) { clusterIP = svc.Spec.ClusterIP } } @@ -318,7 +325,7 @@ func (nrc *NetworkRoutingController) getExternalIPs(svc *v1core.Service) []strin if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" { // skip headless services - if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { + if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) { externalIPList = append(externalIPList, svc.Spec.ExternalIPs...) } } @@ -329,7 +336,7 @@ func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []s loadBalancerIPList := make([]string, 0) if svc.Spec.Type == "LoadBalancer" { // skip headless services - if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { + if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) { for _, lbIngress := range svc.Status.LoadBalancer.Ingress { if len(lbIngress.IP) > 0 { loadBalancerIPList = append(loadBalancerIPList, lbIngress.IP) diff --git a/pkg/utils/service.go b/pkg/utils/service.go new file mode 100644 index 00000000..53524bd5 --- /dev/null +++ b/pkg/utils/service.go @@ -0,0 +1,60 @@ +package utils + +import ( + "fmt" + "strings" + + v1core "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +func ServiceForEndpoints(ci *cache.Indexer, ep *v1core.Endpoints) (interface{}, error) { + key, err := cache.MetaNamespaceKeyFunc(ep) + if err != nil { + return nil, err + } + + item, exists, err := (*ci).GetByKey(key) + if err != nil { + return nil, err + } + + if !exists { + return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name) + } + + return item, nil +} + +// ServiceIsHeadless decides whether or not the this service is a headless service which is often useful to kube-router +// as there is no need to execute logic on most headless changes. Function takes a generic interface as its input +// parameter so that it can be used more easily in early processing if needed. If a non-service object is given, +// function will return false. +func ServiceIsHeadless(obj interface{}) bool { + if svc, _ := obj.(*v1core.Service); svc != nil { + if svc.Spec.Type == v1core.ServiceTypeClusterIP { + if ClusterIPIsNone(svc.Spec.ClusterIP) && containsOnlyNone(svc.Spec.ClusterIPs) { + return true + } + } + } + return false +} + +// ClusterIPIsNone checks to see whether the ClusterIP contains "None" which would indicate that it is headless +func ClusterIPIsNone(clusterIP string) bool { + return strings.ToLower(clusterIP) == "none" +} + +func ClusterIPIsNoneOrBlank(clusterIP string) bool { + return ClusterIPIsNone(clusterIP) || clusterIP == "" +} + +func containsOnlyNone(clusterIPs []string) bool { + for _, clusterIP := range clusterIPs { + if !ClusterIPIsNone(clusterIP) { + return false + } + } + return true +}