diff --git a/source/informers/handlers.go b/source/informers/handlers.go new file mode 100644 index 000000000..2d2067f45 --- /dev/null +++ b/source/informers/handlers.go @@ -0,0 +1,38 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/tools/cache" +) + +func DefaultEventHandler(handlers ...func()) cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if u, ok := obj.(*unstructured.Unstructured); ok { + log.WithFields(log.Fields{ + "apiVersion": u.GetAPIVersion(), + "kind": u.GetKind(), + "namespace": u.GetNamespace(), + "name": u.GetName(), + }).Debug("added") + for _, handler := range handlers { + handler() + } + } + }, + } +} diff --git a/source/informers/handlers_test.go b/source/informers/handlers_test.go new file mode 100644 index 000000000..519b99b52 --- /dev/null +++ b/source/informers/handlers_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestDefaultEventHandler_AddFunc(t *testing.T) { + tests := []struct { + name string + obj any + expected bool + }{ + { + name: "calls handler for unstructured object", + obj: &unstructured.Unstructured{}, + expected: true, + }, + { + name: "does not call handler for unknown object", + obj: "not-unstructured", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + called := false + handler := DefaultEventHandler(func() { called = true }) + handler.OnAdd(tt.obj, true) + if called != tt.expected { + t.Errorf("handler called = %v, want %v", called, tt.expected) + } + }) + } +} diff --git a/source/service.go b/source/service.go index 2606c0d8f..24f3b60ee 100644 --- a/source/service.go +++ b/source/service.go @@ -98,34 +98,39 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name serviceInformer := informerFactory.Core().V1().Services() endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices() podInformer := informerFactory.Core().V1().Pods() - nodeInformer := informerFactory.Core().V1().Nodes() // Add default resource event handlers to properly initialize informer. - serviceInformer.Informer().AddEventHandler( + _, _ = serviceInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { }, }, ) - endpointSlicesInformer.Informer().AddEventHandler( + _, _ = endpointSlicesInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { }, }, ) - podInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - }, - }, - ) - nodeInformer.Informer().AddEventHandler( + _, _ = podInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { }, }, ) + // Transform the slice into a map so it will be way much easier and fast to filter later + sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter) + if err != nil { + return nil, err + } + + var nodeInformer coreinformers.NodeInformer + if sTypesFilter.isNodeInformerRequired() { + nodeInformer = informerFactory.Core().V1().Nodes() + _, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + } + // Add an indexer to the EndpointSlice informer to index by the service name label err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{ serviceNameIndexKey: func(obj any) ([]string, error) { @@ -153,12 +158,6 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name return nil, err } - // Transform the slice into a map so it will be way much easier and fast to filter later - sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter) - if err != nil { - return nil, err - } - return &serviceSource{ client: kubeClient, namespace: namespace, @@ -197,7 +196,7 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err return nil, err } - endpoints := []*endpoint.Endpoint{} + endpoints := make([]*endpoint.Endpoint, 0) for _, svc := range services { // Check controller annotation to see if we are responsible. @@ -364,6 +363,10 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri targets := annotations.TargetsFromTargetAnnotation(pod.Annotations) if len(targets) == 0 { if endpointsType == EndpointsTypeNodeExternalIP { + if sc.nodeInformer == nil { + log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name) + continue + } node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName) if err != nil { log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err) @@ -459,7 +462,8 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End // endpointsFromService extracts the endpoints from a service object func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint { var endpoints []*endpoint.Endpoint - // Skip endpoints if we do not want entries from annotations + + // Skip endpoints if we do not want entries from annotations or service is excluded if sc.ignoreHostnameAnnotation { return endpoints } @@ -512,7 +516,7 @@ func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Servi } var result []*v1.Service for _, service := range services { - if _, ok := sc.serviceTypeFilter.types[service.Spec.Type]; ok { + if sc.serviceTypeFilter.isProcessed(service.Spec.Type) { result = append(result, service) } } @@ -797,9 +801,12 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { // Right now there is no way to remove event handler from informer, see: // https://github.com/kubernetes/kubernetes/issues/79610 - sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) + _, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) if sc.listenEndpointEvents { - sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) + _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) + } + if sc.serviceTypeFilter.isNodeInformerRequired() { + _, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } } @@ -817,20 +824,32 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) { enabled: false, }, nil } - types := make(map[v1.ServiceType]bool) + result := make(map[v1.ServiceType]bool) for _, serviceType := range filter { if _, ok := knownServiceTypes[v1.ServiceType(serviceType)]; !ok { return nil, fmt.Errorf("unsupported service type filter: %q. Supported types are: %q", serviceType, slices.Collect(maps.Keys(knownServiceTypes))) } - types[v1.ServiceType(serviceType)] = true + result[v1.ServiceType(serviceType)] = true } return &serviceTypes{ enabled: true, - types: types, + types: result, }, nil } +func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool { + return !sc.enabled || sc.types[serviceType] +} + +func (sc *serviceTypes) isNodeInformerRequired() bool { + if !sc.enabled { + return true + } + _, ok := sc.types[v1.ServiceTypeNodePort] + return ok +} + // conditionToBool converts an EndpointConditions condition to a bool value. func conditionToBool(v *bool) bool { if v == nil { diff --git a/source/service_test.go b/source/service_test.go index 406bdbc43..ecbe1fc83 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -302,6 +302,18 @@ func testServiceSourceEndpoints(t *testing.T) { {DNSName: "foo.fqdn.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, }, + { + title: "with excluded service type should not generate endpoints", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeLoadBalancer, + fqdnTemplate: "{{.Name}}.fqdn.org,{{.Name}}.fqdn.com", + labels: map[string]string{}, + annotations: map[string]string{}, + lbs: []string{"1.2.3.4"}, + serviceTypesFilter: []string{string(v1.ServiceTypeNodePort)}, + expected: []*endpoint.Endpoint{}, + }, { title: "FQDN template with multiple hostnames return an endpoint with target IP when ignoring annotations", svcNamespace: "testing", @@ -457,7 +469,7 @@ func testServiceSourceEndpoints(t *testing.T) { }, externalIPs: []string{}, lbs: []string{"1.2.3.4"}, - serviceTypesFilter: []string{}, + serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)}, expected: []*endpoint.Endpoint{ {DNSName: "foo.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, @@ -922,7 +934,6 @@ func testServiceSourceEndpoints(t *testing.T) { annotations: map[string]string{ hostnameAnnotationKey: "foo.example.org.", }, - externalIPs: []string{}, lbs: []string{"1.2.3.4"}, serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer)}, expected: []*endpoint.Endpoint{}, @@ -4049,6 +4060,7 @@ func TestExternalServices(t *testing.T) { annotations map[string]string externalName string externalIPs []string + serviceTypeFilter []string expected []*endpoint.Endpoint expectError bool }{ @@ -4067,6 +4079,7 @@ func TestExternalServices(t *testing.T) { }, "111.111.111.111", []string{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", Targets: endpoint.Targets{"111.111.111.111"}, RecordType: endpoint.RecordTypeA}, }, @@ -4087,6 +4100,7 @@ func TestExternalServices(t *testing.T) { }, "2001:db8::111", []string{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", Targets: endpoint.Targets{"2001:db8::111"}, RecordType: endpoint.RecordTypeAAAA}, }, @@ -4107,6 +4121,7 @@ func TestExternalServices(t *testing.T) { }, "remote.example.com", []string{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", Targets: endpoint.Targets{"remote.example.com"}, RecordType: endpoint.RecordTypeCNAME}, }, @@ -4127,6 +4142,7 @@ func TestExternalServices(t *testing.T) { }, "service.example.org", []string{"10.2.3.4", "11.2.3.4"}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}}, }, @@ -4147,12 +4163,32 @@ func TestExternalServices(t *testing.T) { }, "service.example.org", []string{"10.2.3.4", "11.2.3.4", "2001:db8::1", "2001:db8::2"}, + []string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeExternalName)}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.2.3.4", "11.2.3.4"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1", "2001:db8::2"}}, }, false, }, + { + "annotated ExternalName service with externalIPs of dualstack and excluded in serviceTypeFilter", + "", + "testing", + "foo", + v1.ServiceTypeExternalName, + "", + "", + false, + map[string]string{"component": "foo"}, + map[string]string{ + hostnameAnnotationKey: "service.example.org", + }, + "service.example.org", + []string{"10.2.3.4", "11.2.3.4", "2001:db8::1", "2001:db8::2"}, + []string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeClusterIP)}, + []*endpoint.Endpoint{}, + false, + }, } { t.Run(tc.title, func(t *testing.T) { @@ -4190,7 +4226,7 @@ func TestExternalServices(t *testing.T) { true, false, false, - []string{}, + tc.serviceTypeFilter, tc.ignoreHostnameAnnotation, labels.Everything(), false, @@ -4267,6 +4303,73 @@ func BenchmarkServiceEndpoints(b *testing.B) { } } +func TestNewServiceSourceInformersEnabled(t *testing.T) { + tests := []struct { + name string + asserts func(svc *serviceSource) + svcFilter []string + }{ + { + name: "serviceTypeFilter is set to empty", + asserts: func(svc *serviceSource) { + assert.NotNil(t, svc) + assert.NotNil(t, svc.serviceTypeFilter) + assert.False(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.nodeInformer) + }, + }, + { + name: "serviceTypeFilter contains NodePort", + svcFilter: []string{string(v1.ServiceTypeClusterIP)}, + asserts: func(svc *serviceSource) { + assert.NotNil(t, svc) + assert.NotNil(t, svc.serviceTypeFilter) + assert.True(t, svc.serviceTypeFilter.enabled) + assert.Nil(t, svc.nodeInformer) + }, + }, + { + name: "serviceTypeFilter contains NodePort", + svcFilter: []string{string(v1.ServiceTypeNodePort)}, + asserts: func(svc *serviceSource) { + assert.NotNil(t, svc) + assert.NotNil(t, svc.serviceTypeFilter) + assert.True(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.nodeInformer) + }, + }, + } + + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + svc, err := NewServiceSource( + t.Context(), + fake.NewClientset(), + "default", + "", + "", + false, + "", + true, + false, + false, + ts.svcFilter, + false, + labels.Everything(), + false, + false, + false, + ) + require.NoError(t, err) + svcSrc, ok := svc.(*serviceSource) + if !ok { + require.Fail(t, "expected serviceSource") + } + ts.asserts(svcSrc) + }) + } +} + func TestNewServiceSourceWithServiceTypeFilters_Unsupported(t *testing.T) { serviceTypeFilter := []string{"ClusterIP", "ServiceTypeNotExist"} @@ -4520,3 +4623,35 @@ func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]in }) return services } + +func TestServiceTypes_isNodeInformerRequired(t *testing.T) { + tests := []struct { + name string + filter []string + want bool + }{ + { + name: "NodePort type present", + filter: []string{string(v1.ServiceTypeNodePort)}, + want: true, + }, + { + name: "NodePort type absent, filter enabled", + filter: []string{string(v1.ServiceTypeLoadBalancer)}, + want: false, + }, + { + name: "NodePort and other filters present", + filter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter, _ := newServiceTypesFilter(tt.filter) + got := filter.isNodeInformerRequired() + assert.Equal(t, tt.want, got) + }) + } +}