From fcd21b4759c1c7c6a98a7e80fd874ea552f9fc1f Mon Sep 17 00:00:00 2001 From: Aaron U'Ren Date: Mon, 8 Jan 2024 16:47:23 -0600 Subject: [PATCH] feat: fully support service traffic policies Adds support for spec.internalTrafficPolicy and fixes support for spec.externalTrafficPolicy so that it only affects external traffic. Keeps existing support for kube-router.io/service.local annotation which overrides both to local when set to true. Any other value in this annotation is ignored. --- .../proxy/network_services_controller.go | 40 +++-- .../proxy/service_endpoints_sync.go | 31 ++-- pkg/controllers/routing/bgp_policies.go | 2 +- pkg/controllers/routing/ecmp_vip.go | 142 ++++++++++-------- .../routing/network_routes_controller.go | 2 +- 5 files changed, 126 insertions(+), 91 deletions(-) diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index ca10c9b6..e0e2642e 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -197,7 +197,8 @@ type serviceInfo struct { skipLbIps bool externalIPs []string loadBalancerIPs []string - local bool + intTrafficPolicy *v1.ServiceInternalTrafficPolicy + extTrafficPolicy *v1.ServiceExternalTrafficPolicy flags schedFlags } @@ -914,18 +915,21 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { continue } + intClusterPolicyDefault := v1.ServiceInternalTrafficPolicyCluster + extClusterPolicyDefault := v1.ServiceExternalTrafficPolicyCluster for _, port := range svc.Spec.Ports { svcInfo := serviceInfo{ - clusterIP: net.ParseIP(svc.Spec.ClusterIP), - clusterIPs: make([]string, len(svc.Spec.ClusterIPs)), - port: int(port.Port), - targetPort: port.TargetPort.String(), - protocol: strings.ToLower(string(port.Protocol)), - nodePort: int(port.NodePort), - name: svc.ObjectMeta.Name, - namespace: svc.ObjectMeta.Namespace, - externalIPs: make([]string, len(svc.Spec.ExternalIPs)), - local: 
false, + clusterIP: net.ParseIP(svc.Spec.ClusterIP), + clusterIPs: make([]string, len(svc.Spec.ClusterIPs)), + port: int(port.Port), + targetPort: port.TargetPort.String(), + protocol: strings.ToLower(string(port.Protocol)), + nodePort: int(port.NodePort), + name: svc.ObjectMeta.Name, + namespace: svc.ObjectMeta.Namespace, + externalIPs: make([]string, len(svc.Spec.ExternalIPs)), + intTrafficPolicy: &intClusterPolicyDefault, + extTrafficPolicy: &extClusterPolicyDefault, } dsrMethod, ok := svc.ObjectMeta.Annotations[svcDSRAnnotation] if ok { @@ -971,10 +975,18 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { } _, svcInfo.hairpin = svc.ObjectMeta.Annotations[svcHairpinAnnotation] _, svcInfo.hairpinExternalIPs = svc.ObjectMeta.Annotations[svcHairpinExternalIPsAnnotation] - _, svcInfo.local = svc.ObjectMeta.Annotations[svcLocalAnnotation] _, svcInfo.skipLbIps = svc.ObjectMeta.Annotations[svcSkipLbIpsAnnotation] - if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { - svcInfo.local = true + svcInfo.intTrafficPolicy = svc.Spec.InternalTrafficPolicy + svcInfo.extTrafficPolicy = &svc.Spec.ExternalTrafficPolicy + + // The kube-router.io/service.local annotation has the ability to override the internal and external traffic + // policy that is set in the spec. In this case we set both to local when the annotation is true so that + // previous functionality of the annotation is best preserved. 
+ if svc.ObjectMeta.Annotations[svcLocalAnnotation] == "true" { + intTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal + extTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyLocal + svcInfo.intTrafficPolicy = &intTrafficPolicyLocal + svcInfo.extTrafficPolicy = &extTrafficPolicyLocal } svcID := generateServiceID(svc.Namespace, svc.Name, port.Name) diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index 41dfc6e0..69f45d79 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -114,7 +114,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv endpoints := endpointsInfoMap[k] // First we check to see if this is a local service and that it has any active endpoints, if it doesn't there // isn't any use doing any of the below work, let's save some compute cycles and break fast - if svc.local && !hasActiveEndpoints(endpoints) { + if *svc.intTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) { klog.V(1).Infof("Skipping setting up ClusterIP service %s/%s as it does not have active endpoints", svc.namespace, svc.name) continue @@ -159,7 +159,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv } // add IPVS remote server to the IPVS service - nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, clusterIP) + nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, clusterIP, true) } } } @@ -185,7 +185,8 @@ func (nsc *NetworkServicesController) addIPVSService(ipvsSvcs []*ipvs.Service, s } func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointSliceInfo, - svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, vip net.IP) { + svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, 
vip net.IP, + isClusterIP bool) { var family v1.IPFamily if vip.To4() != nil { family = v1.IPv4Protocol @@ -201,9 +202,14 @@ func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endp // 1) Service is not a local service // 2) Service is a local service, but has no active endpoints on this node // 3) Service is a local service, has active endpoints on this node, and this endpoint is one of them - if svc.local && !endpoint.isLocal { - klog.V(2).Info("service is local, but endpoint is not, continuing...") - continue + if !endpoint.isLocal { + if isClusterIP && *svc.intTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal { + klog.V(2).Info("service has an internal traffic policy of local, but endpoint is not, continuing...") + continue + } else if !isClusterIP && *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal { + klog.V(2).Info("service has an external traffic policy of local, but endpoint is not, continuing...") + continue + } } var syscallINET uint16 eIP := net.ParseIP(endpoint.ip) @@ -260,7 +266,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi endpoints := endpointsInfoMap[k] // First we check to see if this is a local service and that it has any active endpoints, if it doesn't there // isn't any use doing any of the below work, let's save some compute cycles and break fast - if svc.local && !hasActiveEndpoints(endpoints) { + if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) { klog.V(1).Infof("Skipping setting up NodePort service %s/%s as it does not have active endpoints", svc.namespace, svc.name) continue @@ -301,7 +307,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi if svcID == "" { continue } - nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, addr) + nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, addr, false) } } } else { 
@@ -311,7 +317,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi if svcID == "" { continue } - nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, nsc.primaryIP) + nsc.addEndpointsToIPVSService(endpoints, activeServiceEndpointMap, svc, svcID, ipvsSvc, nsc.primaryIP, + false) } } @@ -324,7 +331,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser endpoints := endpointsInfoMap[k] // First we check to see if this is a local service and that it has any active endpoints, if it doesn't there // isn't any use doing any of the below work, let's save some compute cycles and break fast - if svc.local && !hasActiveEndpoints(endpoints) { + if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !hasActiveEndpoints(endpoints) { klog.V(1).Infof("Skipping setting up IPVS service for external IP and LoadBalancer IP "+ "for the service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name) continue @@ -427,7 +434,7 @@ func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo } // add pod endpoints to the IPVS service - nsc.addEndpointsToIPVSService(endpoints, svcEndpointMap, svc, svcID, ipvsExternalIPSvc, externalIP) + nsc.addEndpointsToIPVSService(endpoints, svcEndpointMap, svc, svcID, ipvsExternalIPSvc, externalIP, false) return nil } @@ -508,7 +515,7 @@ func (nsc *NetworkServicesController) setupExternalIPForDSRService(svc *serviceI // 1) Service is not a local service // 2) Service is a local service, but has no active endpoints on this node // 3) Service is a local service, has active endpoints on this node, and this endpoint is one of them - if svc.local && !endpoint.isLocal { + if *svc.extTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal && !endpoint.isLocal { continue } var syscallINET uint16 diff --git a/pkg/controllers/routing/bgp_policies.go b/pkg/controllers/routing/bgp_policies.go index a4ba076e..311ae961 
100644 --- a/pkg/controllers/routing/bgp_policies.go +++ b/pkg/controllers/routing/bgp_policies.go @@ -169,7 +169,7 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error { } advIPPrefixList := make([]*gobgpapi.Prefix, 0) - advIps, _, _ := nrc.getAllVIPs() + advIps, _, _ := nrc.getVIPs() for _, ipStr := range advIps { ip := net.ParseIP(ipStr) if ip == nil { diff --git a/pkg/controllers/routing/ecmp_vip.go b/pkg/controllers/routing/ecmp_vip.go index e7826708..ff8eb49a 100644 --- a/pkg/controllers/routing/ecmp_vip.go +++ b/pkg/controllers/routing/ecmp_vip.go @@ -141,7 +141,7 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core. return } - toAdvertise, toWithdraw, err := nrc.getChangedVIPs(svcOld, svcNew, true) + toAdvertise, toWithdraw, err := nrc.getChangedVIPs(svcOld, svcNew) if err != nil { klog.Errorf("error getting routes for services: %s", err) return @@ -167,15 +167,21 @@ func (nrc *NetworkRoutingController) handleServiceDelete(oldSvc *v1core.Service) err := nrc.AddPolicies() if err != nil { - klog.Errorf("Error adding BGP policies: %s", err.Error()) + klog.Errorf("Error adding BGP policies during service update for %s/%s: %v", oldSvc.Namespace, oldSvc.Name, + err) } - activeVIPs, _, err := nrc.getActiveVIPs() + activeVIPs, _, err := nrc.getVIPs() if err != nil { - klog.Errorf("Failed to get active VIP's on service delete event due to: %s", err.Error()) + klog.Errorf("Failed to get active VIP's on service delete event for %s/%s due to: %v", oldSvc.Namespace, + oldSvc.Name, err) return } - advertiseIPList, unadvertiseIPList := nrc.getAllVIPsForService(oldSvc) + advertiseIPList, unadvertiseIPList, err := nrc.getAllVIPsForService(oldSvc) + if err != nil { + klog.Errorf("Error getting VIPs on service delete event for %s/%s due to: %v", oldSvc.Namespace, oldSvc.Name, + err) + } //nolint:gocritic // we understand that we're assigning to a new slice allIPList := append(advertiseIPList, unadvertiseIPList...) 
withdrawVIPs := make([]string, 0) @@ -364,23 +370,14 @@ func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []s return loadBalancerIPList } -func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Service, - onlyActiveEndpoints bool) ([]string, []string, error) { +func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Service) ([]string, []string, error) { advertiseService := true - _, hasLocalAnnotation := newSvc.Annotations[svcLocalAnnotation] - hasLocalTrafficPolicy := newSvc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal - isLocal := hasLocalAnnotation || hasLocalTrafficPolicy - - if onlyActiveEndpoints && isLocal { - var err error - advertiseService, err = nrc.nodeHasEndpointsForService(newSvc) - if err != nil { - return nil, nil, err - } + newAdvertiseServiceVIPs, newUnadvertiseServiceVIPs, err := nrc.getAllVIPsForService(newSvc) + if err != nil { + return nil, nil, fmt.Errorf("failed to get all VIPs for new service %s/%s due to: %v", newSvc.Namespace, + newSvc.Name, err) } - - newAdvertiseServiceVIPs, newUnadvertiseServiceVIPs := nrc.getAllVIPsForService(newSvc) // This function allows oldSvc to be nil, if this is the case, we don't have any old VIPs to compare against and // possibly withdraw instead treat all VIPs as new and return them as either toAdvertise or toWithdraw depending // on service configuration @@ -393,7 +390,11 @@ func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Servi return nil, allVIPs, nil } } - oldAdvertiseServiceVIPs, oldUnadvertiseServiceVIPs := nrc.getAllVIPsForService(oldSvc) + oldAdvertiseServiceVIPs, oldUnadvertiseServiceVIPs, err := nrc.getAllVIPsForService(oldSvc) + if err != nil { + return nil, nil, fmt.Errorf("failed to get all VIPs for old service %s/%s due to: %v", oldSvc.Namespace, + oldSvc.Name, err) + } //nolint:gocritic // we understand that we're assigning to a new slice oldAllServiceVIPs := 
append(oldAdvertiseServiceVIPs, oldUnadvertiseServiceVIPs...) @@ -416,7 +417,7 @@ func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Servi // It is possible that this host may have the same IP advertised from multiple services, and we don't want to // withdraw it if there is an active service for this VIP on a different service than the one that is changing. toWithdrawListFinal := make([]string, 0) - allVIPsOnServer, _, err := nrc.getVIPs(onlyActiveEndpoints) + allVIPsOnServer, _, err := nrc.getVIPs() if err != nil { return nil, nil, err } @@ -429,22 +430,14 @@ func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Servi return toAdvertiseListFinal, toWithdrawListFinal, nil } -func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) { - return nrc.getVIPs(false) -} - -func (nrc *NetworkRoutingController) getActiveVIPs() ([]string, []string, error) { - return nrc.getVIPs(true) -} - -func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string, []string, error) { +func (nrc *NetworkRoutingController) getVIPs() ([]string, []string, error) { toAdvertiseList := make([]string, 0) toWithdrawList := make([]string, 0) for _, obj := range nrc.svcLister.List() { svc := obj.(*v1core.Service) - toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, onlyActiveEndpoints) + toAdvertise, toWithdraw, err := nrc.getAllVIPsForService(svc) if err != nil { return nil, nil, err } @@ -472,52 +465,67 @@ func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string } func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service, annotation string, - defaultValue bool) bool { + defaultValue, isClusterIP bool) (bool, error) { returnValue := defaultValue stringValue, exists := svc.Annotations[annotation] if exists { // Service annotations overrides defaults. 
returnValue, _ = strconv.ParseBool(stringValue) } - return returnValue -} -func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, - onlyActiveEndpoints bool) ([]string, []string, error) { - - advertise := true - - _, hasLocalAnnotation := svc.Annotations[svcLocalAnnotation] - hasLocalTrafficPolicy := svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal - isLocal := hasLocalAnnotation || hasLocalTrafficPolicy - - if onlyActiveEndpoints && isLocal { - var err error - advertise, err = nrc.nodeHasEndpointsForService(svc) - if err != nil { - return nil, nil, err - } + // If we already know that we shouldn't advertise the service, fail fast + if !returnValue { + return returnValue, nil } - advertiseIPList, unAdvertisedIPList := nrc.getAllVIPsForService(svc) - - if !advertise { - //nolint:gocritic // we understand that we're assigning to a new slice - allIPList := append(advertiseIPList, unAdvertisedIPList...) - return nil, allIPList, nil + hasLocalEndpoints, err := nrc.nodeHasEndpointsForService(svc) + if err != nil { + return returnValue, err } - return advertiseIPList, unAdvertisedIPList, nil + // If: + // - We are assessing the clusterIP of the service (the internally facing VIP) + // - The service has an internal traffic policy of "local" or the service has the service.local annotation on it + // - The service doesn't have any endpoints on the node we're executing on + // Then: return false + // We handle spec.internalTrafficPolicy different because it was introduced in v1.26 and may not be available in all + // clusters, in this case, it will be set to nil + serIntTrafPol := false + if svc.Spec.InternalTrafficPolicy != nil { + serIntTrafPol = *svc.Spec.InternalTrafficPolicy == v1core.ServiceInternalTrafficPolicyLocal + } + intLocalPol := (serIntTrafPol || svc.Annotations[svcLocalAnnotation] == "true") + if isClusterIP && intLocalPol && !hasLocalEndpoints { + return false, nil + } + + // If: + // - We are assessing 
something other than a clusterIP like an externalIP or nodePort (externally facing) + // - The service has an external traffic policy of "local" or the service has the service.local annotation on it + // - The service doesn't have any endpoints on the node we're executing on + // Then: return false + extLocalPol := (svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyLocal || + svc.Annotations[svcLocalAnnotation] == "true") + if !isClusterIP && extLocalPol && !hasLocalEndpoints { + return false, nil + } + + return returnValue, nil } -func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) ([]string, []string) { +func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) ([]string, []string, error) { advertisedIPList := make([]string, 0) unAdvertisedIPList := make([]string, 0) clusterIPs := nrc.getClusterIP(svc) if len(clusterIPs) > 0 { - if nrc.shouldAdvertiseService(svc, svcAdvertiseClusterAnnotation, nrc.advertiseClusterIP) { + shouldAdvCIP, err := nrc.shouldAdvertiseService(svc, svcAdvertiseClusterAnnotation, nrc.advertiseClusterIP, + true) + if err != nil { + return advertisedIPList, unAdvertisedIPList, err + } + if shouldAdvCIP { advertisedIPList = append(advertisedIPList, clusterIPs...) } else { unAdvertisedIPList = append(unAdvertisedIPList, clusterIPs...) @@ -526,7 +534,12 @@ func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) ( externalIPs := nrc.getExternalIPs(svc) if len(externalIPs) > 0 { - if nrc.shouldAdvertiseService(svc, svcAdvertiseExternalAnnotation, nrc.advertiseExternalIP) { + shouldAdvEIP, err := nrc.shouldAdvertiseService(svc, svcAdvertiseExternalAnnotation, nrc.advertiseExternalIP, + false) + if err != nil { + return advertisedIPList, unAdvertisedIPList, err + } + if shouldAdvEIP { advertisedIPList = append(advertisedIPList, externalIPs...) } else { unAdvertisedIPList = append(unAdvertisedIPList, externalIPs...) 
@@ -537,16 +550,19 @@ func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) ( lbIPs := nrc.getLoadBalancerIPs(svc) if len(lbIPs) > 0 { _, skiplbips := svc.Annotations[svcSkipLbIpsAnnotation] - advertiseLoadBalancer := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, - nrc.advertiseLoadBalancerIP) - if advertiseLoadBalancer && !skiplbips { + shouldAdvLIP, err := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, + nrc.advertiseLoadBalancerIP, false) + if err != nil { + return advertisedIPList, unAdvertisedIPList, err + } + if shouldAdvLIP && !skiplbips { advertisedIPList = append(advertisedIPList, lbIPs...) } else { unAdvertisedIPList = append(unAdvertisedIPList, lbIPs...) } } - return advertisedIPList, unAdvertisedIPList + return advertisedIPList, unAdvertisedIPList, nil } diff --git a/pkg/controllers/routing/network_routes_controller.go b/pkg/controllers/routing/network_routes_controller.go index 2655c36e..93689c27 100644 --- a/pkg/controllers/routing/network_routes_controller.go +++ b/pkg/controllers/routing/network_routes_controller.go @@ -371,7 +371,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll } // advertise or withdraw IPs for the services to be reachable via host - toAdvertise, toWithdraw, err := nrc.getActiveVIPs() + toAdvertise, toWithdraw, err := nrc.getVIPs() if err != nil { klog.Errorf("failed to get routes to advertise/withdraw %s", err) }