fix(source/service): disable node informer when not required (#5613)

* fix(source/service): disable node informer when service type filter is activated

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

fix(source/service): disable node informer when service type filter is activated

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

fix(source/service): disable node informer when service type filter is activated

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

fix(source/service): disable node informer when service type filter is activated

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* fix(source/service): disable node informer when service type filter is activated

Co-authored-by: Michel Loiseleur <97035654+mloiseleur@users.noreply.github.com>

* fix(source/service): disable node informer when service type filter is activated

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
Co-authored-by: Michel Loiseleur <97035654+mloiseleur@users.noreply.github.com>
This commit is contained in:
Ivan Ka 2025-07-11 18:15:28 +01:00 committed by GitHub
parent 28d0ff9316
commit 1bfb970ace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 269 additions and 27 deletions

View File

@ -0,0 +1,38 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
)
func DefaultEventHandler(handlers ...func()) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
if u, ok := obj.(*unstructured.Unstructured); ok {
log.WithFields(log.Fields{
"apiVersion": u.GetAPIVersion(),
"kind": u.GetKind(),
"namespace": u.GetNamespace(),
"name": u.GetName(),
}).Debug("added")
for _, handler := range handlers {
handler()
}
}
},
}
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"testing"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func TestDefaultEventHandler_AddFunc(t *testing.T) {
tests := []struct {
name string
obj any
expected bool
}{
{
name: "calls handler for unstructured object",
obj: &unstructured.Unstructured{},
expected: true,
},
{
name: "does not call handler for unknown object",
obj: "not-unstructured",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
called := false
handler := DefaultEventHandler(func() { called = true })
handler.OnAdd(tt.obj, true)
if called != tt.expected {
t.Errorf("handler called = %v, want %v", called, tt.expected)
}
})
}
}

View File

@ -98,34 +98,39 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
serviceInformer := informerFactory.Core().V1().Services() serviceInformer := informerFactory.Core().V1().Services()
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices() endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
podInformer := informerFactory.Core().V1().Pods() podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()
// Add default resource event handlers to properly initialize informer. // Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler( _, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
}, },
}, },
) )
endpointSlicesInformer.Informer().AddEventHandler( _, _ = endpointSlicesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
}, },
}, },
) )
podInformer.Informer().AddEventHandler( _, _ = podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
}, },
}, },
) )
// Transform the slice into a map so it will be way much easier and fast to filter later
sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
if err != nil {
return nil, err
}
var nodeInformer coreinformers.NodeInformer
if sTypesFilter.isNodeInformerRequired() {
nodeInformer = informerFactory.Core().V1().Nodes()
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
}
// Add an indexer to the EndpointSlice informer to index by the service name label // Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{ err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) { serviceNameIndexKey: func(obj any) ([]string, error) {
@ -153,12 +158,6 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
return nil, err return nil, err
} }
// Transform the slice into a map so it will be way much easier and fast to filter later
sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
if err != nil {
return nil, err
}
return &serviceSource{ return &serviceSource{
client: kubeClient, client: kubeClient,
namespace: namespace, namespace: namespace,
@ -197,7 +196,7 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
return nil, err return nil, err
} }
endpoints := []*endpoint.Endpoint{} endpoints := make([]*endpoint.Endpoint, 0)
for _, svc := range services { for _, svc := range services {
// Check controller annotation to see if we are responsible. // Check controller annotation to see if we are responsible.
@ -364,6 +363,10 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
targets := annotations.TargetsFromTargetAnnotation(pod.Annotations) targets := annotations.TargetsFromTargetAnnotation(pod.Annotations)
if len(targets) == 0 { if len(targets) == 0 {
if endpointsType == EndpointsTypeNodeExternalIP { if endpointsType == EndpointsTypeNodeExternalIP {
if sc.nodeInformer == nil {
log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name)
continue
}
node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName) node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName)
if err != nil { if err != nil {
log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err) log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err)
@ -459,7 +462,8 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End
// endpointsFromService extracts the endpoints from a service object // endpointsFromService extracts the endpoints from a service object
func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint { func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint var endpoints []*endpoint.Endpoint
// Skip endpoints if we do not want entries from annotations
// Skip endpoints if we do not want entries from annotations or service is excluded
if sc.ignoreHostnameAnnotation { if sc.ignoreHostnameAnnotation {
return endpoints return endpoints
} }
@ -512,7 +516,7 @@ func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Servi
} }
var result []*v1.Service var result []*v1.Service
for _, service := range services { for _, service := range services {
if _, ok := sc.serviceTypeFilter.types[service.Spec.Type]; ok { if sc.serviceTypeFilter.isProcessed(service.Spec.Type) {
result = append(result, service) result = append(result, service)
} }
} }
@ -797,9 +801,12 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
// Right now there is no way to remove event handler from informer, see: // Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610 // https://github.com/kubernetes/kubernetes/issues/79610
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) _, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
if sc.listenEndpointEvents { if sc.listenEndpointEvents {
sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
if sc.serviceTypeFilter.isNodeInformerRequired() {
_, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
} }
} }
@ -817,20 +824,32 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
enabled: false, enabled: false,
}, nil }, nil
} }
types := make(map[v1.ServiceType]bool) result := make(map[v1.ServiceType]bool)
for _, serviceType := range filter { for _, serviceType := range filter {
if _, ok := knownServiceTypes[v1.ServiceType(serviceType)]; !ok { if _, ok := knownServiceTypes[v1.ServiceType(serviceType)]; !ok {
return nil, fmt.Errorf("unsupported service type filter: %q. Supported types are: %q", serviceType, slices.Collect(maps.Keys(knownServiceTypes))) return nil, fmt.Errorf("unsupported service type filter: %q. Supported types are: %q", serviceType, slices.Collect(maps.Keys(knownServiceTypes)))
} }
types[v1.ServiceType(serviceType)] = true result[v1.ServiceType(serviceType)] = true
} }
return &serviceTypes{ return &serviceTypes{
enabled: true, enabled: true,
types: types, types: result,
}, nil }, nil
} }
func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool {
return !sc.enabled || sc.types[serviceType]
}
func (sc *serviceTypes) isNodeInformerRequired() bool {
if !sc.enabled {
return true
}
_, ok := sc.types[v1.ServiceTypeNodePort]
return ok
}
// conditionToBool converts an EndpointConditions condition to a bool value. // conditionToBool converts an EndpointConditions condition to a bool value.
func conditionToBool(v *bool) bool { func conditionToBool(v *bool) bool {
if v == nil { if v == nil {

View File

@ -302,6 +302,18 @@ func testServiceSourceEndpoints(t *testing.T) {
{DNSName: "foo.fqdn.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, {DNSName: "foo.fqdn.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}},
}, },
}, },
{
title: "with excluded service type should not generate endpoints",
svcNamespace: "testing",
svcName: "foo",
svcType: v1.ServiceTypeLoadBalancer,
fqdnTemplate: "{{.Name}}.fqdn.org,{{.Name}}.fqdn.com",
labels: map[string]string{},
annotations: map[string]string{},
lbs: []string{"1.2.3.4"},
serviceTypesFilter: []string{string(v1.ServiceTypeNodePort)},
expected: []*endpoint.Endpoint{},
},
{ {
title: "FQDN template with multiple hostnames return an endpoint with target IP when ignoring annotations", title: "FQDN template with multiple hostnames return an endpoint with target IP when ignoring annotations",
svcNamespace: "testing", svcNamespace: "testing",
@ -457,7 +469,7 @@ func testServiceSourceEndpoints(t *testing.T) {
}, },
externalIPs: []string{}, externalIPs: []string{},
lbs: []string{"1.2.3.4"}, lbs: []string{"1.2.3.4"},
serviceTypesFilter: []string{}, serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)},
expected: []*endpoint.Endpoint{ expected: []*endpoint.Endpoint{
{DNSName: "foo.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, {DNSName: "foo.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}},
}, },
@ -922,7 +934,6 @@ func testServiceSourceEndpoints(t *testing.T) {
annotations: map[string]string{ annotations: map[string]string{
hostnameAnnotationKey: "foo.example.org.", hostnameAnnotationKey: "foo.example.org.",
}, },
externalIPs: []string{},
lbs: []string{"1.2.3.4"}, lbs: []string{"1.2.3.4"},
serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer)}, serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer)},
expected: []*endpoint.Endpoint{}, expected: []*endpoint.Endpoint{},
@ -4049,6 +4060,7 @@ func TestExternalServices(t *testing.T) {
annotations map[string]string annotations map[string]string
externalName string externalName string
externalIPs []string externalIPs []string
serviceTypeFilter []string
expected []*endpoint.Endpoint expected []*endpoint.Endpoint
expectError bool expectError bool
}{ }{
@ -4067,6 +4079,7 @@ func TestExternalServices(t *testing.T) {
}, },
"111.111.111.111", "111.111.111.111",
[]string{}, []string{},
[]string{},
[]*endpoint.Endpoint{ []*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"111.111.111.111"}, RecordType: endpoint.RecordTypeA}, {DNSName: "service.example.org", Targets: endpoint.Targets{"111.111.111.111"}, RecordType: endpoint.RecordTypeA},
}, },
@ -4087,6 +4100,7 @@ func TestExternalServices(t *testing.T) {
}, },
"2001:db8::111", "2001:db8::111",
[]string{}, []string{},
[]string{},
[]*endpoint.Endpoint{ []*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"2001:db8::111"}, RecordType: endpoint.RecordTypeAAAA}, {DNSName: "service.example.org", Targets: endpoint.Targets{"2001:db8::111"}, RecordType: endpoint.RecordTypeAAAA},
}, },
@ -4107,6 +4121,7 @@ func TestExternalServices(t *testing.T) {
}, },
"remote.example.com", "remote.example.com",
[]string{}, []string{},
[]string{},
[]*endpoint.Endpoint{ []*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"remote.example.com"}, RecordType: endpoint.RecordTypeCNAME}, {DNSName: "service.example.org", Targets: endpoint.Targets{"remote.example.com"}, RecordType: endpoint.RecordTypeCNAME},
}, },
@ -4127,6 +4142,7 @@ func TestExternalServices(t *testing.T) {
}, },
"service.example.org", "service.example.org",
[]string{"10.2.3.4", "11.2.3.4"}, []string{"10.2.3.4", "11.2.3.4"},
[]string{},
[]*endpoint.Endpoint{ []*endpoint.Endpoint{
{DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}},
}, },
@ -4147,12 +4163,32 @@ func TestExternalServices(t *testing.T) {
}, },
"service.example.org", "service.example.org",
[]string{"10.2.3.4", "11.2.3.4", "2001:db8::1", "2001:db8::2"}, []string{"10.2.3.4", "11.2.3.4", "2001:db8::1", "2001:db8::2"},
[]string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeExternalName)},
[]*endpoint.Endpoint{ []*endpoint.Endpoint{
{DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}},
{DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1", "2001:db8::2"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1", "2001:db8::2"}},
}, },
false, false,
}, },
{
"annotated ExternalName service with externalIPs of dualstack and excluded in serviceTypeFilter",
"",
"testing",
"foo",
v1.ServiceTypeExternalName,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
"service.example.org",
[]string{"10.2.3.4", "11.2.3.4", "2001:db8::1", "2001:db8::2"},
[]string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeClusterIP)},
[]*endpoint.Endpoint{},
false,
},
} { } {
t.Run(tc.title, func(t *testing.T) { t.Run(tc.title, func(t *testing.T) {
@ -4190,7 +4226,7 @@ func TestExternalServices(t *testing.T) {
true, true,
false, false,
false, false,
[]string{}, tc.serviceTypeFilter,
tc.ignoreHostnameAnnotation, tc.ignoreHostnameAnnotation,
labels.Everything(), labels.Everything(),
false, false,
@ -4267,6 +4303,73 @@ func BenchmarkServiceEndpoints(b *testing.B) {
} }
} }
func TestNewServiceSourceInformersEnabled(t *testing.T) {
tests := []struct {
name string
asserts func(svc *serviceSource)
svcFilter []string
}{
{
name: "serviceTypeFilter is set to empty",
asserts: func(svc *serviceSource) {
assert.NotNil(t, svc)
assert.NotNil(t, svc.serviceTypeFilter)
assert.False(t, svc.serviceTypeFilter.enabled)
assert.NotNil(t, svc.nodeInformer)
},
},
{
name: "serviceTypeFilter contains NodePort",
svcFilter: []string{string(v1.ServiceTypeClusterIP)},
asserts: func(svc *serviceSource) {
assert.NotNil(t, svc)
assert.NotNil(t, svc.serviceTypeFilter)
assert.True(t, svc.serviceTypeFilter.enabled)
assert.Nil(t, svc.nodeInformer)
},
},
{
name: "serviceTypeFilter contains NodePort",
svcFilter: []string{string(v1.ServiceTypeNodePort)},
asserts: func(svc *serviceSource) {
assert.NotNil(t, svc)
assert.NotNil(t, svc.serviceTypeFilter)
assert.True(t, svc.serviceTypeFilter.enabled)
assert.NotNil(t, svc.nodeInformer)
},
},
}
for _, ts := range tests {
t.Run(ts.name, func(t *testing.T) {
svc, err := NewServiceSource(
t.Context(),
fake.NewClientset(),
"default",
"",
"",
false,
"",
true,
false,
false,
ts.svcFilter,
false,
labels.Everything(),
false,
false,
false,
)
require.NoError(t, err)
svcSrc, ok := svc.(*serviceSource)
if !ok {
require.Fail(t, "expected serviceSource")
}
ts.asserts(svcSrc)
})
}
}
func TestNewServiceSourceWithServiceTypeFilters_Unsupported(t *testing.T) { func TestNewServiceSourceWithServiceTypeFilters_Unsupported(t *testing.T) {
serviceTypeFilter := []string{"ClusterIP", "ServiceTypeNotExist"} serviceTypeFilter := []string{"ClusterIP", "ServiceTypeNotExist"}
@ -4520,3 +4623,35 @@ func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]in
}) })
return services return services
} }
func TestServiceTypes_isNodeInformerRequired(t *testing.T) {
tests := []struct {
name string
filter []string
want bool
}{
{
name: "NodePort type present",
filter: []string{string(v1.ServiceTypeNodePort)},
want: true,
},
{
name: "NodePort type absent, filter enabled",
filter: []string{string(v1.ServiceTypeLoadBalancer)},
want: false,
},
{
name: "NodePort and other filters present",
filter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filter, _ := newServiceTypesFilter(tt.filter)
got := filter.isNodeInformerRequired()
assert.Equal(t, tt.want, got)
})
}
}