diff --git a/source/informers/fake.go b/source/informers/fake.go index aed7b3aee..2eb86e326 100644 --- a/source/informers/fake.go +++ b/source/informers/fake.go @@ -15,6 +15,9 @@ package informers import ( "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" corev1lister "k8s.io/client-go/listers/core/v1" discoveryv1lister "k8s.io/client-go/listers/discovery/v1" "k8s.io/client-go/tools/cache" @@ -58,3 +61,50 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer { func (f *FakeNodeInformer) Lister() corev1lister.NodeLister { return corev1lister.NewNodeLister(f.Informer().GetIndexer()) } + +func fakeService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-service", + Namespace: "ns", + Labels: map[string]string{"env": "prod", "team": "devops"}, + Annotations: map[string]string{"description": "Enriched service object"}, + UID: "1234", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "demo"}, + ExternalIPs: []string{"1.2.3.4"}, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + Protocol: corev1.ProtocolTCP, + }, + { + Name: "https", + Port: 443, + TargetPort: intstr.FromInt32(8443), + Protocol: corev1.ProtocolTCP, + }, + }, + Type: corev1.ServiceTypeLoadBalancer, + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + {IP: "5.6.7.8", Hostname: "lb.example.com"}, + }, + }, + Conditions: []metav1.Condition{ + { + Type: "Available", + Status: metav1.ConditionTrue, + Reason: "MinimumReplicasAvailable", + Message: "Service is available", + LastTransitionTime: metav1.Now(), + }, + }, + }, + } +} diff --git a/source/informers/transfomers.go b/source/informers/transfomers.go new file mode 100644 index 000000000..2d3daf5bc --- /dev/null +++ b/source/informers/transfomers.go @@ -0,0 +1,84 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +type TransformOptions struct { + specSelector bool + specExternalIps bool + statusLb bool +} + +func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc { + options := TransformOptions{} + for _, fn := range optFns { + fn(&options) + } + return func(obj any) (any, error) { + // only transform if the object is a Service at the moment + entity, ok := obj.(*corev1.Service) + if !ok { + return nil, nil + } + if entity.UID == "" { + // Pod was already transformed and we must be idempotent. + return entity, nil + } + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: entity.Name, + Namespace: entity.Namespace, + DeletionTimestamp: entity.DeletionTimestamp, + }, + Spec: corev1.ServiceSpec{}, + Status: corev1.ServiceStatus{}, + } + if options.specSelector { + svc.Spec.Selector = entity.Spec.Selector + } + if options.specExternalIps { + svc.Spec.ExternalIPs = entity.Spec.ExternalIPs + } + if options.statusLb { + svc.Status.LoadBalancer = entity.Status.LoadBalancer + } + return svc, nil + } +} + +// TransformWithSpecSelector enables copying the Service's .spec.selector field. +func TransformWithSpecSelector() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.specSelector = true + } +} + +// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field. +func TransformWithSpecExternalIPs() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.specExternalIps = true + } +} + +// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field. +func TransformWithStatusLoadBalancer() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.statusLb = true + } +} diff --git a/source/informers/transformers_test.go b/source/informers/transformers_test.go new file mode 100644 index 000000000..b81664977 --- /dev/null +++ b/source/informers/transformers_test.go @@ -0,0 +1,176 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" +) + +func TestTransformerWithOptions_Service(t *testing.T) { + base := fakeService() + + tests := []struct { + name string + options []func(*TransformOptions) + asserts func(any) + }{ + { + name: "minimalistic object", + options: nil, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.Empty(t, svc.UID) + assert.NotEmpty(t, svc.Name) + assert.NotEmpty(t, svc.Namespace) + }, + }, + { + name: "with selector", + options: []func(*TransformOptions){TransformWithSpecSelector()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.Empty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "with selector", + options: []func(*TransformOptions){TransformWithSpecSelector()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.Empty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "with loadBalancer", + options: []func(*TransformOptions){TransformWithStatusLoadBalancer()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.Empty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "all options", + options: []func(*TransformOptions){ + TransformWithSpecSelector(), + TransformWithSpecExternalIPs(), + TransformWithStatusLoadBalancer(), + }, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.NotEmpty(t, svc.Spec.ExternalIPs) + assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + transform := TransformerWithOptions[*corev1.Service](tt.options...) + got, err := transform(base) + require.NoError(t, err) + tt.asserts(got) + }) + } + + t.Run("non-service input", func(t *testing.T) { + transform := TransformerWithOptions[*corev1.Service]() + out, err := transform("not-a-service") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if out != nil { + t.Errorf("expected nil output for non-service input, got %v", out) + } + }) +} + +func TestTransformer_Service_WithFakeClient(t *testing.T) { + t.Run("with transformer", func(t *testing.T) { + ctx := t.Context() + svc := fakeService() + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + require.NoError(t, err) + + factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace)) + serviceInformer := factory.Core().V1().Services() + err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service]( + TransformWithSpecSelector(), + TransformWithSpecExternalIPs(), + TransformWithStatusLoadBalancer(), + )) + require.NoError(t, err) + + factory.Start(ctx.Done()) + err = WaitForCacheSync(ctx, factory) + require.NoError(t, err) + + got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + assert.Equal(t, svc.Spec.Selector, got.Spec.Selector) + assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs) + assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress) + assert.NotEqual(t, svc.Annotations, got.Annotations) + assert.NotEqual(t, svc.Labels, got.Labels) + }) + + t.Run("without transformer", func(t *testing.T) { + ctx := t.Context() + svc := fakeService() + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + require.NoError(t, err) + + factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace)) + serviceInformer := factory.Core().V1().Services() + + err = serviceInformer.Informer().GetIndexer().Add(svc) + require.NoError(t, err) + + factory.Start(ctx.Done()) + err = WaitForCacheSync(ctx, factory) + require.NoError(t, err) + + got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + assert.Equal(t, map[string]string{"app": "demo"}, got.Spec.Selector) + assert.Equal(t, []string{"1.2.3.4"}, got.Spec.ExternalIPs) + assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress) + assert.Equal(t, svc.Annotations, got.Annotations) + assert.Equal(t, svc.Labels, got.Labels) + }) +} diff --git a/source/istio_gateway.go b/source/istio_gateway.go index 9043cdcfe..ce3f17756 100644 --- a/source/istio_gateway.go +++ b/source/istio_gateway.go @@ -28,12 +28,12 @@ import ( istioclient "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" @@ -84,30 +84,26 @@ func NewIstioGatewaySource( gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() // Add default resource event handlers to properly initialize informer. - _, _ = serviceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("service added") - }, - }, - ) + _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service]( + informers.TransformWithSpecSelector(), + informers.TransformWithSpecExternalIPs(), + informers.TransformWithStatusLoadBalancer(), + )) + if err != nil { + return nil, err + } - _, _ = gatewayInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("gateway added") - }, - }, - ) + _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) informerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil { return nil, err } - if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil { return nil, err } diff --git a/source/istio_gateway_test.go b/source/istio_gateway_test.go index 0352bf689..79a151d2c 100644 --- a/source/istio_gateway_test.go +++ b/source/istio_gateway_test.go @@ -32,6 +32,7 @@ import ( networkv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" "sigs.k8s.io/external-dns/endpoint" @@ -1634,6 +1635,99 @@ func TestGatewaySource_GWSelectorMatchServiceSelector(t *testing.T) { } } +func TestTransformerInIstioGatewaySource(t *testing.T) { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-service", + Namespace: "default", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + Annotations: map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, + UID: "someuid", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "selector": "one", + "selector2": "two", + "selector3": "three", + }, + ExternalIPs: []string{"1.2.3.4"}, + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + Protocol: v1.ProtocolTCP, + }, + { + Name: "https", + Port: 443, + TargetPort: intstr.FromInt32(8443), + Protocol: v1.ProtocolTCP, + }, + }, + Type: v1.ServiceTypeLoadBalancer, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: "5.6.7.8", Hostname: "lb.example.com"}, + }, + }, + Conditions: []metav1.Condition{ + { + Type: "Available", + Status: metav1.ConditionTrue, + Reason: "MinimumReplicasAvailable", + Message: "Service is available", + LastTransitionTime: metav1.Now(), + }, + }, + }, + } + + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + src, err := NewIstioGatewaySource( + t.Context(), + fakeClient, + istiofake.NewSimpleClientset(), + "", + "", + "", + false, + false) + require.NoError(t, err) + gwSource, ok := src.(*gatewaySource) + require.True(t, ok) + + rService, err := gwSource.serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + assert.Equal(t, "fake-service", rService.Name) + assert.Empty(t, rService.Labels) + assert.Empty(t, rService.Annotations) + assert.Empty(t, rService.UID) + assert.NotEmpty(t, rService.Status.LoadBalancer) + assert.Empty(t, rService.Status.Conditions) + assert.Equal(t, map[string]string{ + "selector": "one", + "selector2": "two", + "selector3": "three", + }, rService.Spec.Selector) +} + // gateway specific helper functions func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress) (*gatewaySource, error) { fakeKubernetesClient := fake.NewClientset() diff --git a/source/istio_virtualservice.go b/source/istio_virtualservice.go index d16c236b0..08b0ec426 100644 --- a/source/istio_virtualservice.go +++ b/source/istio_virtualservice.go @@ -30,13 +30,13 @@ import ( istioclient "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" @@ -88,38 +88,28 @@ func NewIstioVirtualServiceSource( gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() // Add default resource event handlers to properly initialize informer. - _, _ = serviceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("service added") - }, - }, - ) + _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service]( + informers.TransformWithSpecSelector(), + informers.TransformWithSpecExternalIPs(), + informers.TransformWithStatusLoadBalancer(), + )) + if err != nil { + return nil, err + } - _, _ = virtualServiceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("virtual service added") - }, - }, - ) + _, _ = virtualServiceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) - _, _ = gatewayInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("gateway added") - }, - }, - ) + _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) informerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil { return nil, err } - if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil { return nil, err } diff --git a/source/istio_virtualservice_test.go b/source/istio_virtualservice_test.go index e39f95d5c..198b87cc3 100644 --- a/source/istio_virtualservice_test.go +++ b/source/istio_virtualservice_test.go @@ -33,6 +33,7 @@ import ( networkv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" "sigs.k8s.io/external-dns/endpoint" @@ -2335,3 +2336,96 @@ func TestIstioVirtualServiceSource_GWServiceSelectorMatchServiceSelector(t *test }) } } + +func TestTransformerInIstioGatewayVirtualServiceSource(t *testing.T) { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-service", + Namespace: "default", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + Annotations: map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, + UID: "someuid", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "selector": "one", + "selector2": "two", + "selector3": "three", + }, + ExternalIPs: []string{"1.2.3.4"}, + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + Protocol: v1.ProtocolTCP, + }, + { + Name: "https", + Port: 443, + TargetPort: intstr.FromInt32(8443), + Protocol: v1.ProtocolTCP, + }, + }, + Type: v1.ServiceTypeLoadBalancer, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + {IP: "5.6.7.8", Hostname: "lb.example.com"}, + }, + }, + Conditions: []metav1.Condition{ + { + Type: "Available", + Status: metav1.ConditionTrue, + Reason: "MinimumReplicasAvailable", + Message: "Service is available", + LastTransitionTime: metav1.Now(), + }, + }, + }, + } + + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(t.Context(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + src, err := NewIstioVirtualServiceSource( + t.Context(), + fakeClient, + istiofake.NewSimpleClientset(), + "", + "", + "", + false, + false) + require.NoError(t, err) + gwSource, ok := src.(*virtualServiceSource) + require.True(t, ok) + + rService, err := gwSource.serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + assert.Equal(t, svc.Name, rService.Name) + assert.Empty(t, rService.Labels) + assert.Empty(t, rService.Annotations) + assert.Empty(t, rService.UID) + assert.NotEmpty(t, rService.Status.LoadBalancer) + assert.Empty(t, rService.Status.Conditions) + assert.Equal(t, map[string]string{ + "selector": "one", + "selector2": "two", + "selector3": "three", + }, rService.Spec.Selector) +}