From da73dea69b06a9dd8a54d92f557ff994dc447eb4 Mon Sep 17 00:00:00 2001
From: Aaron U'Ren
Date: Thu, 14 Sep 2023 16:59:42 -0500
Subject: [PATCH] feat(NSC): use EndpointSlice instead of Endpoints

With the advent of IPv6 integration in the NSC, we no longer get all IPs
from Endpoints, but rather just the primary IP of the pod (which is
often, but not always, the IPv4 address). In order to get all possible
endpoint addresses for a given service, we need to switch to
EndpointSlice, which groups addresses into IPv4 and IPv6 by AddressType,
and which gives us more information about endpoint status via its
serving and terminating conditions, instead of just ready or not ready.

This does mean that users will need to add another permission to their
RBAC in order for kube-router to access these objects.
---
 ...erouter-all-features-advertise-routes.yaml |   8 +
 .../generic-kuberouter-all-features.yaml      |   8 +
 ...eric-kuberouter-only-advertise-routes.yaml |  10 +-
 daemonset/generic-kuberouter.yaml             |  10 +-
 .../kubeadm-kuberouter-all-features-dsr.yaml  |   9 +
 ...eadm-kuberouter-all-features-hostport.yaml |   9 +
 .../kubeadm-kuberouter-all-features.yaml      |   9 +
 daemonset/kubeadm-kuberouter.yaml             |   9 +
 pkg/cmd/kube-router.go                        |   5 +-
 .../proxy/network_services_controller.go      | 180 ++++++++++++------
 .../proxy/network_services_controller_test.go |   6 +-
 .../proxy/service_endpoints_sync.go           |  18 +-
 pkg/controllers/proxy/utils.go                |   4 +-
 pkg/utils/service.go                          |  85 +++++++++
 14 files changed, 298 insertions(+), 72 deletions(-)

diff --git a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
index bc896456..157d82aa 100644
--- a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
+++ b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
@@ -224,6 +224,14 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
 
 ---
 kind: ClusterRoleBinding
diff --git a/daemonset/generic-kuberouter-all-features.yaml b/daemonset/generic-kuberouter-all-features.yaml
index ff31bd3d..cd0e0db1 100644
--- a/daemonset/generic-kuberouter-all-features.yaml
+++ b/daemonset/generic-kuberouter-all-features.yaml
@@ -220,6 +220,14 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/generic-kuberouter-only-advertise-routes.yaml b/daemonset/generic-kuberouter-only-advertise-routes.yaml
index 6a3d0d3f..d905604b 100644
--- a/daemonset/generic-kuberouter-only-advertise-routes.yaml
+++ b/daemonset/generic-kuberouter-only-advertise-routes.yaml
@@ -132,7 +132,15 @@ rules:
       - services/status
     verbs:
       - update
-
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/generic-kuberouter.yaml b/daemonset/generic-kuberouter.yaml
index 52eab6da..a44dd61d 100644
--- a/daemonset/generic-kuberouter.yaml
+++ b/daemonset/generic-kuberouter.yaml
@@ -187,7 +187,15 @@ rules:
       - services/status
     verbs:
       - update
-
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
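The ClusterRole rule added above grants read access to endpointslices in the discovery.k8s.io API group. As a rough sketch of what that permission unlocks (illustrative, not part of this patch; listEndpointSlices is our own helper name, and clientset is assumed to be a pre-built kubernetes.Interface):

    package main

    import (
    	"context"
    	"fmt"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    )

    // listEndpointSlices exercises the get/list permission on
    // discovery.k8s.io/endpointslices that the ClusterRole additions grant.
    func listEndpointSlices(clientset kubernetes.Interface, namespace string) error {
    	slices, err := clientset.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
    	if err != nil {
    		// Without the new RBAC rule this fails with a Forbidden error.
    		return err
    	}
    	for _, es := range slices.Items {
    		fmt.Printf("%s/%s addressType=%s endpoints=%d\n", es.Namespace, es.Name, es.AddressType, len(es.Endpoints))
    	}
    	return nil
    }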
diff --git a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
index 301b04f9..974f1d81 100644
--- a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
@@ -203,6 +203,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
index 13e6061b..72e8b196 100644
--- a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
@@ -202,6 +202,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter-all-features.yaml b/daemonset/kubeadm-kuberouter-all-features.yaml
index 9e25d9fa..a91f8785 100644
--- a/daemonset/kubeadm-kuberouter-all-features.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features.yaml
@@ -195,6 +195,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter.yaml b/daemonset/kubeadm-kuberouter.yaml
index 8093176f..b5df7b47 100644
--- a/daemonset/kubeadm-kuberouter.yaml
+++ b/daemonset/kubeadm-kuberouter.yaml
@@ -191,6 +191,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go
index 59b8c134..d64604f9 100644
--- a/pkg/cmd/kube-router.go
+++ b/pkg/cmd/kube-router.go
@@ -97,6 +97,7 @@ func (kr *KubeRouter) Run() error {
 	informerFactory := informers.NewSharedInformerFactory(kr.Client, 0)
 	svcInformer := informerFactory.Core().V1().Services().Informer()
 	epInformer := informerFactory.Core().V1().Endpoints().Informer()
+	epSliceInformer := informerFactory.Discovery().V1().EndpointSlices().Informer()
 	podInformer := informerFactory.Core().V1().Pods().Informer()
 	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
 	nsInformer := informerFactory.Core().V1().Namespaces().Informer()
@@ -177,7 +178,7 @@ func (kr *KubeRouter) Run() error {
 
 	if kr.Config.RunServiceProxy {
 		nsc, err := proxy.NewNetworkServicesController(kr.Client, kr.Config,
-			svcInformer, epInformer, podInformer, &ipsetMutex)
+			svcInformer, epSliceInformer, podInformer, &ipsetMutex)
 		if err != nil {
 			return fmt.Errorf("failed to create network services controller: %v", err)
 		}
@@ -186,7 +187,7 @@ func (kr *KubeRouter) Run() error {
 		if err != nil {
 			return fmt.Errorf("failed to add ServiceEventHandler: %v", err)
 		}
-		_, err = epInformer.AddEventHandler(nsc.EndpointsEventHandler)
+		_, err = epSliceInformer.AddEventHandler(nsc.EndpointSliceEventHandler)
 		if err != nil {
 			return fmt.Errorf("failed to add EndpointsEventHandler: %v", err)
 		}
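For reference, the informer wiring that the kube-router.go hunk above performs looks roughly like this when shown standalone (a hedged sketch; the handler bodies are placeholders, not kube-router's real handlers):

    package main

    import (
    	"fmt"

    	discovery "k8s.io/api/discovery/v1"
    	"k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/cache"
    )

    func watchEndpointSlices(clientset kubernetes.Interface, stopCh <-chan struct{}) error {
    	factory := informers.NewSharedInformerFactory(clientset, 0)
    	epSliceInformer := factory.Discovery().V1().EndpointSlices().Informer()

    	_, err := epSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: func(obj interface{}) {
    			if es, ok := obj.(*discovery.EndpointSlice); ok {
    				fmt.Printf("EndpointSlice added: %s/%s\n", es.Namespace, es.Name)
    			}
    		},
    		DeleteFunc: func(obj interface{}) {
    			// Deletes may arrive wrapped in cache.DeletedFinalStateUnknown
    			// tombstones; the controller's real delete handler unwraps them
    			// (see handleEndpointSliceDelete later in this patch).
    		},
    	})
    	if err != nil {
    		return err
    	}

    	factory.Start(stopCh)
    	factory.WaitForCacheSync(stopCh)
    	return nil
    }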
diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go
index 5386ca4d..1395ab80 100644
--- a/pkg/controllers/proxy/network_services_controller.go
+++ b/pkg/controllers/proxy/network_services_controller.go
@@ -20,6 +20,7 @@ import (
 	"github.com/moby/ipvs"
 	"github.com/vishvananda/netlink"
 	v1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -103,7 +104,7 @@ type NetworkServicesController struct {
 	syncPeriod    time.Duration
 	mu            sync.Mutex
 	serviceMap    serviceInfoMap
-	endpointsMap  endpointsInfoMap
+	endpointsMap  endpointSliceInfoMap
 	podCidr       string
 	excludedCidrs []net.IPNet
 	masqueradeAll bool
@@ -124,12 +125,12 @@ type NetworkServicesController struct {
 	serviceIPPortsIPSet map[v1.IPFamily]*utils.Set
 	serviceIPsIPSet     map[v1.IPFamily]*utils.Set
 
-	svcLister cache.Indexer
-	epLister  cache.Indexer
-	podLister cache.Indexer
+	svcLister     cache.Indexer
+	epSliceLister cache.Indexer
+	podLister     cache.Indexer
 
-	EndpointsEventHandler cache.ResourceEventHandler
-	ServiceEventHandler   cache.ResourceEventHandler
+	EndpointSliceEventHandler cache.ResourceEventHandler
+	ServiceEventHandler       cache.ResourceEventHandler
 
 	gracefulPeriod time.Duration
 	gracefulQueue  gracefulQueue
@@ -204,14 +205,16 @@ type schedFlags struct {
 type serviceInfoMap map[string]*serviceInfo
 
 // internal representation of endpoints
-type endpointsInfo struct {
+type endpointSliceInfo struct {
 	ip      string
 	port    int
 	isLocal bool
+	isIPv4  bool
+	isIPv6  bool
 }
 
 // map of all endpoints, with unique service id(namespace name, service name, port) as key
-type endpointsInfoMap map[string][]endpointsInfo
+type endpointSliceInfoMap map[string][]endpointSliceInfo
 
 // Run periodically sync ipvs configuration to reflect desired state of services and endpoints
 func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
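To make the new types concrete: a single IPv4 EndpointSlice with two addresses and two ports fans out into four endpointSliceInfo entries, keyed per port. A simplified, self-contained sketch of that expansion (generateServiceID is stubbed here; the real implementation lives in kube-router and may differ):

    package main

    import "fmt"

    type endpointSliceInfo struct {
    	ip      string
    	port    int
    	isLocal bool
    	isIPv4  bool
    	isIPv6  bool
    }

    // stub of kube-router's service ID generation: namespace, service name, port name
    func generateServiceID(namespace, name, port string) string {
    	return namespace + "/" + name + ":" + port
    }

    func main() {
    	addresses := []string{"10.0.0.1", "10.0.0.2"}
    	ports := map[string]int32{"http": 8080, "metrics": 9090}

    	m := make(map[string][]endpointSliceInfo)
    	for portName, portNum := range ports {
    		svcID := generateServiceID("default", "foo", portName)
    		for _, addr := range addresses {
    			m[svcID] = append(m[svcID], endpointSliceInfo{ip: addr, port: int(portNum), isIPv4: true})
    		}
    	}
    	// Yields two keys (default/foo:http, default/foo:metrics), each holding two entries.
    	fmt.Println(m)
    }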
@@ -369,7 +372,7 @@ func (nsc *NetworkServicesController) doSync() error {
 	}
 
 	nsc.serviceMap = nsc.buildServicesInfo()
-	nsc.endpointsMap = nsc.buildEndpointsInfo()
+	nsc.endpointsMap = nsc.buildEndpointSliceInfo()
 	err = nsc.syncHairpinIptablesRules()
 	if err != nil {
 		klog.Errorf("Error syncing hairpin iptables rules: %s", err.Error())
@@ -766,19 +769,15 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
 }
 
 // OnEndpointsUpdate handle change in endpoints update from the API server
-func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *v1.Endpoints) {
-
-	if isEndpointsForLeaderElection(ep) {
-		return
-	}
+func (nsc *NetworkServicesController) OnEndpointsUpdate(es *discovery.EndpointSlice) {
 
 	nsc.mu.Lock()
 	defer nsc.mu.Unlock()
 
-	klog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
+	klog.V(1).Infof("Received update to EndpointSlice: %s/%s from watch API", es.Namespace, es.Name)
 	if !nsc.readyForUpdates {
-		klog.V(3).Infof(
-			"Skipping update to endpoint: %s/%s as controller is not ready to process service and endpoints updates",
-			ep.Namespace, ep.Name)
+		klog.V(1).Infof(
+			"Skipping update to EndpointSlice: %s/%s as controller is not ready to process service and endpoints "+
+				"updates", es.Namespace, es.Name)
 		return
 	}
 
 	// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we
 	// don't need to consider previous versions of the service here as we are guaranteed that if it is a ClusterIP
 	// now, it was a ClusterIP before.
-	svc, exists, err := utils.ServiceForEndpoints(&nsc.svcLister, ep)
+	svc, exists, err := utils.ServiceForEndpointSlice(&nsc.svcLister, es)
 	if err != nil {
-		klog.Errorf("failed to convert endpoints resource to service: %s", err)
+		klog.Errorf("failed to convert endpoints resource to service for %s/%s: %v", es.Namespace, es.Name, err)
 		return
 	}
 
 	// ignore updates to Endpoints object with no corresponding Service object
 	if !exists {
+		klog.Warningf("failed to lookup any service as an owner for %s/%s", es.Namespace, es.Name)
 		return
 	}
 
 	if utils.ServiceIsHeadless(svc) {
 		klog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...",
-			ep.Namespace, ep.Name)
+			es.Namespace, es.Name)
 		return
 	}
 
 	// build new service and endpoints map to reflect the change
 	newServiceMap := nsc.buildServicesInfo()
-	newEndpointsMap := nsc.buildEndpointsInfo()
+	newEndpointsMap := nsc.buildEndpointSliceInfo()
 
 	if !endpointsMapsEquivalent(newEndpointsMap, nsc.endpointsMap) {
 		nsc.endpointsMap = newEndpointsMap
 		nsc.serviceMap = newServiceMap
-		klog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", ep.Namespace, ep.Name)
+		klog.V(1).Infof("Syncing IPVS services for update to EndpointSlice: %s/%s", es.Namespace, es.Name)
 		nsc.sync(synctypeIpvs)
 	} else {
 		klog.V(1).Infof("Skipping IPVS services sync on EndpointSlice: %s/%s update as nothing changed",
-			ep.Namespace, ep.Name)
+			es.Namespace, es.Name)
 	}
 }
 
@@ -841,7 +841,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *v1.Service) {
 
 	// build new service and endpoints map to reflect the change
 	newServiceMap := nsc.buildServicesInfo()
-	newEndpointsMap := nsc.buildEndpointsInfo()
+	newEndpointsMap := nsc.buildEndpointSliceInfo()
 
 	if len(newServiceMap) != len(nsc.serviceMap) || !reflect.DeepEqual(newServiceMap, nsc.serviceMap) {
 		nsc.endpointsMap = newEndpointsMap
@@ -854,7 +854,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *v1.Service) {
 	}
 }
 
-func hasActiveEndpoints(endpoints []endpointsInfo) bool {
+func hasActiveEndpoints(endpoints []endpointSliceInfo) bool {
 	for _, endpoint := range endpoints {
 		if endpoint.isLocal {
 			return true
@@ -983,7 +983,7 @@ func parseSchedFlags(value string) schedFlags {
 	return schedFlags{flag1, flag2, flag3}
 }
 
-func shuffle(endPoints []endpointsInfo) []endpointsInfo {
+func shuffle(endPoints []endpointSliceInfo) []endpointSliceInfo {
 	for index1 := range endPoints {
 		randBitInt, err := rand.Int(rand.Reader, big.NewInt(int64(index1+1)))
 		index2 := randBitInt.Int64()
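The shuffle above is untouched by this patch apart from the type rename; it is a forward Fisher-Yates that draws indices from crypto/rand (plausibly to satisfy gosec, though the patch doesn't say). A standalone generic rendering for illustration (ours, not kube-router's):

    package main

    import (
    	"crypto/rand"
    	"fmt"
    	"math/big"
    )

    // shuffle performs an in-place Fisher-Yates shuffle using crypto/rand
    // as the entropy source.
    func shuffle[T any](items []T) []T {
    	for i := range items {
    		j, err := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
    		if err != nil {
    			continue // extremely unlikely; keep the current position on failure
    		}
    		items[i], items[j.Int64()] = items[j.Int64()], items[i]
    	}
    	return items
    }

    func main() {
    	fmt.Println(shuffle([]string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}))
    }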
@@ -995,18 +995,84 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo {
 	return endPoints
 }
 
-func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap {
-	endpointsMap := make(endpointsInfoMap)
-	for _, obj := range nsc.epLister.List() {
-		ep := obj.(*v1.Endpoints)
+// buildEndpointSliceInfo creates a map of EndpointSlices taken at a moment in time
+func (nsc *NetworkServicesController) buildEndpointSliceInfo() endpointSliceInfoMap {
+	endpointsMap := make(endpointSliceInfoMap)
+	for _, obj := range nsc.epSliceLister.List() {
+		var isIPv4, isIPv6 bool
+		es := obj.(*discovery.EndpointSlice)
+		switch es.AddressType {
+		case discovery.AddressTypeIPv4:
+			isIPv4 = true
+		case discovery.AddressTypeIPv6:
+			isIPv6 = true
+		case discovery.AddressTypeFQDN:
+			// At this point we don't handle FQDN type EndpointSlices, at some point in the future this might change
+			continue
+		default:
+			// If at some point k8s adds more AddressTypes, we'd prefer to handle them manually to ensure consistent
+			// functionality within kube-router
+			continue
+		}
 
-		for _, epSubset := range ep.Subsets {
-			for _, port := range epSubset.Ports {
-				svcID := generateServiceID(ep.Namespace, ep.Name, port.Name)
-				endpoints := make([]endpointsInfo, 0)
-				for _, addr := range epSubset.Addresses {
-					isLocal := addr.NodeName != nil && *addr.NodeName == nsc.nodeHostName
-					endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port), isLocal: isLocal})
+		// In order to properly link the endpoint with the service, we need the service's name
+		svcName, err := utils.ServiceNameforEndpointSlice(es)
+		if err != nil {
+			klog.Errorf("unable to lookup service from EndpointSlice, skipping: %v", err)
+			continue
+		}
+
+		// Keep in mind that ports aren't embedded in the individual endpoints, but at the slice level. We still
+		// need an endpointSliceInfo and a svcID for each endpoint/port pair, so we consume them as an inner and
+		// outer loop. The actual structure of an EndpointSlice looks like:
+		//
+		// metadata:
+		//   name: ...
+		//   namespace: ...
+		// endpoints:
+		//   - addresses:
+		//       - 10.0.0.1
+		//     conditions:
+		//       ready: (true|false)
+		//     nodeName: foo
+		//     targetRef:
+		//       kind: Pod
+		//       name: bar
+		//     zone: z1
+		// ports:
+		//   - name: baz
+		//     port: 8080
+		//     protocol: TCP
+		//
+		for _, ep := range es.Endpoints {
+			// Previously, when we used Endpoints, we only looked at subsets.addresses and not
+			// subsets.notReadyAddresses, so here we need to limit our endpoints to only the ones that are ready.
+			// In the future, we could consider changing this to .Serving, which continues to include pods that are
+			// in a Terminating state. For now we keep it the same. Note that a nil Ready condition means "unknown"
+			// and is interpreted as ready, per the EndpointSlice API conventions.
+			if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
+				continue
+			}
+
+			for _, port := range es.Ports {
+				var endpoints []endpointSliceInfo
+				var ok bool
+
+				svcID := generateServiceID(es.Namespace, svcName, *port.Name)
+
+				// we may have already started to populate endpoints for this service from another EndpointSlice,
+				// if so continue where we left off, otherwise create a new slice
+				if endpoints, ok = endpointsMap[svcID]; !ok {
+					endpoints = make([]endpointSliceInfo, 0)
+				}
+
+				for _, addr := range ep.Addresses {
+					isLocal := ep.NodeName != nil && *ep.NodeName == nsc.nodeHostName
+					endpoints = append(endpoints, endpointSliceInfo{
+						ip:      addr,
+						port:    int(*port.Port),
+						isLocal: isLocal,
+						isIPv4:  isIPv4,
+						isIPv6:  isIPv6,
+					})
 				}
 				endpointsMap[svcID] = shuffle(endpoints)
 			}
@@ -1589,6 +1655,12 @@ func routeVIPTrafficToDirector(fwmark string, family v1.IPFamily) error {
 	return nil
 }
 
+// isEndpointsForLeaderElection checks to see if this change has to do with leadership elections
+//
+// Deprecated: this is no longer used because we use EndpointSlices instead of Endpoints in the NSC now. It is
+// currently preserved for posterity, but may be removed in the future.
+//
+//nolint:unused // We understand that this function is unused, but we want to keep it for now
 func isEndpointsForLeaderElection(ep *v1.Endpoints) bool {
 	_, isLeaderElection := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]
 	return isLeaderElection
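The Ready/Serving/Terminating distinction mentioned in the comment inside buildEndpointSliceInfo can be sketched as follows (illustrative only; boolOrTrue and usable are our names, and the nil handling follows the discovery/v1 convention that an unset condition means "unknown, treat as true"):

    package main

    import (
    	"fmt"

    	discovery "k8s.io/api/discovery/v1"
    )

    func boolOrTrue(b *bool) bool {
    	return b == nil || *b
    }

    // usable mirrors the choice kube-router faces: key off Ready (current
    // behavior), or off Serving, which stays true for Terminating pods that
    // still pass their readiness checks.
    func usable(ep discovery.Endpoint, includeTerminating bool) bool {
    	if includeTerminating {
    		return boolOrTrue(ep.Conditions.Serving)
    	}
    	return boolOrTrue(ep.Conditions.Ready)
    }

    func main() {
    	t, f := true, false
    	ep := discovery.Endpoint{
    		Conditions: discovery.EndpointConditions{Ready: &f, Serving: &t, Terminating: &t},
    	}
    	fmt.Println(usable(ep, false), usable(ep, true)) // false true
    }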
@@ -1664,23 +1736,23 @@ func (nsc *NetworkServicesController) Cleanup() {
 	klog.Infof("Successfully cleaned the NetworkServiceController configuration done by kube-router")
 }
 
-func (nsc *NetworkServicesController) newEndpointsEventHandler() cache.ResourceEventHandler {
+func (nsc *NetworkServicesController) newEndpointSliceEventHandler() cache.ResourceEventHandler {
 	return cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			nsc.handleEndpointsAdd(obj)
+			nsc.handleEndpointSliceAdd(obj)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			nsc.handleEndpointsUpdate(oldObj, newObj)
+			nsc.handleEndpointSliceUpdate(oldObj, newObj)
 		},
 		DeleteFunc: func(obj interface{}) {
-			nsc.handleEndpointsDelete(obj)
+			nsc.handleEndpointSliceDelete(obj)
 		},
 	}
 }
 
-func (nsc *NetworkServicesController) handleEndpointsAdd(obj interface{}) {
-	endpoints, ok := obj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceAdd(obj interface{}) {
+	endpoints, ok := obj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", obj)
 		return
@@ -1688,13 +1760,13 @@ func (nsc *NetworkServicesController) handleEndpointsAdd(obj interface{}) {
 	nsc.OnEndpointsUpdate(endpoints)
 }
 
-func (nsc *NetworkServicesController) handleEndpointsUpdate(oldObj, newObj interface{}) {
-	_, ok := oldObj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceUpdate(oldObj, newObj interface{}) {
+	_, ok := oldObj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", oldObj)
 		return
 	}
-	newEndpoints, ok := newObj.(*v1.Endpoints)
+	newEndpoints, ok := newObj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", newObj)
 		return
@@ -1702,15 +1774,15 @@ func (nsc *NetworkServicesController) handleEndpointsUpdate(oldObj, newObj inter
 	nsc.OnEndpointsUpdate(newEndpoints)
 }
 
-func (nsc *NetworkServicesController) handleEndpointsDelete(obj interface{}) {
-	endpoints, ok := obj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceDelete(obj interface{}) {
+	endpoints, ok := obj.(*discovery.EndpointSlice)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
 			klog.Errorf("unexpected object type: %v", obj)
 			return
 		}
-		if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
+		if endpoints, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
 			klog.Errorf("unexpected object type: %v", obj)
 			return
 		}
@@ -1774,7 +1846,7 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) {
 // NewNetworkServicesController returns NetworkServicesController object
 func NewNetworkServicesController(clientset kubernetes.Interface,
 	config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer,
-	epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
+	epSliceInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
 	ipsetMutex *sync.Mutex) (*NetworkServicesController, error) {
 
 	var err error
@@ -1810,7 +1882,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
 	nsc.globalHairpin = config.GlobalHairpinMode
 
 	nsc.serviceMap = make(serviceInfoMap)
-	nsc.endpointsMap = make(endpointsInfoMap)
+	nsc.endpointsMap = make(endpointSliceInfoMap)
 	nsc.client = clientset
 
 	nsc.ProxyFirewallSetup = sync.NewCond(&sync.Mutex{})
@@ -1929,8 +2001,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
 
 	nsc.ipvsPermitAll = config.IpvsPermitAll
 
-	nsc.epLister = epInformer.GetIndexer()
-	nsc.EndpointsEventHandler = nsc.newEndpointsEventHandler()
+	nsc.epSliceLister = epSliceInformer.GetIndexer()
+	nsc.EndpointSliceEventHandler = nsc.newEndpointSliceEventHandler()
 
 	return &nsc, nil
 }
diff --git a/pkg/controllers/proxy/network_services_controller_test.go b/pkg/controllers/proxy/network_services_controller_test.go
index 91f22e9b..7ae6486d 100644
--- a/pkg/controllers/proxy/network_services_controller_test.go
+++ b/pkg/controllers/proxy/network_services_controller_test.go
@@ -148,10 +148,10 @@ var _ = Describe("NetworkServicesController", func() {
 			startInformersForServiceProxy(nsc, clientset)
 			waitForListerWithTimeoutG(nsc.svcLister, time.Second*10)
-			waitForListerWithTimeoutG(nsc.epLister, time.Second*10)
+			waitForListerWithTimeoutG(nsc.epSliceLister, time.Second*10)
 
 			nsc.serviceMap = nsc.buildServicesInfo()
-			nsc.endpointsMap = nsc.buildEndpointsInfo()
+			nsc.endpointsMap = nsc.buildEndpointSliceInfo()
 		})
 		Context("service no endpoints with externalIPs", func() {
 			var fooSvc1, fooSvc2 *ipvs.Service
@@ -503,7 +503,7 @@ func startInformersForServiceProxy(nsc *NetworkServicesController, clientset kub
 	informerFactory.WaitForCacheSync(nil)
 
 	nsc.svcLister = svcInformer.GetIndexer()
-	nsc.epLister = epInformer.GetIndexer()
+	nsc.epSliceLister = epInformer.GetIndexer()
 	nsc.podLister = podInformer.GetIndexer()
 }
 
diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go
index 6211fcdc..00ec431e 100644
--- a/pkg/controllers/proxy/service_endpoints_sync.go
+++ b/pkg/controllers/proxy/service_endpoints_sync.go
@@ -21,7 +21,7 @@ import (
 // sync the ipvs service and server details configured to reflect the desired state of Kubernetes services
 // and endpoints as learned from services and endpoints information from the api server
 func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap) error {
+	endpointsInfoMap endpointSliceInfoMap) error {
 	start := time.Now()
 	defer func() {
 		endTime := time.Since(start)
@@ -103,7 +103,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
 }
 
 func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	ipvsSvcs, err := nsc.ln.ipvsGetServices()
 	if err != nil {
 		return errors.New("Failed get list of IPVS services due to: " + err.Error())
@@ -181,7 +181,7 @@ func (nsc *NetworkServicesController) addIPVSService(ipvsSvcs []*ipvs.Service, s
 	return ipvsSvcs, svcID, ipvsService
 }
 
-func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointsInfo,
+func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointSliceInfo,
 	svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, vip net.IP) {
 	var family v1.IPFamily
 	if vip.To4() != nil {
@@ -203,14 +203,14 @@ func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endp
 
 		switch family {
 		case v1.IPv4Protocol:
-			if eIP.To4() == nil {
+			if endpoint.isIPv6 {
 				klog.V(3).Infof("not adding endpoint %s to service %s with VIP %s because families don't "+
 					"match", endpoint.ip, svc.name, vip)
 				continue
 			}
 			syscallINET = syscall.AF_INET
 		case v1.IPv6Protocol:
-			if eIP.To4() != nil {
+			if endpoint.isIPv4 {
 				klog.V(3).Infof("not adding endpoint %s to service %s with VIP %s because families don't "+
 					"match", endpoint.ip, svc.name, vip)
 				continue
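Relying on the slice's AddressType (captured in the isIPv4/isIPv6 flags) rather than re-parsing the address string avoids net.IP heuristics, which can misfire on edge cases such as IPv4-mapped IPv6 literals. A small standalone demonstration (the values below are fabricated):

    package main

    import (
    	"fmt"
    	"net"
    )

    func main() {
    	// An IPv4-mapped IPv6 literal: To4() returns non-nil, so a To4()-based
    	// check would classify it as IPv4 even if it arrived on an IPv6
    	// EndpointSlice.
    	ip := net.ParseIP("::ffff:10.0.0.1")
    	fmt.Println(ip.To4() != nil) // true

    	// With flags copied from AddressType, the family decision is unambiguous
    	// and never re-derived from the string form.
    	type endpointSliceInfo struct{ isIPv4, isIPv6 bool }
    	ep := endpointSliceInfo{isIPv6: true}
    	fmt.Println(ep.isIPv6) // true
    }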
@@ -235,7 +235,7 @@ func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endp
 }
 
 func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	ipvsSvcs, err := nsc.ln.ipvsGetServices()
 	if err != nil {
 		return errors.New("Failed get list of IPVS services due to: " + err.Error())
@@ -312,7 +312,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi
 }
 
 func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	for k, svc := range serviceInfoMap {
 		endpoints := endpointsInfoMap[k]
 		// First we check to see if this is a local service and that it has any active endpoints, if it doesn't there
@@ -368,7 +368,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser
 // the IPVS service to the host if it is missing, and setting up the dummy interface to be able to receive traffic on
 // the node.
 func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo, externalIP net.IP,
-	endpoints []endpointsInfo, svcEndpointMap map[string][]string) error {
+	endpoints []endpointSliceInfo, svcEndpointMap map[string][]string) error {
 	// Get everything we need to get setup to process the external IP
 	protocol := convertSvcProtoToSysCallProto(svc.protocol)
 	var nodeIP net.IP
@@ -433,7 +433,7 @@ func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo
 // based on FWMARK to enable direct server return functionality. DSR requires a director without a VIP
 // http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html to avoid martian packets
 func (nsc *NetworkServicesController) setupExternalIPForDSRService(svc *serviceInfo, externalIP net.IP,
-	endpoints []endpointsInfo, svcEndpointMap map[string][]string) error {
+	endpoints []endpointSliceInfo, svcEndpointMap map[string][]string) error {
 	// Get everything we need to get setup to process the external IP
 	protocol := convertSvcProtoToSysCallProto(svc.protocol)
 	var nodeIP net.IP
diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go
index 55b003e5..d868810b 100644
--- a/pkg/controllers/proxy/utils.go
+++ b/pkg/controllers/proxy/utils.go
@@ -116,7 +116,7 @@ func (nsc *NetworkServicesController) lookupServiceByFWMark(fwMark uint32) (stri
 
 // unsortedListsEquivalent compares two lists of endpointsInfo and considers them the same if they contains the same
 // contents regardless of order. Returns true if both lists contain the same contents.
-func unsortedListsEquivalent(a, b []endpointsInfo) bool {
+func unsortedListsEquivalent(a, b []endpointSliceInfo) bool {
 	if len(a) != len(b) {
 		return false
 	}
@@ -140,7 +140,7 @@ func unsortedListsEquivalent(a, b []endpointsInfo) bool {
 
 // endpointsMapsEquivalent compares two maps of endpointsInfoMap to see if they have the same keys and values. Returns
 // true if both maps contain the same keys and values.
-func endpointsMapsEquivalent(a, b endpointsInfoMap) bool {
+func endpointsMapsEquivalent(a, b endpointSliceInfoMap) bool {
 	if len(a) != len(b) {
 		return false
 	}
diff --git a/pkg/utils/service.go b/pkg/utils/service.go
index 3892303e..0059cde0 100644
--- a/pkg/utils/service.go
+++ b/pkg/utils/service.go
@@ -1,10 +1,13 @@
 package utils
 
 import (
+	"fmt"
 	"strings"
 
 	v1core "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
 )
 
 const (
@@ -17,6 +20,88 @@ func ServiceForEndpoints(ci *cache.Indexer, ep *v1core.Endpoints) (interface{},
 	if err != nil {
 		return nil, false, err
 	}
+	klog.V(2).Infof("key for looking up service from Endpoint is: %s", key)
+
+	item, exists, err := (*ci).GetByKey(key)
+	if err != nil {
+		return nil, false, err
+	}
+
+	if !exists {
+		return nil, false, nil
+	}
+
+	return item, true, nil
+}
+
+// ServiceNameforEndpointSlice returns the name of the service that created a given EndpointSlice
+//
+// With Endpoints, the name of the endpoint object always matches the service object; however, when it comes to
+// EndpointSlices, things work a bit differently, as the k8s controller autogenerates the name (something like:
+// foo-kl29b)
+//
+// We can get service information from a number of spots:
+// * From the ownerReferences in the metadata: EndpointSlice -> metadata -> ownerReferences[0] -> name
+// * From the label: kubernetes.io/service-name
+// * From generateName, which contains the prefix of the autogenerated name and should align with our service name
+//
+// We'll walk through all of these and do our best to identify the service's name; if we aren't able to find any of
+// them, or they disagree with each other, we'll return an error
+func ServiceNameforEndpointSlice(es *discovery.EndpointSlice) (string, error) {
+	const serviceNameLabel = "kubernetes.io/service-name"
+	var ownerRefName, labelSvcName, generateName, finalSvcName string
+
+	ownerRef := es.GetObjectMeta().GetOwnerReferences()
+	if len(ownerRef) == 1 {
+		ownerRefName = ownerRef[0].Name
+	}
+
+	labels := es.GetObjectMeta().GetLabels()
+	if svcLabel, ok := labels[serviceNameLabel]; ok {
+		labelSvcName = svcLabel
+	}
+
+	if es.GetObjectMeta().GetGenerateName() != "" {
+		generateName = strings.TrimRight(es.GetObjectMeta().GetGenerateName(), "-")
+	}
+
+	if ownerRefName == "" && labelSvcName == "" && generateName == "" {
+		return "", fmt.Errorf("all identifiers for service are empty on this EndpointSlice, unable to determine "+
+			"owning service for: %s/%s", es.Namespace, es.Name)
+	}
+
+	// Take things in an order of precedence here: generateName < ownerRefName < labelSvcName
+	finalSvcName = generateName
+	if ownerRefName != "" {
+		finalSvcName = ownerRefName
+	}
+	if labelSvcName != "" {
+		finalSvcName = labelSvcName
+	}
+
+	// At this point we do some checks to ensure that the final owning service name is sane. Specifically, we want
+	// to check it against labelSvcName and ownerRefName if they were not blank, and return an error if they don't
+	// agree. We don't worry about generateName as that is less conclusive.
+	//
+	// From above, we already know that if labelSvcName was not blank then it is equal to finalSvcName, so we only
+	// need to worry about ownerRefName
+	if ownerRefName != "" && finalSvcName != ownerRefName {
+		return "", fmt.Errorf("the ownerReferences field on EndpointSlice (%s) doesn't agree with the %s label "+
+			"(%s) for the %s/%s EndpointSlice", ownerRefName, serviceNameLabel, labelSvcName, es.Namespace, es.Name)
+	}
+
+	return finalSvcName, nil
+}
+
+// ServiceForEndpointSlice, given an EndpointSlice object, returns the Service API object that owns it, if it exists
+func ServiceForEndpointSlice(ci *cache.Indexer, es *discovery.EndpointSlice) (interface{}, bool, error) {
+	svcName, err := ServiceNameforEndpointSlice(es)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// The key that we're looking for here is just <namespace>/<service-name>
+	key := fmt.Sprintf("%s/%s", es.Namespace, svcName)
+	klog.V(2).Infof("key for looking up service from EndpointSlice is: %s", key)
 
 	item, exists, err := (*ci).GetByKey(key)
 	if err != nil {
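Putting the resolution order together, a fabricated example (standalone sketch; the objects below are made up, not taken from a cluster) of how the three identifiers line up for a typical controller-managed slice:

    package main

    import (
    	"fmt"

    	discovery "k8s.io/api/discovery/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
    	es := &discovery.EndpointSlice{
    		ObjectMeta: metav1.ObjectMeta{
    			Namespace:    "default",
    			Name:         "foo-kl29b",
    			GenerateName: "foo-",
    			Labels:       map[string]string{"kubernetes.io/service-name": "foo"},
    			OwnerReferences: []metav1.OwnerReference{
    				{Kind: "Service", Name: "foo"},
    			},
    		},
    		AddressType: discovery.AddressTypeIPv4,
    	}

    	// All three identifiers agree here, so the resolved service name is "foo"
    	// and the cache key used by ServiceForEndpointSlice is "default/foo".
    	fmt.Println(es.Labels["kubernetes.io/service-name"], es.OwnerReferences[0].Name)
    }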