feat(source): use EndpointSlices instead of Endpoints for Service (#5493)

* feat(source): use EndpointSlice for Service source

* feat(source): use indexer for EndpointSlice listing
This commit is contained in:
vflaux 2025-06-19 12:06:52 +02:00 committed by GitHub
parent cab4e85377
commit ef6e0e5e1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 293 additions and 174 deletions

View File

@ -18,7 +18,10 @@ rules:
{{- end }}
{{- if or (has "service" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "gloo-proxy" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}
- apiGroups: [""]
resources: ["services","endpoints"]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get","watch","list"]
{{- end }}
{{- if or (has "ingress" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}

View File

@ -43,7 +43,10 @@ tests:
resources: ["pods"]
verbs: ["get", "watch", "list"]
- apiGroups: [""]
resources: ["services","endpoints"]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions","networking.k8s.io"]
resources: ["ingresses"]

View File

@ -11,7 +11,7 @@
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to Endpoints, for Service source (default: false) |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) |
| `--cf-api-endpoint=""` | The fully-qualified domain name of the cloud foundry instance you are targeting |
| `--cf-username=""` | The username to log into the cloud foundry API |
| `--cf-password=""` | The password to log into the cloud foundry API |

View File

@ -438,7 +438,7 @@ func App(cfg *Config) *kingpin.Application {
app.Flag("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)").Default(defaultConfig.KubeConfig).StringVar(&cfg.KubeConfig)
app.Flag("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout").Default(defaultConfig.RequestTimeout.String()).DurationVar(&cfg.RequestTimeout)
app.Flag("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs").BoolVar(&cfg.ResolveServiceLoadBalancerHostname)
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to Endpoints, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)
// Flags related to cloud foundry
app.Flag("cf-api-endpoint", "The fully-qualified domain name of the cloud foundry instance you are targeting").Default(defaultConfig.CFAPIEndpoint).StringVar(&cfg.CFAPIEndpoint)

View File

@ -28,10 +28,13 @@ import (
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@ -50,6 +53,7 @@ var (
v1.ServiceTypeLoadBalancer: {}, // Exposes the service externally using a cloud provider's load balancer.
v1.ServiceTypeExternalName: {}, // Maps the service to an external DNS name.
}
serviceNameIndexKey = "serviceName"
)
// serviceSource is an implementation of Source for Kubernetes service objects.
@ -72,7 +76,7 @@ type serviceSource struct {
resolveLoadBalancerHostname bool
listenEndpointEvents bool
serviceInformer coreinformers.ServiceInformer
endpointsInformer coreinformers.EndpointsInformer
endpointSlicesInformer discoveryinformers.EndpointSliceInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter *serviceTypes
@ -93,7 +97,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
// Set the resync period to 0 to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()
@ -104,7 +108,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
},
},
)
endpointsInformer.Informer().AddEventHandler(
endpointSlicesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
@ -123,6 +127,26 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
},
)
// Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// This should never happen because the Informer should only contain EndpointSlice objects
return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
}
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
if serviceName == "" {
return nil, nil
}
key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
return []string{key}, nil
},
})
if err != nil {
return nil, err
}
informerFactory.Start(ctx.Done())
// wait for the local cache to be populated.
@ -148,7 +172,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
publishHostIP: publishHostIP,
alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
serviceInformer: serviceInformer,
endpointsInformer: endpointsInformer,
endpointSlicesInformer: endpointSlicesInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: sTypesFilter,
@ -278,42 +302,63 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return nil
}
endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
serviceKey := cache.ObjectName{Namespace: svc.Namespace, Name: svc.Name}.String()
rawEndpointSlices, err := sc.endpointSlicesInformer.Informer().GetIndexer().ByIndex(serviceNameIndexKey, serviceKey)
if err != nil {
log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
return endpoints
// Should never happen as long as the index exists
log.Errorf("Get EndpointSlices of service[%s] error:%v", svc.GetName(), err)
return nil
}
endpointSlices := make([]*discoveryv1.EndpointSlice, 0, len(rawEndpointSlices))
for _, obj := range rawEndpointSlices {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// Should never happen as the indexer can only contain EndpointSlice objects
log.Errorf("Expected %T but got %T instead, skipping", endpointSlice, obj)
continue
}
endpointSlices = append(endpointSlices, endpointSlice)
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List pods of service[%s] error: %v", svc.GetName(), err)
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}
endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && !sc.publishHostIP
publishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses
targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
for _, subset := range endpointsObject.Subsets {
addresses := subset.Addresses
if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
addresses = append(addresses, subset.NotReadyAddresses...)
}
for _, endpointSlice := range endpointSlices {
for _, ep := range endpointSlice.Endpoints {
if !conditionToBool(ep.Conditions.Ready) && !publishNotReadyAddresses {
continue
}
if publishPodIPs &&
endpointSlice.AddressType != discoveryv1.AddressTypeIPv4 &&
endpointSlice.AddressType != discoveryv1.AddressTypeIPv6 {
log.Debugf("Skipping EndpointSlice %s/%s because its address type is unsupported: %s", endpointSlice.Namespace, endpointSlice.Name, endpointSlice.AddressType)
continue
}
for _, address := range addresses {
// find pod for this address
if address.TargetRef == nil || address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", address)
if ep.TargetRef == nil || ep.TargetRef.APIVersion != "" || ep.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", ep)
continue
}
var pod *v1.Pod
for _, v := range pods {
if v.Name == address.TargetRef.Name {
if v.Name == ep.TargetRef.Name {
pod = v
break
}
}
if pod == nil {
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
log.Errorf("Pod %s not found for address %v", ep.TargetRef.Name, ep)
continue
}
@ -341,8 +386,13 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
targets = endpoint.Targets{pod.Status.HostIP}
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP)
} else {
targets = endpoint.Targets{address.IP}
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, address.IP)
if len(ep.Addresses) == 0 {
log.Warnf("EndpointSlice %s/%s has no addresses for endpoint %v", endpointSlice.Namespace, endpointSlice.Name, ep)
continue
}
address := ep.Addresses[0] // Only use the first address, as additional addresses have no semantic defined
targets = endpoint.Targets{address}
log.Debugf("Generating matching endpoint %s with EndpointSliceAddress IP %s", headlessDomain, address)
}
}
for _, target := range targets {
@ -758,7 +808,7 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
// https://github.com/kubernetes/kubernetes/issues/79610
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
if sc.listenEndpointEvents {
sc.endpointsInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
}
@ -789,3 +839,11 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
types: types,
}, nil
}
// conditionToBool converts an EndpointConditions condition to a bool value.
func conditionToBool(v *bool) bool {
if v == nil {
return true // nil should be interpreted as "true" as per EndpointConditions spec
}
return *v
}

View File

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
@ -30,13 +31,13 @@ import (
func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
for _, tt := range []struct {
title string
services []*v1.Service
endpoints []*v1.Endpoints
fqdnTemplate string
combineFQDN bool
publishHostIp bool
expected []*endpoint.Endpoint
title string
services []*v1.Service
endpointSlices []*discoveryv1.EndpointSlice
fqdnTemplate string
combineFQDN bool
publishHostIp bool
expected []*endpoint.Endpoint
}{
{
title: "templating with multiple services",
@ -346,23 +347,24 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
},
},
},
endpoints: []*v1.Endpoints{
endpointSlices: []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-one",
Name: "service-one-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-one",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.241",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "default",
}},
Addresses: []string{"100.66.2.241"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "default",
},
},
},
@ -370,20 +372,20 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-two",
Name: "service-two-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-two",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.244",
Hostname: "ip-10-1-164-152.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "default",
},
},
Addresses: []string{"100.66.2.244"},
Hostname: testutils.ToPtr("ip-10-1-164-152.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "default",
},
},
},
@ -391,29 +393,29 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-three",
Name: "service-three-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-three",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.246",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-3",
Namespace: "default",
},
},
{
IP: "100.66.2.247",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-4",
Namespace: "default",
},
},
Addresses: []string{"100.66.2.246"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-3",
Namespace: "default",
},
},
{
Addresses: []string{"100.66.2.247"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-4",
Namespace: "default",
},
},
},
@ -476,23 +478,24 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
},
},
},
endpoints: []*v1.Endpoints{
endpointSlices: []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-one",
Name: "service-one-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-one",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.241",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "default",
}},
Addresses: []string{"100.66.2.241"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "default",
},
},
},
@ -500,20 +503,20 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-two",
Name: "service-two-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-two",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.244",
Hostname: "ip-10-1-164-152.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "default",
},
},
Addresses: []string{"100.66.2.244"},
Hostname: testutils.ToPtr("ip-10-1-164-152.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "default",
},
},
},
@ -521,29 +524,29 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "service-three",
Name: "service-three-xxxxx",
Labels: map[string]string{
discoveryv1.LabelServiceName: "service-three",
},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "100.66.2.246",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-3",
Namespace: "default",
},
},
{
IP: "100.66.2.247",
Hostname: "ip-10-1-164-158.internal",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-4",
Namespace: "default",
},
},
Addresses: []string{"100.66.2.246"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-3",
Namespace: "default",
},
},
{
Addresses: []string{"100.66.2.247"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-4",
Namespace: "default",
},
},
},
@ -569,24 +572,23 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
}
// Create endpoints and pods for the services
for _, el := range tt.endpoints {
_, err := kubeClient.CoreV1().Endpoints(el.Namespace).Create(t.Context(), el, metav1.CreateOptions{})
for _, el := range tt.endpointSlices {
_, err := kubeClient.DiscoveryV1().EndpointSlices(el.Namespace).Create(t.Context(), el, metav1.CreateOptions{})
require.NoError(t, err)
for i, subset := range el.Subsets {
for idx, address := range subset.Addresses {
_, err = kubeClient.CoreV1().Pods(el.Namespace).Create(t.Context(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: address.TargetRef.Name,
Namespace: el.Namespace,
},
Spec: v1.PodSpec{
Hostname: address.Hostname,
},
Status: v1.PodStatus{
HostIP: fmt.Sprintf("10.1.2%d.4%d", i, idx),
},
}, metav1.CreateOptions{})
}
for i, ep := range el.Endpoints {
_, err = kubeClient.CoreV1().Pods(el.Namespace).Create(t.Context(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: ep.TargetRef.Name,
Namespace: el.Namespace,
},
Spec: v1.PodSpec{
Hostname: *ep.Hostname,
},
Status: v1.PodStatus{
HostIP: fmt.Sprintf("10.1.20.4%d", i),
},
}, metav1.CreateOptions{})
require.NoError(t, err)
}
}

View File

@ -19,6 +19,7 @@ package source
import (
"context"
"fmt"
"maps"
"math/rand"
"net"
"sort"
@ -30,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
@ -3107,7 +3109,7 @@ func TestHeadlessServices(t *testing.T) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(context.Background(), service, metav1.CreateOptions{})
require.NoError(t, err)
var addresses, notReadyAddresses []v1.EndpointAddress
var endpointSliceEndpoints []discoveryv1.Endpoint
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
@ -3129,34 +3131,31 @@ func TestHeadlessServices(t *testing.T) {
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)
address := v1.EndpointAddress{
IP: tc.podIPs[i],
ep := discoveryv1.Endpoint{
Addresses: []string{tc.podIPs[i]},
TargetRef: &v1.ObjectReference{
APIVersion: "",
Kind: "Pod",
Name: podname,
},
Conditions: discoveryv1.EndpointConditions{
Ready: &tc.podsReady[i],
},
}
if tc.podsReady[i] {
addresses = append(addresses, address)
} else {
notReadyAddresses = append(notReadyAddresses, address)
}
endpointSliceEndpoints = append(endpointSliceEndpoints, ep)
}
endpointsObject := &v1.Endpoints{
endpointSliceLabels := maps.Clone(tc.labels)
endpointSliceLabels[discoveryv1.LabelServiceName] = tc.svcName
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
NotReadyAddresses: notReadyAddresses,
},
Labels: endpointSliceLabels,
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: endpointSliceEndpoints,
}
_, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(context.Background(), endpointsObject, metav1.CreateOptions{})
_, err = kubernetes.DiscoveryV1().EndpointSlices(tc.svcNamespace).Create(context.Background(), endpointSlice, metav1.CreateOptions{})
require.NoError(t, err)
for _, node := range tc.nodes {
_, err = kubernetes.CoreV1().Nodes().Create(context.Background(), &node, metav1.CreateOptions{})
@ -3576,8 +3575,7 @@ func TestHeadlessServicesHostIP(t *testing.T) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(context.Background(), service, metav1.CreateOptions{})
require.NoError(t, err)
var addresses []v1.EndpointAddress
var notReadyAddresses []v1.EndpointAddress
var endpointsSlicesEndpoints []discoveryv1.Endpoint
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
@ -3598,30 +3596,27 @@ func TestHeadlessServicesHostIP(t *testing.T) {
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)
address := v1.EndpointAddress{
IP: "4.3.2.1",
ep := discoveryv1.Endpoint{
Addresses: []string{"4.3.2.1"},
TargetRef: tc.targetRefs[i],
Conditions: discoveryv1.EndpointConditions{
Ready: &tc.podsReady[i],
},
}
if tc.podsReady[i] {
addresses = append(addresses, address)
} else {
notReadyAddresses = append(notReadyAddresses, address)
}
endpointsSlicesEndpoints = append(endpointsSlicesEndpoints, ep)
}
endpointsObject := &v1.Endpoints{
endpointSliceLabels := maps.Clone(tc.labels)
endpointSliceLabels[discoveryv1.LabelServiceName] = tc.svcName
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
NotReadyAddresses: notReadyAddresses,
},
Labels: endpointSliceLabels,
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: endpointsSlicesEndpoints,
}
_, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(context.Background(), endpointsObject, metav1.CreateOptions{})
_, err = kubernetes.DiscoveryV1().EndpointSlices(tc.svcNamespace).Create(context.Background(), endpointSlice, metav1.CreateOptions{})
require.NoError(t, err)
// Create our object under test and get the endpoints.
@ -4064,6 +4059,64 @@ func TestFilterByServiceType_WithFixture(t *testing.T) {
}
}
func TestEndpointSlicesIndexer(t *testing.T) {
ctx := t.Context()
fakeClient := fake.NewClientset()
// Create a dummy EndpointSlice without the service name label
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "test-slice",
Namespace: "default",
Labels: map[string]string{}, // No discoveryv1.LabelServiceName
},
}
_, err := fakeClient.DiscoveryV1().EndpointSlices("default").Create(ctx, endpointSlice, metav1.CreateOptions{})
require.NoError(t, err)
// Should not error when creating the source
src, err := NewServiceSource(
ctx,
fakeClient,
"default",
"",
"{{.Name}}",
false,
"",
false,
false,
false,
[]string{},
false,
labels.Everything(),
false,
false,
false,
)
require.NoError(t, err)
ss, ok := src.(*serviceSource)
require.True(t, ok)
// Try to get EndpointSlices by index; should not panic or error, should return empty slice
indexer := ss.endpointSlicesInformer.Informer().GetIndexer()
slices, err := indexer.ByIndex(serviceNameIndexKey, "default/foo")
require.NoError(t, err)
require.Empty(t, slices)
// Insert an object of the wrong type into the indexer; indexFunc should return an error and Add() should panic
require.PanicsWithError(t,
"unable to calculate an index entry for key \"default/not-an-endpointslice\" on index \"serviceName\": "+
"expected *v1.EndpointSlice but got *v1.Service instead",
func() {
_ = indexer.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "not-an-endpointslice",
Namespace: "default",
},
})
})
}
// createTestServicesByType creates the requested number of services per type in the given namespace.
func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]int) []*v1.Service {
var services []*v1.Service