diff --git a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
index bc896456..157d82aa 100644
--- a/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
+++ b/daemonset/generic-kuberouter-all-features-advertise-routes.yaml
@@ -224,6 +224,14 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
 
 ---
 kind: ClusterRoleBinding
diff --git a/daemonset/generic-kuberouter-all-features.yaml b/daemonset/generic-kuberouter-all-features.yaml
index ff31bd3d..cd0e0db1 100644
--- a/daemonset/generic-kuberouter-all-features.yaml
+++ b/daemonset/generic-kuberouter-all-features.yaml
@@ -220,6 +220,14 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/generic-kuberouter-only-advertise-routes.yaml b/daemonset/generic-kuberouter-only-advertise-routes.yaml
index 6a3d0d3f..d905604b 100644
--- a/daemonset/generic-kuberouter-only-advertise-routes.yaml
+++ b/daemonset/generic-kuberouter-only-advertise-routes.yaml
@@ -132,7 +132,15 @@ rules:
       - services/status
     verbs:
       - update
-
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/generic-kuberouter.yaml b/daemonset/generic-kuberouter.yaml
index 52eab6da..a44dd61d 100644
--- a/daemonset/generic-kuberouter.yaml
+++ b/daemonset/generic-kuberouter.yaml
@@ -187,7 +187,15 @@ rules:
       - services/status
     verbs:
       - update
-
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
index 301b04f9..974f1d81 100644
--- a/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features-dsr.yaml
@@ -203,6 +203,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
index 13e6061b..72e8b196 100644
--- a/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features-hostport.yaml
@@ -202,6 +202,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter-all-features.yaml b/daemonset/kubeadm-kuberouter-all-features.yaml
index 9e25d9fa..a91f8785 100644
--- a/daemonset/kubeadm-kuberouter-all-features.yaml
+++ b/daemonset/kubeadm-kuberouter-all-features.yaml
@@ -195,6 +195,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/daemonset/kubeadm-kuberouter.yaml b/daemonset/kubeadm-kuberouter.yaml
index 8093176f..b5df7b47 100644
--- a/daemonset/kubeadm-kuberouter.yaml
+++ b/daemonset/kubeadm-kuberouter.yaml
@@ -191,6 +191,15 @@ rules:
       - services/status
     verbs:
       - update
+  - apiGroups:
+      - "discovery.k8s.io"
+    resources:
+      - endpointslices
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
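Context for the RBAC additions above: the service proxy now watches EndpointSlices through an informer, so kube-router's ClusterRole needs get/list/watch on `endpointslices` in the `discovery.k8s.io` group. A minimal sketch (ours, not part of the patch) of the kind of call these verbs authorize, assuming in-cluster credentials:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumes this runs in a pod using kube-router's service account.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Without the new ClusterRole rule this returns a "forbidden" error,
	// since EndpointSlices live in the discovery.k8s.io API group rather
	// than the core ("") group that the existing rules already cover.
	slices, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceAll).List(
		context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("visible EndpointSlices: %d\n", len(slices.Items))
}
```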
diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go
index 59b8c134..d64604f9 100644
--- a/pkg/cmd/kube-router.go
+++ b/pkg/cmd/kube-router.go
@@ -97,6 +97,7 @@ func (kr *KubeRouter) Run() error {
 	informerFactory := informers.NewSharedInformerFactory(kr.Client, 0)
 	svcInformer := informerFactory.Core().V1().Services().Informer()
 	epInformer := informerFactory.Core().V1().Endpoints().Informer()
+	epSliceInformer := informerFactory.Discovery().V1().EndpointSlices().Informer()
 	podInformer := informerFactory.Core().V1().Pods().Informer()
 	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
 	nsInformer := informerFactory.Core().V1().Namespaces().Informer()
@@ -177,7 +178,7 @@ func (kr *KubeRouter) Run() error {
 
 	if kr.Config.RunServiceProxy {
 		nsc, err := proxy.NewNetworkServicesController(kr.Client, kr.Config,
-			svcInformer, epInformer, podInformer, &ipsetMutex)
+			svcInformer, epSliceInformer, podInformer, &ipsetMutex)
 		if err != nil {
 			return fmt.Errorf("failed to create network services controller: %v", err)
 		}
@@ -186,7 +187,7 @@ func (kr *KubeRouter) Run() error {
 		if err != nil {
 			return fmt.Errorf("failed to add ServiceEventHandler: %v", err)
 		}
-		_, err = epInformer.AddEventHandler(nsc.EndpointsEventHandler)
+		_, err = epSliceInformer.AddEventHandler(nsc.EndpointSliceEventHandler)
 		if err != nil {
 			return fmt.Errorf("failed to add EndpointsEventHandler: %v", err)
 		}
diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go
index 5386ca4d..1395ab80 100644
--- a/pkg/controllers/proxy/network_services_controller.go
+++ b/pkg/controllers/proxy/network_services_controller.go
@@ -20,6 +20,7 @@ import (
 	"github.com/moby/ipvs"
 	"github.com/vishvananda/netlink"
 	v1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -103,7 +104,7 @@ type NetworkServicesController struct {
 	syncPeriod    time.Duration
 	mu            sync.Mutex
 	serviceMap    serviceInfoMap
-	endpointsMap  endpointsInfoMap
+	endpointsMap  endpointSliceInfoMap
 	podCidr       string
 	excludedCidrs []net.IPNet
 	masqueradeAll bool
@@ -124,12 +125,12 @@ type NetworkServicesController struct {
 	serviceIPPortsIPSet map[v1.IPFamily]*utils.Set
 	serviceIPsIPSet     map[v1.IPFamily]*utils.Set
 
-	svcLister cache.Indexer
-	epLister  cache.Indexer
-	podLister cache.Indexer
+	svcLister     cache.Indexer
+	epSliceLister cache.Indexer
+	podLister     cache.Indexer
 
-	EndpointsEventHandler cache.ResourceEventHandler
-	ServiceEventHandler   cache.ResourceEventHandler
+	EndpointSliceEventHandler cache.ResourceEventHandler
+	ServiceEventHandler       cache.ResourceEventHandler
 
 	gracefulPeriod time.Duration
 	gracefulQueue  gracefulQueue
@@ -204,14 +205,16 @@ type schedFlags struct {
 type serviceInfoMap map[string]*serviceInfo
 
 // internal representation of endpoints
-type endpointsInfo struct {
+type endpointSliceInfo struct {
 	ip      string
 	port    int
 	isLocal bool
+	isIPv4  bool
+	isIPv6  bool
 }
 
 // map of all endpoints, with unique service id(namespace name, service name, port) as key
-type endpointsInfoMap map[string][]endpointsInfo
+type endpointSliceInfoMap map[string][]endpointSliceInfo
 
 // Run periodically sync ipvs configuration to reflect desired state of services and endpoints
 func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
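A quick sketch (ours, using the package-internal types just defined; the service and port names are hypothetical) of what the renamed map holds for a dual-stack service: both families share one slice per service ID and are told apart by the new isIPv4/isIPv6 flags rather than by re-parsing the IP string later:

```go
// svcID format comes from the package-internal generateServiceID(namespace, service, portName).
svcID := generateServiceID("default", "foo", "http")

endpointsMap := endpointSliceInfoMap{
	svcID: {
		{ip: "10.244.1.5", port: 8080, isLocal: true, isIPv4: true},
		{ip: "fd00::5", port: 8080, isLocal: false, isIPv6: true},
	},
}
```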
@@ -369,7 +372,7 @@ func (nsc *NetworkServicesController) doSync() error {
 	}
 
 	nsc.serviceMap = nsc.buildServicesInfo()
-	nsc.endpointsMap = nsc.buildEndpointsInfo()
+	nsc.endpointsMap = nsc.buildEndpointSliceInfo()
 	err = nsc.syncHairpinIptablesRules()
 	if err != nil {
 		klog.Errorf("Error syncing hairpin iptables rules: %s", err.Error())
@@ -766,19 +769,15 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
 }
 
 // OnEndpointsUpdate handle change in endpoints update from the API server
-func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *v1.Endpoints) {
-
-	if isEndpointsForLeaderElection(ep) {
-		return
-	}
+func (nsc *NetworkServicesController) OnEndpointsUpdate(es *discovery.EndpointSlice) {
 	nsc.mu.Lock()
 	defer nsc.mu.Unlock()
 
-	klog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
+	klog.V(1).Infof("Received update to EndpointSlice: %s/%s from watch API", es.Namespace, es.Name)
 	if !nsc.readyForUpdates {
-		klog.V(3).Infof(
-			"Skipping update to endpoint: %s/%s as controller is not ready to process service and endpoints updates",
-			ep.Namespace, ep.Name)
+		klog.V(1).Infof(
+			"Skipping update to EndpointSlice: %s/%s as controller is not ready to process service and endpoints "+
+				"updates", es.Namespace, es.Name)
 		return
 	}
 
@@ -786,33 +785,34 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *v1.Endpoints) {
 	// skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't
 	// need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a
 	// ClusterIP before.
-	svc, exists, err := utils.ServiceForEndpoints(&nsc.svcLister, ep)
+	svc, exists, err := utils.ServiceForEndpointSlice(&nsc.svcLister, es)
 	if err != nil {
-		klog.Errorf("failed to convert endpoints resource to service: %s", err)
+		klog.Errorf("failed to convert endpoints resource to service for %s/%s: %v", es.Namespace, es.Name, err)
 		return
 	}
 
 	// ignore updates to Endpoints object with no corresponding Service object
 	if !exists {
+		klog.Warningf("failed to lookup any service as an owner for %s/%s", es.Namespace, es.Name)
 		return
 	}
 
 	if utils.ServiceIsHeadless(svc) {
 		klog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...",
-			ep.Namespace, ep.Name)
+			es.Namespace, es.Name)
 		return
 	}
 
 	// build new service and endpoints map to reflect the change
 	newServiceMap := nsc.buildServicesInfo()
-	newEndpointsMap := nsc.buildEndpointsInfo()
+	newEndpointsMap := nsc.buildEndpointSliceInfo()
 
 	if !endpointsMapsEquivalent(newEndpointsMap, nsc.endpointsMap) {
 		nsc.endpointsMap = newEndpointsMap
 		nsc.serviceMap = newServiceMap
-		klog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", ep.Namespace, ep.Name)
+		klog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", es.Namespace, es.Name)
 		nsc.sync(synctypeIpvs)
 	} else {
 		klog.V(1).Infof("Skipping IPVS services sync on endpoint: %s/%s update as nothing changed",
-			ep.Namespace, ep.Name)
+			es.Namespace, es.Name)
 	}
 }
 
@@ -841,7 +841,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *v1.Service) {
 
 	// build new service and endpoints map to reflect the change
 	newServiceMap := nsc.buildServicesInfo()
-	newEndpointsMap := nsc.buildEndpointsInfo()
+	newEndpointsMap := nsc.buildEndpointSliceInfo()
 
 	if len(newServiceMap) != len(nsc.serviceMap) || !reflect.DeepEqual(newServiceMap, nsc.serviceMap) {
 		nsc.endpointsMap = newEndpointsMap
@@ -854,7 +854,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *v1.Service) {
 	}
 }
 
-func hasActiveEndpoints(endpoints []endpointsInfo) bool {
+func hasActiveEndpoints(endpoints []endpointSliceInfo) bool {
 	for _, endpoint := range endpoints {
 		if endpoint.isLocal {
 			return true
@@ -983,7 +983,7 @@ func parseSchedFlags(value string) schedFlags {
 	return schedFlags{flag1, flag2, flag3}
 }
 
-func shuffle(endPoints []endpointsInfo) []endpointsInfo {
+func shuffle(endPoints []endpointSliceInfo) []endpointSliceInfo {
 	for index1 := range endPoints {
 		randBitInt, err := rand.Int(rand.Reader, big.NewInt(int64(index1+1)))
 		index2 := randBitInt.Int64()
@@ -995,18 +995,84 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo {
 	return endPoints
 }
 
-func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap {
-	endpointsMap := make(endpointsInfoMap)
-	for _, obj := range nsc.epLister.List() {
-		ep := obj.(*v1.Endpoints)
+// buildEndpointSliceInfo creates a map of EndpointSlices taken at a moment in time
+func (nsc *NetworkServicesController) buildEndpointSliceInfo() endpointSliceInfoMap {
+	endpointsMap := make(endpointSliceInfoMap)
+	for _, obj := range nsc.epSliceLister.List() {
+		var isIPv4, isIPv6 bool
+		es := obj.(*discovery.EndpointSlice)
+		switch es.AddressType {
+		case discovery.AddressTypeIPv4:
+			isIPv4 = true
+		case discovery.AddressTypeIPv6:
+			isIPv6 = true
+		case discovery.AddressTypeFQDN:
+			// At this point we don't handle FQDN type EndpointSlices, at some point in the future this might change
+			continue
+		default:
+			// If at some point k8s adds more AddressTypes, we'd prefer to handle them manually to ensure consistent
+			// functionality within kube-router
+			continue
+		}
 
-		for _, epSubset := range ep.Subsets {
-			for _, port := range epSubset.Ports {
-				svcID := generateServiceID(ep.Namespace, ep.Name, port.Name)
-				endpoints := make([]endpointsInfo, 0)
-				for _, addr := range epSubset.Addresses {
-					isLocal := addr.NodeName != nil && *addr.NodeName == nsc.nodeHostName
-					endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port), isLocal: isLocal})
+		// In order to properly link the endpoint with the service, we need the service's name
+		svcName, err := utils.ServiceNameforEndpointSlice(es)
+		if err != nil {
+			klog.Errorf("unable to lookup service from EndpointSlice, skipping: %v", err)
+			continue
+		}
+
+		// Keep in mind that ports aren't embedded in the individual endpoints, but we do need to make an
+		// endpointSliceInfo and a svcID for each <endpoint, port> pair, so we consume them as an inner and outer
+		// loop. The actual structure of an EndpointSlice looks like:
+		//
+		// metadata:
+		//   name: ...
+		//   namespace: ...
+		// endpoints:
+		//   - addresses:
+		//       - 10.0.0.1
+		//     conditions:
+		//       ready: (true|false)
+		//     nodeName: foo
+		//     targetRef:
+		//       kind: Pod
+		//       name: bar
+		//     zone: z1
+		// ports:
+		//   - name: baz
+		//     port: 8080
+		//     protocol: TCP
+		//
+		for _, ep := range es.Endpoints {
+			// Previously, when we used endpoints, we only looked at subsets.addresses and not
+			// subsets.notReadyAddresses, so here we need to limit our endpoints to only the ones that are ready. In
+			// the future, we could consider changing this to .Serving, which continues to include pods that are in
+			// Terminating state. For now we keep it the same. (Conditions.Ready is a *bool that the API allows to be
+			// nil, meaning "unknown"; we treat unknown as ready rather than dereferencing a nil pointer.)
+			if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
+				continue
+			}
+
+			for _, port := range es.Ports {
+				var endpoints []endpointSliceInfo
+				var ok bool
+
+				svcID := generateServiceID(es.Namespace, svcName, *port.Name)
+
+				// we may have already started to populate endpoints for this service from another EndpointSlice, if
+				// so continue where we left off, otherwise create a new slice
+				if endpoints, ok = endpointsMap[svcID]; !ok {
+					endpoints = make([]endpointSliceInfo, 0)
+				}
+
+				for _, addr := range ep.Addresses {
+					isLocal := ep.NodeName != nil && *ep.NodeName == nsc.nodeHostName
+					endpoints = append(endpoints, endpointSliceInfo{
+						ip:      addr,
+						port:    int(*port.Port),
+						isLocal: isLocal,
+						isIPv4:  isIPv4,
+						isIPv6:  isIPv6,
+					})
 				}
 				endpointsMap[svcID] = shuffle(endpoints)
 			}
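To ground the loop above, a hypothetical walk-through (ours; names and values are invented) of what a single ready endpoint in an IPv4 slice contributes. A second slice for the same service appends under the same svcID instead of replacing it:

```go
portName := "http"
portNum := int32(8080)
node := "node-a" // pretend nsc.nodeHostName is also "node-a"
ready := true

ep := discovery.Endpoint{
	Addresses:  []string{"10.244.1.5"},
	Conditions: discovery.EndpointConditions{Ready: &ready},
	NodeName:   &node,
}
port := discovery.EndpointPort{Name: &portName, Port: &portNum}

// The entry appended for this <endpoint, port> pair:
info := endpointSliceInfo{
	ip:      ep.Addresses[0], // "10.244.1.5"
	port:    int(*port.Port), // 8080
	isLocal: true,            // *ep.NodeName == nsc.nodeHostName
	isIPv4:  true,            // derived once from the parent slice's AddressType
}
_ = info
```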
@@ -1589,6 +1655,12 @@ func routeVIPTrafficToDirector(fwmark string, family v1.IPFamily) error {
 	return nil
 }
 
+// isEndpointsForLeaderElection checks to see if this change has to do with leadership elections
+//
+// Deprecated: this is no longer used because we use EndpointSlices instead of Endpoints in the NSC now; it is
+// currently preserved for posterity, but may be removed in a future release
+//
+//nolint:unused // We understand that this function is unused, but we want to keep it for now
 func isEndpointsForLeaderElection(ep *v1.Endpoints) bool {
 	_, isLeaderElection := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]
 	return isLeaderElection
@@ -1664,23 +1736,23 @@ func (nsc *NetworkServicesController) Cleanup() {
 	klog.Infof("Successfully cleaned the NetworkServiceController configuration done by kube-router")
 }
 
-func (nsc *NetworkServicesController) newEndpointsEventHandler() cache.ResourceEventHandler {
+func (nsc *NetworkServicesController) newEndpointSliceEventHandler() cache.ResourceEventHandler {
 	return cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			nsc.handleEndpointsAdd(obj)
+			nsc.handleEndpointSliceAdd(obj)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			nsc.handleEndpointsUpdate(oldObj, newObj)
+			nsc.handleEndpointSliceUpdate(oldObj, newObj)
 		},
 		DeleteFunc: func(obj interface{}) {
-			nsc.handleEndpointsDelete(obj)
+			nsc.handleEndpointSliceDelete(obj)
 		},
 	}
 }
 
-func (nsc *NetworkServicesController) handleEndpointsAdd(obj interface{}) {
-	endpoints, ok := obj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceAdd(obj interface{}) {
+	endpoints, ok := obj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", obj)
 		return
@@ -1688,13 +1760,13 @@ func (nsc *NetworkServicesController) handleEndpointsAdd(obj interface{}) {
 	nsc.OnEndpointsUpdate(endpoints)
 }
 
-func (nsc *NetworkServicesController) handleEndpointsUpdate(oldObj, newObj interface{}) {
-	_, ok := oldObj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceUpdate(oldObj, newObj interface{}) {
+	_, ok := oldObj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", oldObj)
 		return
 	}
-	newEndpoints, ok := newObj.(*v1.Endpoints)
+	newEndpoints, ok := newObj.(*discovery.EndpointSlice)
 	if !ok {
 		klog.Errorf("unexpected object type: %v", newObj)
 		return
@@ -1702,15 +1774,15 @@ func (nsc *NetworkServicesController) handleEndpointsUpdate(oldObj, newObj inter
 	nsc.OnEndpointsUpdate(newEndpoints)
 }
 
-func (nsc *NetworkServicesController) handleEndpointsDelete(obj interface{}) {
-	endpoints, ok := obj.(*v1.Endpoints)
+func (nsc *NetworkServicesController) handleEndpointSliceDelete(obj interface{}) {
+	endpoints, ok := obj.(*discovery.EndpointSlice)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
 			klog.Errorf("unexpected object type: %v", obj)
 			return
 		}
-		if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
+		if endpoints, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
 			klog.Errorf("unexpected object type: %v", obj)
 			return
 		}
@@ -1774,7 +1846,7 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) {
 
 // NewNetworkServicesController returns NetworkServicesController object
 func NewNetworkServicesController(clientset kubernetes.Interface,
 	config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer,
-	epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
+	epSliceInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
 	ipsetMutex *sync.Mutex) (*NetworkServicesController, error) {
 
 	var err error
@@ -1810,7 +1882,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
 	nsc.globalHairpin = config.GlobalHairpinMode
 
 	nsc.serviceMap = make(serviceInfoMap)
-	nsc.endpointsMap = make(endpointsInfoMap)
+	nsc.endpointsMap = make(endpointSliceInfoMap)
 	nsc.client = clientset
 
 	nsc.ProxyFirewallSetup = sync.NewCond(&sync.Mutex{})
@@ -1929,8 +2001,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
 	nsc.ipvsPermitAll = config.IpvsPermitAll
 
-	nsc.epLister = epInformer.GetIndexer()
-	nsc.EndpointsEventHandler = nsc.newEndpointsEventHandler()
+	nsc.epSliceLister = epSliceInformer.GetIndexer()
+	nsc.EndpointSliceEventHandler = nsc.newEndpointSliceEventHandler()
 
 	return &nsc, nil
 }
diff --git a/pkg/controllers/proxy/network_services_controller_test.go b/pkg/controllers/proxy/network_services_controller_test.go
index 91f22e9b..7ae6486d 100644
--- a/pkg/controllers/proxy/network_services_controller_test.go
+++ b/pkg/controllers/proxy/network_services_controller_test.go
@@ -148,10 +148,10 @@ var _ = Describe("NetworkServicesController", func() {
 			startInformersForServiceProxy(nsc, clientset)
 			waitForListerWithTimeoutG(nsc.svcLister, time.Second*10)
-			waitForListerWithTimeoutG(nsc.epLister, time.Second*10)
+			waitForListerWithTimeoutG(nsc.epSliceLister, time.Second*10)
 
 			nsc.serviceMap = nsc.buildServicesInfo()
-			nsc.endpointsMap = nsc.buildEndpointsInfo()
+			nsc.endpointsMap = nsc.buildEndpointSliceInfo()
 		})
 		Context("service no endpoints with externalIPs", func() {
 			var fooSvc1, fooSvc2 *ipvs.Service
@@ -503,7 +503,7 @@ func startInformersForServiceProxy(nsc *NetworkServicesController, clientset kub
 	informerFactory.WaitForCacheSync(nil)
 
 	nsc.svcLister = svcInformer.GetIndexer()
-	nsc.epLister = epInformer.GetIndexer()
+	nsc.epSliceLister = epInformer.GetIndexer()
 	nsc.podLister = podInformer.GetIndexer()
 }
diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go
index 6211fcdc..00ec431e 100644
--- a/pkg/controllers/proxy/service_endpoints_sync.go
+++ b/pkg/controllers/proxy/service_endpoints_sync.go
@@ -21,7 +21,7 @@ import (
 // sync the ipvs service and server details configured to reflect the desired state of Kubernetes services
 // and endpoints as learned from services and endpoints information from the api server
 func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap) error {
+	endpointsInfoMap endpointSliceInfoMap) error {
 	start := time.Now()
 	defer func() {
 		endTime := time.Since(start)
@@ -103,7 +103,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
 }
 
 func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	ipvsSvcs, err := nsc.ln.ipvsGetServices()
 	if err != nil {
 		return errors.New("Failed get list of IPVS services due to: " + err.Error())
@@ -181,7 +181,7 @@ func (nsc *NetworkServicesController) addIPVSService(ipvsSvcs []*ipvs.Service, s
 	return ipvsSvcs, svcID, ipvsService
 }
 
-func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointsInfo,
+func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endpointSliceInfo,
 	svcEndpointMap map[string][]string, svc *serviceInfo, svcID string, ipvsSvc *ipvs.Service, vip net.IP) {
 	var family v1.IPFamily
 	if vip.To4() != nil {
@@ -203,14 +203,14 @@ func (nsc *NetworkServicesController) addEndpointsToIPVSService(endpoints []endp
 		switch family {
 		case v1.IPv4Protocol:
-			if eIP.To4() == nil {
+			if endpoint.isIPv6 {
 				klog.V(3).Infof("not adding endpoint %s to service %s with VIP %s because families don't "+
 					"match", endpoint.ip, svc.name, vip)
 				continue
 			}
 			syscallINET = syscall.AF_INET
 		case v1.IPv6Protocol:
-			if eIP.To4() != nil {
+			if endpoint.isIPv4 {
 				klog.V(3).Infof("not adding endpoint %s to service %s with VIP %s because families don't "+
 					"match", endpoint.ip, svc.name, vip)
 				continue
 			}
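The switch above is where the new flags pay off: the old code re-derived each endpoint's family by parsing its IP string on every sync, whereas the flags are now computed once per slice from AddressType. For comparison, a sketch (ours) of the two classifications side by side:

```go
// Old: parse the string every time; To4() == nil on a parsed address means
// IPv6, and anything unparseable was handled inconsistently.
eIP := net.ParseIP(endpoint.ip)
oldIsV6 := eIP != nil && eIP.To4() == nil

// New: a pre-computed flag carried on endpointSliceInfo, set from
// EndpointSlice.AddressType (FQDN slices are skipped during map construction).
newIsV6 := endpoint.isIPv6
_, _ = oldIsV6, newIsV6
```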
@@ -235,7 +235,7 @@
 }
 
 func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	ipvsSvcs, err := nsc.ln.ipvsGetServices()
 	if err != nil {
 		return errors.New("Failed get list of IPVS services due to: " + err.Error())
@@ -312,7 +312,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi
 }
 
 func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap,
-	endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
+	endpointsInfoMap endpointSliceInfoMap, activeServiceEndpointMap map[string][]string) error {
 	for k, svc := range serviceInfoMap {
 		endpoints := endpointsInfoMap[k]
 		// First we check to see if this is a local service and that it has any active endpoints, if it doesn't there
@@ -368,7 +368,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser
 // the IPVS service to the host if it is missing, and setting up the dummy interface to be able to receive traffic on
 // the node.
 func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo, externalIP net.IP,
-	endpoints []endpointsInfo, svcEndpointMap map[string][]string) error {
+	endpoints []endpointSliceInfo, svcEndpointMap map[string][]string) error {
 	// Get everything we need to get setup to process the external IP
 	protocol := convertSvcProtoToSysCallProto(svc.protocol)
 	var nodeIP net.IP
@@ -433,7 +433,7 @@ func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo
 // based on FWMARK to enable direct server return functionality. DSR requires a director without a VIP
 // http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html to avoid martian packets
 func (nsc *NetworkServicesController) setupExternalIPForDSRService(svc *serviceInfo, externalIP net.IP,
-	endpoints []endpointsInfo, svcEndpointMap map[string][]string) error {
+	endpoints []endpointSliceInfo, svcEndpointMap map[string][]string) error {
 	// Get everything we need to get setup to process the external IP
 	protocol := convertSvcProtoToSysCallProto(svc.protocol)
 	var nodeIP net.IP
diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go
index 55b003e5..d868810b 100644
--- a/pkg/controllers/proxy/utils.go
+++ b/pkg/controllers/proxy/utils.go
@@ -116,7 +116,7 @@ func (nsc *NetworkServicesController) lookupServiceByFWMark(fwMark uint32) (stri
 
 // unsortedListsEquivalent compares two lists of endpointsInfo and considers them the same if they contains the same
 // contents regardless of order. Returns true if both lists contain the same contents.
-func unsortedListsEquivalent(a, b []endpointsInfo) bool {
+func unsortedListsEquivalent(a, b []endpointSliceInfo) bool {
 	if len(a) != len(b) {
 		return false
 	}
@@ -140,7 +140,7 @@ func unsortedListsEquivalent(a, b []endpointsInfo) bool {
 
 // endpointsMapsEquivalent compares two maps of endpointsInfoMap to see if they have the same keys and values. Returns
 // true if both maps contain the same keys and values.
-func endpointsMapsEquivalent(a, b endpointsInfoMap) bool {
+func endpointsMapsEquivalent(a, b endpointSliceInfoMap) bool {
 	if len(a) != len(b) {
 		return false
 	}
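Why the order-insensitive helpers matter: buildEndpointSliceInfo shuffles every endpoint list, so two equivalent rebuilds rarely agree on ordering, and a plain reflect.DeepEqual would report spurious endpoint changes and trigger needless IPVS syncs. A small illustration (ours, using the internal types):

```go
a := []endpointSliceInfo{{ip: "10.0.0.1", port: 80}, {ip: "10.0.0.2", port: 80}}
b := []endpointSliceInfo{{ip: "10.0.0.2", port: 80}, {ip: "10.0.0.1", port: 80}}

fmt.Println(reflect.DeepEqual(a, b))       // false: same contents, different order
fmt.Println(unsortedListsEquivalent(a, b)) // true
```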
diff --git a/pkg/utils/service.go b/pkg/utils/service.go
index 3892303e..0059cde0 100644
--- a/pkg/utils/service.go
+++ b/pkg/utils/service.go
@@ -1,10 +1,13 @@
 package utils
 
 import (
+	"fmt"
 	"strings"
 
 	v1core "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
 )
 
 const (
@@ -17,6 +20,88 @@ func ServiceForEndpoints(ci *cache.Indexer, ep *v1core.Endpoints) (interface{},
 	if err != nil {
 		return nil, false, err
 	}
+	klog.V(2).Infof("key for looking up service from Endpoint is: %s", key)
+
+	item, exists, err := (*ci).GetByKey(key)
+	if err != nil {
+		return nil, false, err
+	}
+
+	if !exists {
+		return nil, false, nil
+	}
+
+	return item, true, nil
+}
+
+// ServiceNameforEndpointSlice returns the name of the Service that owns a given EndpointSlice
+//
+// With Endpoints, the name of the Endpoints object always matches the Service object; with EndpointSlices things
+// work a bit differently, as the k8s controller will autogenerate the name (something like: foo-kl29b)
+//
+// We can get service information from a number of spots:
+// * From the ownerReferences in the metadata: EndpointSlice -> metadata -> ownerReferences[0] -> name
+// * We can also get this from the label: kubernetes.io/service-name
+// * generateName will also contain the prefix for the autogenerated name, which should align with our service name
+//
+// We'll walk through all of these and do our best to identify the service's name; if we aren't able to find any of
+// these, or they disagree with each other, we'll return an error
+func ServiceNameforEndpointSlice(es *discovery.EndpointSlice) (string, error) {
+	const serviceNameLabel = "kubernetes.io/service-name"
+	var ownerRefName, labelSvcName, generateName, finalSvcName string
+
+	ownerRef := es.GetObjectMeta().GetOwnerReferences()
+	if len(ownerRef) == 1 {
+		ownerRefName = ownerRef[0].Name
+	}
+
+	labels := es.GetObjectMeta().GetLabels()
+	if svcLabel, ok := labels[serviceNameLabel]; ok {
+		labelSvcName = svcLabel
+	}
+
+	if es.GetObjectMeta().GetGenerateName() != "" {
+		generateName = strings.TrimRight(es.GetObjectMeta().GetGenerateName(), "-")
+	}
+
+	if ownerRefName == "" && labelSvcName == "" && generateName == "" {
+		return "", fmt.Errorf("all identifiers for service are empty on this EndpointSlice, unable to determine "+
+			"owning service for: %s/%s", es.Namespace, es.Name)
+	}
+
+	// Take things in an order of precedence here: generateName < ownerRefName < labelSvcName
+	finalSvcName = generateName
+	if ownerRefName != "" {
+		finalSvcName = ownerRefName
+	}
+	if labelSvcName != "" {
+		finalSvcName = labelSvcName
+	}
+
+	// At this point we do some checks to ensure that the final owning service name is sane. Specifically, we want to
+	// check it against labelSvcName and ownerRefName if they were not blank, and return an error if they don't
+	// agree. We don't worry about generateName as that is less conclusive.
+	//
+	// From above, we already know that if labelSvcName was not blank then it is equal to finalSvcName, so we only
+	// need to worry about ownerRefName
+	if ownerRefName != "" && finalSvcName != ownerRefName {
+		return "", fmt.Errorf("the ownerReferences field on EndpointSlice (%s) doesn't agree with the %s label "+
+			"(%s) for %s/%s EndpointSlice", ownerRefName, serviceNameLabel, labelSvcName, es.Namespace, es.Name)
+	}
+
+	return finalSvcName, nil
+}
+
+// ServiceForEndpointSlice given an EndpointSlice object returns the owning Service API object if it exists
+func ServiceForEndpointSlice(ci *cache.Indexer, es *discovery.EndpointSlice) (interface{}, bool, error) {
+	svcName, err := ServiceNameforEndpointSlice(es)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// The key that we're looking for here is just: <namespace>/<service-name>
+	key := fmt.Sprintf("%s/%s", es.Namespace, svcName)
+	klog.V(2).Infof("key for looking up service from EndpointSlice is: %s", key)
 
 	item, exists, err := (*ci).GetByKey(key)
 	if err != nil {
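For reference, a hypothetical EndpointSlice (ours, not part of the patch) carrying the three identifiers that ServiceNameforEndpointSlice consults — the ownerReference, the kubernetes.io/service-name label, and generateName — all agreeing on a Service named foo:

```go
es := &discovery.EndpointSlice{
	ObjectMeta: metav1.ObjectMeta{
		Name:         "foo-kl29b", // autogenerated by the EndpointSlice controller
		Namespace:    "default",
		GenerateName: "foo-",
		Labels:       map[string]string{discovery.LabelServiceName: "foo"},
		OwnerReferences: []metav1.OwnerReference{
			{APIVersion: "v1", Kind: "Service", Name: "foo"},
		},
	},
	AddressType: discovery.AddressTypeIPv4,
}

svcName, err := ServiceNameforEndpointSlice(es) // "foo", nil
```

The label wins the precedence race here, but since the ownerReference also says "foo" the sanity check passes and the lookup key becomes "default/foo".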