From ef6e0e5e1e1641ba8fcb8464902491fd38c21021 Mon Sep 17 00:00:00 2001 From: vflaux <38909103+vflaux@users.noreply.github.com> Date: Thu, 19 Jun 2025 12:06:52 +0200 Subject: [PATCH] feat(source): use EndpointSlices instead of Endpoints for Service (#5493) * feat(source): use EndpointSlice for Service source * feat(source): use indexer for EndpointSlice listing --- .../external-dns/templates/clusterrole.yaml | 5 +- charts/external-dns/tests/rbac_test.yaml | 5 +- docs/flags.md | 2 +- pkg/apis/externaldns/types.go | 2 +- source/service.go | 100 ++++++-- source/service_fqdn_test.go | 230 +++++++++--------- source/service_test.go | 123 +++++++--- 7 files changed, 293 insertions(+), 174 deletions(-) diff --git a/charts/external-dns/templates/clusterrole.yaml b/charts/external-dns/templates/clusterrole.yaml index ec0fc6463..a3d8cb777 100644 --- a/charts/external-dns/templates/clusterrole.yaml +++ b/charts/external-dns/templates/clusterrole.yaml @@ -18,7 +18,10 @@ rules: {{- end }} {{- if or (has "service" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "gloo-proxy" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }} - apiGroups: [""] - resources: ["services","endpoints"] + resources: ["services"] + verbs: ["get","watch","list"] + - apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] verbs: ["get","watch","list"] {{- end }} {{- if or (has "ingress" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }} diff --git a/charts/external-dns/tests/rbac_test.yaml b/charts/external-dns/tests/rbac_test.yaml index fd1b8cfbc..9e061c649 100644 --- a/charts/external-dns/tests/rbac_test.yaml +++ b/charts/external-dns/tests/rbac_test.yaml @@ -43,7 +43,10 @@ tests: resources: ["pods"] verbs: ["get", "watch", "list"] - apiGroups: [""] - resources: ["services","endpoints"] + resources: ["services"] + verbs: ["get","watch","list"] + - apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] verbs: ["get","watch","list"] - apiGroups: ["extensions","networking.k8s.io"] resources: ["ingresses"] diff --git a/docs/flags.md b/docs/flags.md index c881660c3..3825292fd 100644 --- a/docs/flags.md +++ b/docs/flags.md @@ -11,7 +11,7 @@ | `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) | | `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout | | `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs | -| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to Endpoints, for Service source (default: false) | +| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) | | `--cf-api-endpoint=""` | The fully-qualified domain name of the cloud foundry instance you are targeting | | `--cf-username=""` | The username to log into the cloud foundry API | | `--cf-password=""` | The password to log into the cloud foundry API | diff --git a/pkg/apis/externaldns/types.go b/pkg/apis/externaldns/types.go index fa5818bd6..6ea0b4a2a 100644 --- a/pkg/apis/externaldns/types.go +++ b/pkg/apis/externaldns/types.go @@ -438,7 +438,7 @@ func App(cfg *Config) *kingpin.Application { app.Flag("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)").Default(defaultConfig.KubeConfig).StringVar(&cfg.KubeConfig) app.Flag("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout").Default(defaultConfig.RequestTimeout.String()).DurationVar(&cfg.RequestTimeout) app.Flag("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs").BoolVar(&cfg.ResolveServiceLoadBalancerHostname) - app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to Endpoints, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents) + app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents) // Flags related to cloud foundry app.Flag("cf-api-endpoint", "The fully-qualified domain name of the cloud foundry instance you are targeting").Default(defaultConfig.CFAPIEndpoint).StringVar(&cfg.CFAPIEndpoint) diff --git a/source/service.go b/source/service.go index 3ae05a8d5..942558552 100644 --- a/source/service.go +++ b/source/service.go @@ -28,10 +28,13 @@ import ( log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -50,6 +53,7 @@ var ( v1.ServiceTypeLoadBalancer: {}, // Exposes the service externally using a cloud provider's load balancer. v1.ServiceTypeExternalName: {}, // Maps the service to an external DNS name. } + serviceNameIndexKey = "serviceName" ) // serviceSource is an implementation of Source for Kubernetes service objects. @@ -72,7 +76,7 @@ type serviceSource struct { resolveLoadBalancerHostname bool listenEndpointEvents bool serviceInformer coreinformers.ServiceInformer - endpointsInformer coreinformers.EndpointsInformer + endpointSlicesInformer discoveryinformers.EndpointSliceInformer podInformer coreinformers.PodInformer nodeInformer coreinformers.NodeInformer serviceTypeFilter *serviceTypes @@ -93,7 +97,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name // Set the resync period to 0 to prevent processing when nothing has changed informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) serviceInformer := informerFactory.Core().V1().Services() - endpointsInformer := informerFactory.Core().V1().Endpoints() + endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices() podInformer := informerFactory.Core().V1().Pods() nodeInformer := informerFactory.Core().V1().Nodes() @@ -104,7 +108,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name }, }, ) - endpointsInformer.Informer().AddEventHandler( + endpointSlicesInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { }, @@ -123,6 +127,26 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name }, ) + // Add an indexer to the EndpointSlice informer to index by the service name label + err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{ + serviceNameIndexKey: func(obj any) ([]string, error) { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + // This should never happen because the Informer should only contain EndpointSlice objects + return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj) + } + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + if serviceName == "" { + return nil, nil + } + key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String() + return []string{key}, nil + }, + }) + if err != nil { + return nil, err + } + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. @@ -148,7 +172,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name publishHostIP: publishHostIP, alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses, serviceInformer: serviceInformer, - endpointsInformer: endpointsInformer, + endpointSlicesInformer: endpointSlicesInformer, podInformer: podInformer, nodeInformer: nodeInformer, serviceTypeFilter: sTypesFilter, @@ -278,42 +302,63 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri return nil } - endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName()) + serviceKey := cache.ObjectName{Namespace: svc.Namespace, Name: svc.Name}.String() + rawEndpointSlices, err := sc.endpointSlicesInformer.Informer().GetIndexer().ByIndex(serviceNameIndexKey, serviceKey) if err != nil { - log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err) - return endpoints + // Should never happen as long as the index exists + log.Errorf("Get EndpointSlices of service[%s] error:%v", svc.GetName(), err) + return nil + } + + endpointSlices := make([]*discoveryv1.EndpointSlice, 0, len(rawEndpointSlices)) + for _, obj := range rawEndpointSlices { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + // Should never happen as the indexer can only contain EndpointSlice objects + log.Errorf("Expected %T but got %T instead, skipping", endpointSlice, obj) + continue + } + endpointSlices = append(endpointSlices, endpointSlice) } pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector) if err != nil { - log.Errorf("List pods of service[%s] error: %v", svc.GetName(), err) + log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err) return endpoints } endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations) + publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && !sc.publishHostIP + publishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets) - for _, subset := range endpointsObject.Subsets { - addresses := subset.Addresses - if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses { - addresses = append(addresses, subset.NotReadyAddresses...) - } + for _, endpointSlice := range endpointSlices { + for _, ep := range endpointSlice.Endpoints { + if !conditionToBool(ep.Conditions.Ready) && !publishNotReadyAddresses { + continue + } + + if publishPodIPs && + endpointSlice.AddressType != discoveryv1.AddressTypeIPv4 && + endpointSlice.AddressType != discoveryv1.AddressTypeIPv6 { + log.Debugf("Skipping EndpointSlice %s/%s because its address type is unsupported: %s", endpointSlice.Namespace, endpointSlice.Name, endpointSlice.AddressType) + continue + } - for _, address := range addresses { // find pod for this address - if address.TargetRef == nil || address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" { - log.Debugf("Skipping address because its target is not a pod: %v", address) + if ep.TargetRef == nil || ep.TargetRef.APIVersion != "" || ep.TargetRef.Kind != "Pod" { + log.Debugf("Skipping address because its target is not a pod: %v", ep) continue } var pod *v1.Pod for _, v := range pods { - if v.Name == address.TargetRef.Name { + if v.Name == ep.TargetRef.Name { pod = v break } } if pod == nil { - log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address) + log.Errorf("Pod %s not found for address %v", ep.TargetRef.Name, ep) continue } @@ -341,8 +386,13 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri targets = endpoint.Targets{pod.Status.HostIP} log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP) } else { - targets = endpoint.Targets{address.IP} - log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, address.IP) + if len(ep.Addresses) == 0 { + log.Warnf("EndpointSlice %s/%s has no addresses for endpoint %v", endpointSlice.Namespace, endpointSlice.Name, ep) + continue + } + address := ep.Addresses[0] // Only use the first address, as additional addresses have no semantic defined + targets = endpoint.Targets{address} + log.Debugf("Generating matching endpoint %s with EndpointSliceAddress IP %s", headlessDomain, address) } } for _, target := range targets { @@ -758,7 +808,7 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { // https://github.com/kubernetes/kubernetes/issues/79610 sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) if sc.listenEndpointEvents { - sc.endpointsInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) + sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } } @@ -789,3 +839,11 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) { types: types, }, nil } + +// conditionToBool converts an EndpointConditions condition to a bool value. +func conditionToBool(v *bool) bool { + if v == nil { + return true // nil should be interpreted as "true" as per EndpointConditions spec + } + return *v +} diff --git a/source/service_fqdn_test.go b/source/service_fqdn_test.go index 49fe48cde..d277ef9d3 100644 --- a/source/service_fqdn_test.go +++ b/source/service_fqdn_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" @@ -30,13 +31,13 @@ import ( func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { for _, tt := range []struct { - title string - services []*v1.Service - endpoints []*v1.Endpoints - fqdnTemplate string - combineFQDN bool - publishHostIp bool - expected []*endpoint.Endpoint + title string + services []*v1.Service + endpointSlices []*discoveryv1.EndpointSlice + fqdnTemplate string + combineFQDN bool + publishHostIp bool + expected []*endpoint.Endpoint }{ { title: "templating with multiple services", @@ -346,23 +347,24 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { }, }, }, - endpoints: []*v1.Endpoints{ + endpointSlices: []*discoveryv1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-one", + Name: "service-one-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-one", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.241", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-1", - Namespace: "default", - }}, + Addresses: []string{"100.66.2.241"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-1", + Namespace: "default", }, }, }, @@ -370,20 +372,20 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-two", + Name: "service-two-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-two", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.244", - Hostname: "ip-10-1-164-152.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-2", - Namespace: "default", - }, - }, + Addresses: []string{"100.66.2.244"}, + Hostname: testutils.ToPtr("ip-10-1-164-152.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-2", + Namespace: "default", }, }, }, @@ -391,29 +393,29 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-three", + Name: "service-three-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-three", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.246", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-3", - Namespace: "default", - }, - }, - { - IP: "100.66.2.247", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-4", - Namespace: "default", - }, - }, + Addresses: []string{"100.66.2.246"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-3", + Namespace: "default", + }, + }, + { + Addresses: []string{"100.66.2.247"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-4", + Namespace: "default", }, }, }, @@ -476,23 +478,24 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { }, }, }, - endpoints: []*v1.Endpoints{ + endpointSlices: []*discoveryv1.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-one", + Name: "service-one-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-one", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.241", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-1", - Namespace: "default", - }}, + Addresses: []string{"100.66.2.241"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-1", + Namespace: "default", }, }, }, @@ -500,20 +503,20 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-two", + Name: "service-two-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-two", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.244", - Hostname: "ip-10-1-164-152.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-2", - Namespace: "default", - }, - }, + Addresses: []string{"100.66.2.244"}, + Hostname: testutils.ToPtr("ip-10-1-164-152.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-2", + Namespace: "default", }, }, }, @@ -521,29 +524,29 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "service-three", + Name: "service-three-xxxxx", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "service-three", + }, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "100.66.2.246", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-3", - Namespace: "default", - }, - }, - { - IP: "100.66.2.247", - Hostname: "ip-10-1-164-158.internal", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "pod-4", - Namespace: "default", - }, - }, + Addresses: []string{"100.66.2.246"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-3", + Namespace: "default", + }, + }, + { + Addresses: []string{"100.66.2.247"}, + Hostname: testutils.ToPtr("ip-10-1-164-158.internal"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "pod-4", + Namespace: "default", }, }, }, @@ -569,24 +572,23 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { } // Create endpoints and pods for the services - for _, el := range tt.endpoints { - _, err := kubeClient.CoreV1().Endpoints(el.Namespace).Create(t.Context(), el, metav1.CreateOptions{}) + for _, el := range tt.endpointSlices { + _, err := kubeClient.DiscoveryV1().EndpointSlices(el.Namespace).Create(t.Context(), el, metav1.CreateOptions{}) require.NoError(t, err) - for i, subset := range el.Subsets { - for idx, address := range subset.Addresses { - _, err = kubeClient.CoreV1().Pods(el.Namespace).Create(t.Context(), &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: address.TargetRef.Name, - Namespace: el.Namespace, - }, - Spec: v1.PodSpec{ - Hostname: address.Hostname, - }, - Status: v1.PodStatus{ - HostIP: fmt.Sprintf("10.1.2%d.4%d", i, idx), - }, - }, metav1.CreateOptions{}) - } + for i, ep := range el.Endpoints { + _, err = kubeClient.CoreV1().Pods(el.Namespace).Create(t.Context(), &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: ep.TargetRef.Name, + Namespace: el.Namespace, + }, + Spec: v1.PodSpec{ + Hostname: *ep.Hostname, + }, + Status: v1.PodStatus{ + HostIP: fmt.Sprintf("10.1.20.4%d", i), + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) } } diff --git a/source/service_test.go b/source/service_test.go index cad1012f9..885af6959 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -19,6 +19,7 @@ package source import ( "context" "fmt" + "maps" "math/rand" "net" "sort" @@ -30,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" @@ -3107,7 +3109,7 @@ func TestHeadlessServices(t *testing.T) { _, err := kubernetes.CoreV1().Services(service.Namespace).Create(context.Background(), service, metav1.CreateOptions{}) require.NoError(t, err) - var addresses, notReadyAddresses []v1.EndpointAddress + var endpointSliceEndpoints []discoveryv1.Endpoint for i, podname := range tc.podnames { pod := &v1.Pod{ Spec: v1.PodSpec{ @@ -3129,34 +3131,31 @@ func TestHeadlessServices(t *testing.T) { _, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) require.NoError(t, err) - address := v1.EndpointAddress{ - IP: tc.podIPs[i], + ep := discoveryv1.Endpoint{ + Addresses: []string{tc.podIPs[i]}, TargetRef: &v1.ObjectReference{ APIVersion: "", Kind: "Pod", Name: podname, }, + Conditions: discoveryv1.EndpointConditions{ + Ready: &tc.podsReady[i], + }, } - if tc.podsReady[i] { - addresses = append(addresses, address) - } else { - notReadyAddresses = append(notReadyAddresses, address) - } + endpointSliceEndpoints = append(endpointSliceEndpoints, ep) } - endpointsObject := &v1.Endpoints{ + endpointSliceLabels := maps.Clone(tc.labels) + endpointSliceLabels[discoveryv1.LabelServiceName] = tc.svcName + endpointSlice := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: tc.svcNamespace, Name: tc.svcName, - Labels: tc.labels, - }, - Subsets: []v1.EndpointSubset{ - { - Addresses: addresses, - NotReadyAddresses: notReadyAddresses, - }, + Labels: endpointSliceLabels, }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: endpointSliceEndpoints, } - _, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(context.Background(), endpointsObject, metav1.CreateOptions{}) + _, err = kubernetes.DiscoveryV1().EndpointSlices(tc.svcNamespace).Create(context.Background(), endpointSlice, metav1.CreateOptions{}) require.NoError(t, err) for _, node := range tc.nodes { _, err = kubernetes.CoreV1().Nodes().Create(context.Background(), &node, metav1.CreateOptions{}) @@ -3576,8 +3575,7 @@ func TestHeadlessServicesHostIP(t *testing.T) { _, err := kubernetes.CoreV1().Services(service.Namespace).Create(context.Background(), service, metav1.CreateOptions{}) require.NoError(t, err) - var addresses []v1.EndpointAddress - var notReadyAddresses []v1.EndpointAddress + var endpointsSlicesEndpoints []discoveryv1.Endpoint for i, podname := range tc.podnames { pod := &v1.Pod{ Spec: v1.PodSpec{ @@ -3598,30 +3596,27 @@ func TestHeadlessServicesHostIP(t *testing.T) { _, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) require.NoError(t, err) - address := v1.EndpointAddress{ - IP: "4.3.2.1", + ep := discoveryv1.Endpoint{ + Addresses: []string{"4.3.2.1"}, TargetRef: tc.targetRefs[i], + Conditions: discoveryv1.EndpointConditions{ + Ready: &tc.podsReady[i], + }, } - if tc.podsReady[i] { - addresses = append(addresses, address) - } else { - notReadyAddresses = append(notReadyAddresses, address) - } + endpointsSlicesEndpoints = append(endpointsSlicesEndpoints, ep) } - endpointsObject := &v1.Endpoints{ + endpointSliceLabels := maps.Clone(tc.labels) + endpointSliceLabels[discoveryv1.LabelServiceName] = tc.svcName + endpointSlice := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: tc.svcNamespace, Name: tc.svcName, - Labels: tc.labels, - }, - Subsets: []v1.EndpointSubset{ - { - Addresses: addresses, - NotReadyAddresses: notReadyAddresses, - }, + Labels: endpointSliceLabels, }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: endpointsSlicesEndpoints, } - _, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(context.Background(), endpointsObject, metav1.CreateOptions{}) + _, err = kubernetes.DiscoveryV1().EndpointSlices(tc.svcNamespace).Create(context.Background(), endpointSlice, metav1.CreateOptions{}) require.NoError(t, err) // Create our object under test and get the endpoints. @@ -4064,6 +4059,64 @@ func TestFilterByServiceType_WithFixture(t *testing.T) { } } +func TestEndpointSlicesIndexer(t *testing.T) { + ctx := t.Context() + fakeClient := fake.NewClientset() + + // Create a dummy EndpointSlice without the service name label + endpointSlice := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "default", + Labels: map[string]string{}, // No discoveryv1.LabelServiceName + }, + } + _, err := fakeClient.DiscoveryV1().EndpointSlices("default").Create(ctx, endpointSlice, metav1.CreateOptions{}) + require.NoError(t, err) + + // Should not error when creating the source + src, err := NewServiceSource( + ctx, + fakeClient, + "default", + "", + "{{.Name}}", + false, + "", + false, + false, + false, + []string{}, + false, + labels.Everything(), + false, + false, + false, + ) + require.NoError(t, err) + ss, ok := src.(*serviceSource) + require.True(t, ok) + + // Try to get EndpointSlices by index; should not panic or error, should return empty slice + indexer := ss.endpointSlicesInformer.Informer().GetIndexer() + slices, err := indexer.ByIndex(serviceNameIndexKey, "default/foo") + require.NoError(t, err) + require.Empty(t, slices) + + // Insert an object of the wrong type into the indexer; indexFunc should return an error and Add() should panic + require.PanicsWithError(t, + "unable to calculate an index entry for key \"default/not-an-endpointslice\" on index \"serviceName\": "+ + "expected *v1.EndpointSlice but got *v1.Service instead", + func() { + _ = indexer.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not-an-endpointslice", + Namespace: "default", + }, + }) + }) +} + // createTestServicesByType creates the requested number of services per type in the given namespace. func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]int) []*v1.Service { var services []*v1.Service