Handle headless services (#1047)

* doc(ecmp_vip.go): add info around extra withdraw

Rename getWithdraw to make it more explicit what its doing here. Also
add documentation as to why this is needed on Update and not
Create/Delete as well as why we only treat externalIPs.

* fix(ecmp_vip.go): remove superfluous AddPolicies

AddPolicies is already called downstream of nrc.OnEndpointsUpdate() so
there is no need to do it here as well, the only result is that this
expensive operation and idempotent operation is run twice.

* feat: better handling of headless services

Also introduces a consolidated Service utilities section for controller
functionality related to services that is shared.

* fix: add logging back to tryHandleServiceDelete
This commit is contained in:
Aaron U'Ren 2021-03-23 22:01:39 -05:00 committed by GitHub
parent 1fb0820044
commit 43c3c9de86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 30 deletions

View File

@ -808,6 +808,20 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *api.Endpoints) {
return
}
// If the service is headless and the previous version of the service is either non-existent or also headless,
// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't
// need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a
// ClusterIP before.
svc, err := utils.ServiceForEndpoints(&nsc.svcLister, ep)
if err != nil {
glog.Errorf("failed to convert endpoints resource to service: %s", err)
return
}
if utils.ServiceIsHeadless(svc) {
glog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...", ep.Namespace, ep.Name)
return
}
// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()
newEndpointsMap := nsc.buildEndpointsInfo()
@ -834,6 +848,15 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) {
return
}
// If the service is headless and the previous version of the service is either non-existent or also headless,
// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't
// need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a
// ClusterIP before.
if utils.ServiceIsHeadless(svc) {
glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name)
return
}
// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()
newEndpointsMap := nsc.buildEndpointsInfo()
@ -1310,7 +1333,7 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
for _, obj := range nsc.svcLister.List() {
svc := obj.(*api.Service)
if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" {
if utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) {
glog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", svc.Name, svc.Namespace)
continue
}

View File

@ -6,6 +6,8 @@ import (
"fmt"
"strconv"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"strings"
"github.com/golang/glog"
@ -168,6 +170,16 @@ func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) {
func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) {
if svc := getServiceObject(obj); svc != nil {
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
// If the service is headless and the previous version of the service is either non-existent or also headless,
// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't
// need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a
// ClusterIP before.
if utils.ServiceIsHeadless(obj) {
glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name)
return
}
nrc.handleServiceUpdate(svc)
}
}
@ -185,6 +197,14 @@ func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, log
return
}
}
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
// If the service is headless skip processing as we only work with VIPs in the next section.
if utils.ServiceIsHeadless(obj) {
glog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name)
return
}
nrc.handleServiceDelete(svc)
}
@ -197,10 +217,20 @@ func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) {
func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) {
nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API")
nrc.withdrawVIPs(nrc.getWithdraw(getServiceObject(objOld), getServiceObject(objNew)))
// This extra call needs to be here, because during the update the list of externalIPs may have changed and
// externalIPs is the only service VIP field that is:
// a) mutable after first creation
// b) an array
//
// This means that while we only need to withdraw ClusterIP VIPs and LoadBalancer VIPs on delete, we may need
// to withdraw ExternalIPs on update.
//
// As such, it needs to be handled differently as nrc.handleServiceUpdate only withdraws VIPs if the service
// endpoint is no longer scheduled on this node and its a local type service.
nrc.withdrawVIPs(nrc.getExternalIPsToWithdraw(getServiceObject(objOld), getServiceObject(objNew)))
}
func (nrc *NetworkRoutingController) getWithdraw(svcOld, svcNew *v1core.Service) (out []string) {
func (nrc *NetworkRoutingController) getExternalIPsToWithdraw(svcOld, svcNew *v1core.Service) (out []string) {
if svcOld != nil && svcNew != nil {
out = getMissingPrevGen(nrc.getExternalIPs(svcOld), nrc.getExternalIPs(svcNew))
}
@ -248,11 +278,6 @@ func (nrc *NetworkRoutingController) OnEndpointsAdd(obj interface{}) {
return
}
err := nrc.AddPolicies()
if err != nil {
glog.Errorf("error adding BGP policies: %s", err)
}
nrc.OnEndpointsUpdate(obj)
}
@ -274,7 +299,7 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
return
}
svc, err := nrc.serviceForEndpoints(ep)
svc, err := utils.ServiceForEndpoints(&nrc.svcLister, ep)
if err != nil {
glog.Errorf("failed to convert endpoints resource to service: %s", err)
return
@ -283,30 +308,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event")
}
func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (interface{}, error) {
key, err := cache.MetaNamespaceKeyFunc(ep)
if err != nil {
return nil, err
}
item, exists, err := nrc.svcLister.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name)
}
return item, nil
}
func (nrc *NetworkRoutingController) getClusterIP(svc *v1core.Service) string {
clusterIP := ""
if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" {
// skip headless services
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) {
clusterIP = svc.Spec.ClusterIP
}
}
@ -318,7 +325,7 @@ func (nrc *NetworkRoutingController) getExternalIPs(svc *v1core.Service) []strin
if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" {
// skip headless services
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) {
externalIPList = append(externalIPList, svc.Spec.ExternalIPs...)
}
}
@ -329,7 +336,7 @@ func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []s
loadBalancerIPList := make([]string, 0)
if svc.Spec.Type == "LoadBalancer" {
// skip headless services
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
if !utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) {
for _, lbIngress := range svc.Status.LoadBalancer.Ingress {
if len(lbIngress.IP) > 0 {
loadBalancerIPList = append(loadBalancerIPList, lbIngress.IP)

60
pkg/utils/service.go Normal file
View File

@ -0,0 +1,60 @@
package utils
import (
"fmt"
"strings"
v1core "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
func ServiceForEndpoints(ci *cache.Indexer, ep *v1core.Endpoints) (interface{}, error) {
key, err := cache.MetaNamespaceKeyFunc(ep)
if err != nil {
return nil, err
}
item, exists, err := (*ci).GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name)
}
return item, nil
}
// ServiceIsHeadless decides whether or not the this service is a headless service which is often useful to kube-router
// as there is no need to execute logic on most headless changes. Function takes a generic interface as its input
// parameter so that it can be used more easily in early processing if needed. If a non-service object is given,
// function will return false.
func ServiceIsHeadless(obj interface{}) bool {
if svc, _ := obj.(*v1core.Service); svc != nil {
if svc.Spec.Type == v1core.ServiceTypeClusterIP {
if ClusterIPIsNone(svc.Spec.ClusterIP) && containsOnlyNone(svc.Spec.ClusterIPs) {
return true
}
}
}
return false
}
// ClusterIPIsNone checks to see whether the ClusterIP contains "None" which would indicate that it is headless
func ClusterIPIsNone(clusterIP string) bool {
return strings.ToLower(clusterIP) == "none"
}
func ClusterIPIsNoneOrBlank(clusterIP string) bool {
return ClusterIPIsNone(clusterIP) || clusterIP == ""
}
func containsOnlyNone(clusterIPs []string) bool {
for _, clusterIP := range clusterIPs {
if !ClusterIPIsNone(clusterIP) {
return false
}
}
return true
}