diff --git a/source/endpoint_benchmark_test.go b/source/endpoint_benchmark_test.go index a178950d9..450a533af 100644 --- a/source/endpoint_benchmark_test.go +++ b/source/endpoint_benchmark_test.go @@ -28,6 +28,7 @@ import ( kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/external-dns/source/informers" v1alpha3 "istio.io/api/networking/v1alpha3" istiov1a "istio.io/client-go/pkg/apis/networking/v1" @@ -92,14 +93,9 @@ func svcInformerWithServices(toLookup, underTest int) (coreinformers.ServiceInfo svcInformer := informerFactory.Core().V1().Services() ctx := context.Background() - _, err := svcInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - }, - }, - ) + err := svcInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]()) if err != nil { - return nil, fmt.Errorf("failed to add event handler: %w", err) + return nil, fmt.Errorf("failed to add indexer: %w", err) } services := fixturesSvcWithLabels(toLookup, underTest) diff --git a/source/endpoints.go b/source/endpoints.go index b3667052d..e5813c4b7 100644 --- a/source/endpoints.go +++ b/source/endpoints.go @@ -16,10 +16,12 @@ package source import ( "fmt" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" coreinformers "k8s.io/client-go/informers/core/v1" "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/source/informers" ) // EndpointsForHostname returns the endpoint objects for each host-target combination. @@ -84,14 +86,15 @@ func EndpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) { targets := endpoint.Targets{} - services, err := svcInformer.Lister().Services(namespace).List(labels.Everything()) + services, err := svcInformer.Informer().GetIndexer().ByIndex(informers.IndexWithSpecSelector, informers.ToSHA(labels.Set(selector).String())) if err != nil { return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err) } - for _, service := range services { - if !MatchesServiceSelector(selector, service.Spec.Selector) { + for _, svc := range services { + service, ok := svc.(*corev1.Service) + if !ok { continue } diff --git a/source/endpoints_test.go b/source/endpoints_test.go index 9470db689..4746e2708 100644 --- a/source/endpoints_test.go +++ b/source/endpoints_test.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/external-dns/source/informers" "sigs.k8s.io/external-dns/endpoint" ) @@ -247,6 +248,8 @@ func TestEndpointTargetsFromServices(t *testing.T) { informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, kubeinformers.WithNamespace(tt.namespace)) serviceInformer := informerFactory.Core().V1().Services() + err := serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]()) + assert.NoError(t, err) for _, svc := range tt.services { _, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{}) diff --git a/source/informers/fake.go b/source/informers/fake.go index aed7b3aee..294206d7f 100644 --- a/source/informers/fake.go +++ b/source/informers/fake.go @@ -15,6 +15,9 @@ package informers import ( "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" corev1lister "k8s.io/client-go/listers/core/v1" discoveryv1lister "k8s.io/client-go/listers/discovery/v1" "k8s.io/client-go/tools/cache" @@ -58,3 +61,49 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer { func (f *FakeNodeInformer) Lister() corev1lister.NodeLister { return corev1lister.NewNodeLister(f.Informer().GetIndexer()) } + +func fakeService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-service", + Namespace: "ns", + Labels: map[string]string{"env": "prod", "team": "devops"}, + Annotations: map[string]string{"description": "Enriched service object"}, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "demo"}, + ExternalIPs: []string{"1.2.3.4"}, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + Protocol: corev1.ProtocolTCP, + }, + { + Name: "https", + Port: 443, + TargetPort: intstr.FromInt32(8443), + Protocol: corev1.ProtocolTCP, + }, + }, + Type: corev1.ServiceTypeLoadBalancer, + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + {IP: "5.6.7.8", Hostname: "lb.example.com"}, + }, + }, + Conditions: []metav1.Condition{ + { + Type: "Available", + Status: metav1.ConditionTrue, + Reason: "MinimumReplicasAvailable", + Message: "Service is available", + LastTransitionTime: metav1.Now(), + }, + }, + }, + } +} diff --git a/source/informers/indexers.go b/source/informers/indexers.go index b2b947da4..4e851d796 100644 --- a/source/informers/indexers.go +++ b/source/informers/indexers.go @@ -16,6 +16,7 @@ package informers import ( "fmt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -25,7 +26,8 @@ import ( ) const ( - IndexWithSelectors = "withSelectors" + IndexWithSelectors = "withSelectors" + IndexWithSpecSelector = "spec.selector" ) type IndexSelectorOptions struct { @@ -95,6 +97,23 @@ func IndexerWithOptions[T metav1.Object](optFns ...func(options *IndexSelectorOp } } +// IndexerSpecSelector returns a cache.Indexers map that indexes Kubernetes Service objects +// by a hash of their .spec.selector field. This enables efficient lookups of Services +// sharing the same selector. The function is generic over metav1.Object, but only operates +// on *corev1.Service objects at the moment. If the object is not a Service, it does not index. +func IndexerSpecSelector[T metav1.Object]() cache.Indexers { + return cache.Indexers{ + IndexWithSpecSelector: func(obj interface{}) ([]string, error) { + entity, ok := obj.(*corev1.Service) + if !ok { + return nil, nil + } + key := ToSHA(labels.Set(entity.Spec.Selector).String()) + return []string{key}, nil + }, + } +} + // GetByKey retrieves an object of type T (metav1.Object) from the given cache.Indexer by its key. // It returns the object and an error if the retrieval or type assertion fails. // If the object does not exist, it returns the zero value of T and nil. diff --git a/source/informers/indexers_test.go b/source/informers/indexers_test.go index d9b5de61e..9e322f4c4 100644 --- a/source/informers/indexers_test.go +++ b/source/informers/indexers_test.go @@ -183,3 +183,34 @@ func TestGetByKey_TypeAssertionFailure(t *testing.T) { assert.Contains(t, err.Error(), "object is not of type") assert.Nil(t, result) } + +func TestIndexerSpecSelector_Service(t *testing.T) { + indexers := IndexerSpecSelector[*corev1.Service]() + svc := &corev1.Service{} + svc.Spec.Selector = map[string]string{"app": "demo", "tier": "backend"} + + keys, err := indexers[IndexWithSpecSelector](svc) + assert.NoError(t, err) + expected := ToSHA(labels.Set(svc.Spec.Selector).String()) + assert.Equal(t, []string{expected}, keys) +} + +func TestIndexerSpecSelector_NonService(t *testing.T) { + indexers := IndexerSpecSelector[*corev1.Service]() + obj := "not-a-service" + + keys, err := indexers[IndexWithSpecSelector](obj) + assert.NoError(t, err) + assert.Nil(t, keys) +} + +func TestIndexerSpecSelector_EmptySelector(t *testing.T) { + indexers := IndexerSpecSelector[*corev1.Service]() + svc := &corev1.Service{} + svc.Spec.Selector = map[string]string{} + + keys, err := indexers[IndexWithSpecSelector](svc) + assert.NoError(t, err) + expected := ToSHA(labels.Set(svc.Spec.Selector).String()) + assert.Equal(t, []string{expected}, keys) +} diff --git a/source/informers/transformers.go b/source/informers/transformers.go new file mode 100644 index 000000000..8bdb64c18 --- /dev/null +++ b/source/informers/transformers.go @@ -0,0 +1,80 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +type TransformOptions struct { + specSelector bool + specExternalIps bool + statusLb bool +} + +func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc { + options := TransformOptions{} + for _, fn := range optFns { + fn(&options) + } + return func(obj any) (any, error) { + // only transform if the object is a Service at the moment + entity, ok := obj.(*corev1.Service) + if !ok { + return nil, nil + } + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: entity.Name, + Namespace: entity.Namespace, + DeletionTimestamp: entity.DeletionTimestamp, + }, + Spec: corev1.ServiceSpec{}, + Status: corev1.ServiceStatus{}, + } + if options.specSelector { + svc.Spec.Selector = entity.Spec.Selector + } + if options.specExternalIps { + svc.Spec.ExternalIPs = entity.Spec.ExternalIPs + } + if options.statusLb { + svc.Status.LoadBalancer = entity.Status.LoadBalancer + } + return svc, nil + } +} + +// TransformWithSpecSelector enables copying the Service's .spec.selector field. +func TransformWithSpecSelector() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.specSelector = true + } +} + +// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field. +func TransformWithSpecExternalIPs() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.specExternalIps = true + } +} + +// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field. +func TransformWithStatusLoadBalancer() func(options *TransformOptions) { + return func(options *TransformOptions) { + options.statusLb = true + } +} diff --git a/source/informers/transormers_test.go b/source/informers/transormers_test.go new file mode 100644 index 000000000..7d96b4855 --- /dev/null +++ b/source/informers/transormers_test.go @@ -0,0 +1,176 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" +) + +func TestTransformerWithOptions_Service(t *testing.T) { + base := fakeService() + + tests := []struct { + name string + options []func(*TransformOptions) + asserts func(any) + }{ + { + name: "minimalistic object", + options: nil, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.Empty(t, svc.UID) + assert.NotEmpty(t, svc.Name) + assert.NotEmpty(t, svc.Namespace) + }, + }, + { + name: "with selector", + options: []func(*TransformOptions){TransformWithSpecSelector()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.Empty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "with selector", + options: []func(*TransformOptions){TransformWithSpecSelector()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.Empty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "with loadBalancer", + options: []func(*TransformOptions){TransformWithStatusLoadBalancer()}, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.Empty(t, svc.Spec.Selector) + assert.Empty(t, svc.Spec.ExternalIPs) + assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + { + name: "all options", + options: []func(*TransformOptions){ + TransformWithSpecSelector(), + TransformWithSpecExternalIPs(), + TransformWithStatusLoadBalancer(), + }, + asserts: func(obj any) { + svc, ok := obj.(*corev1.Service) + assert.True(t, ok) + assert.NotEmpty(t, svc.Spec.Selector) + assert.NotEmpty(t, svc.Spec.ExternalIPs) + assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + transform := TransformerWithOptions[*corev1.Service](tt.options...) + got, err := transform(base) + require.NoError(t, err) + tt.asserts(got) + }) + } + + t.Run("non-service input", func(t *testing.T) { + transform := TransformerWithOptions[*corev1.Service]() + out, err := transform("not-a-service") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if out != nil { + t.Errorf("expected nil output for non-service input, got %v", out) + } + }) +} + +func TestTransformer_Service_WithFakeClient(t *testing.T) { + t.Run("with transformer", func(t *testing.T) { + ctx := t.Context() + svc := fakeService() + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + require.NoError(t, err) + + factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace)) + serviceInformer := factory.Core().V1().Services() + err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service]( + TransformWithSpecSelector(), + TransformWithSpecExternalIPs(), + TransformWithStatusLoadBalancer(), + )) + require.NoError(t, err) + + factory.Start(ctx.Done()) + err = WaitForCacheSync(ctx, factory) + require.NoError(t, err) + + got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + assert.Equal(t, svc.Spec.Selector, got.Spec.Selector) + assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs) + assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress) + assert.NotEqual(t, svc.Annotations, got.Annotations) + assert.NotEqual(t, svc.Labels, got.Labels) + }) + + t.Run("without transformer", func(t *testing.T) { + ctx := t.Context() + svc := fakeService() + fakeClient := fake.NewClientset() + + _, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + require.NoError(t, err) + + factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace)) + serviceInformer := factory.Core().V1().Services() + + err = serviceInformer.Informer().GetIndexer().Add(svc) + require.NoError(t, err) + + factory.Start(ctx.Done()) + err = WaitForCacheSync(ctx, factory) + require.NoError(t, err) + + got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name) + require.NoError(t, err) + + // assert.Equal(t, svc.Spec.Selector, got.Spec.Selector) + // assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs) + assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress) + assert.Equal(t, svc.Annotations, got.Annotations) + assert.Equal(t, svc.Labels, got.Labels) + }) +} diff --git a/source/informers/utils.go b/source/informers/utils.go new file mode 100644 index 000000000..3d169b13b --- /dev/null +++ b/source/informers/utils.go @@ -0,0 +1,30 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "crypto/sha1" + "encoding/hex" +) + +// ToSHA returns the SHA1 hash of the input string as a hex string. +// Using a SHA1 hash of the label selector string (as in ToSHA(labels.Set(selector).String())) is useful: +// - It provides a consistent and compact representation of the selector. +// - It allows for efficient indexing and lookup in Kubernetes informers. +// - It avoids issues with long label selector strings that could exceed index length limits. +func ToSHA(s string) string { + h := sha1.New() + h.Write([]byte(s)) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/source/informers/utils_test.go b/source/informers/utils_test.go new file mode 100644 index 000000000..4904fe87a --- /dev/null +++ b/source/informers/utils_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/labels" +) + +func TestToSHA(t *testing.T) { + tests := []struct { + input string + expected string + }{ + { + input: "test", + expected: "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3", + }, + { + input: "", + expected: "da39a3ee5e6b4b0d3255bfef95601890afd80709", + }, + { + input: labels.Set(map[string]string{ + "app": "test", + "env": "production", + }).String(), + expected: "29eda95ee609e3186afe17e3bf988a654bc5b739", + }, + { + input: labels.Set(map[string]string{ + "app": "test", + "env": "production", + "version": "v1", + "component": "frontend", + }).String(), + expected: "446f9bdf6ba5c7edf324a07482bcd5c3b6c6ce38", + }, + } + + for _, tt := range tests { + got := ToSHA(tt.input) + assert.Equal(t, tt.expected, got) + } +} diff --git a/source/istio_gateway.go b/source/istio_gateway.go index 9043cdcfe..784a477c7 100644 --- a/source/istio_gateway.go +++ b/source/istio_gateway.go @@ -28,12 +28,12 @@ import ( istioclient "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" @@ -79,35 +79,35 @@ func NewIstioGatewaySource( // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace. // Set resync period to 0, to prevent processing when nothing has changed informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) - serviceInformer := informerFactory.Core().V1().Services() istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0) gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() - // Add default resource event handlers to properly initialize informer. - _, _ = serviceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("service added") - }, - }, - ) + serviceInformer := informerFactory.Core().V1().Services() + err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]()) + if err != nil { + return nil, err + } + err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service]( + informers.TransformWithSpecSelector(), + informers.TransformWithSpecExternalIPs(), + informers.TransformWithStatusLoadBalancer(), + )) + if err != nil { + return nil, err + } - _, _ = gatewayInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("gateway added") - }, - }, - ) + // Add default resource event handlers to properly initialize informer. + _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) informerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil { return nil, err } - if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil { return nil, err } @@ -191,7 +191,7 @@ func (sc *gatewaySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e endpoints = append(endpoints, gwEndpoints...) } - // TODO: sort on endpoint creation + // TODO: sort on endpoint creation (performance) for _, ep := range endpoints { sort.Sort(ep.Targets) } @@ -241,6 +241,7 @@ func (sc *gatewaySource) targetsFromIngress(ctx context.Context, ingressStr stri targets := make(endpoint.Targets, 0) + // TODO: should be informer as currently this is make an API call for each gateway (performance) ingress, err := sc.kubeClient.NetworkingV1().Ingresses(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { log.Error(err) diff --git a/source/istio_gateway_test.go b/source/istio_gateway_test.go index 21f57009e..61479620c 100644 --- a/source/istio_gateway_test.go +++ b/source/istio_gateway_test.go @@ -1477,7 +1477,7 @@ func testGatewayEndpoints(t *testing.T) { t.Run(ti.title, func(t *testing.T) { t.Parallel() - fakeKubernetesClient := fake.NewSimpleClientset() + fakeKubernetesClient := fake.NewClientset() for _, lb := range ti.lbServices { service := lb.Service() @@ -1524,7 +1524,7 @@ func testGatewayEndpoints(t *testing.T) { // gateway specific helper functions func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress) (*gatewaySource, error) { - fakeKubernetesClient := fake.NewSimpleClientset() + fakeKubernetesClient := fake.NewClientset() fakeIstioClient := istiofake.NewSimpleClientset() for _, lb := range loadBalancerList { diff --git a/source/istio_virtualservice.go b/source/istio_virtualservice.go index d16c236b0..11a46ad78 100644 --- a/source/istio_virtualservice.go +++ b/source/istio_virtualservice.go @@ -30,13 +30,13 @@ import ( istioclient "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" @@ -82,44 +82,37 @@ func NewIstioVirtualServiceSource( // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace. // Set resync period to 0, to prevent processing when nothing has changed informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) - serviceInformer := informerFactory.Core().V1().Services() istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(istioClient, 0, istioinformers.WithNamespace(namespace)) virtualServiceInformer := istioInformerFactory.Networking().V1beta1().VirtualServices() gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() + serviceInformer := informerFactory.Core().V1().Services() + err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]()) + if err != nil { + return nil, err + } + err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service]( + informers.TransformWithSpecSelector(), + informers.TransformWithSpecExternalIPs(), + informers.TransformWithStatusLoadBalancer(), + )) + if err != nil { + return nil, err + } + // Add default resource event handlers to properly initialize informer. - _, _ = serviceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("service added") - }, - }, - ) - - _, _ = virtualServiceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("virtual service added") - }, - }, - ) - - _, _ = gatewayInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - log.Debug("gateway added") - }, - }, - ) + _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + _, _ = virtualServiceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) informerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil { return nil, err } - if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil { return nil, err } @@ -190,7 +183,7 @@ func (sc *virtualServiceSource) Endpoints(ctx context.Context) ([]*endpoint.Endp endpoints = append(endpoints, gwEndpoints...) } - // TODO: sort on endpoint creation + // TODO: sort on endpoint creation (performance) for _, ep := range endpoints { sort.Sort(ep.Targets) } @@ -426,6 +419,7 @@ func (sc *virtualServiceSource) targetsFromIngress(ctx context.Context, ingressS namespace = gateway.Namespace } + // TODO: should be informer as currently this is making an API call for each gateway (performance) ingress, err := sc.kubeClient.NetworkingV1().Ingresses(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { log.Error(err) diff --git a/source/istio_virtualservice_test.go b/source/istio_virtualservice_test.go index c4c01ba75..77875879c 100644 --- a/source/istio_virtualservice_test.go +++ b/source/istio_virtualservice_test.go @@ -49,7 +49,7 @@ type VirtualServiceSuite struct { } func (suite *VirtualServiceSuite) SetupTest() { - fakeKubernetesClient := fake.NewSimpleClientset() + fakeKubernetesClient := fake.NewClientset() fakeIstioClient := istiofake.NewSimpleClientset() var err error @@ -139,7 +139,6 @@ func TestVirtualService(t *testing.T) { t.Run("virtualServiceBindsToGateway", testVirtualServiceBindsToGateway) t.Run("endpointsFromVirtualServiceConfig", testEndpointsFromVirtualServiceConfig) t.Run("Endpoints", testVirtualServiceEndpoints) - t.Run("gatewaySelectorMatchesService", testGatewaySelectorMatchesService) } func TestNewIstioVirtualServiceSource(t *testing.T) { @@ -189,7 +188,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) { _, err := NewIstioVirtualServiceSource( context.TODO(), - fake.NewSimpleClientset(), + fake.NewClientset(), istiofake.NewSimpleClientset(), "", ti.annotationFilter, @@ -2008,38 +2007,6 @@ func testVirtualServiceEndpoints(t *testing.T) { } } -func testGatewaySelectorMatchesService(t *testing.T) { - for _, ti := range []struct { - title string - gwSelector map[string]string - lbSelector map[string]string - expected bool - }{ - { - title: "gw selector matches lb selector", - gwSelector: map[string]string{"istio": "ingressgateway"}, - lbSelector: map[string]string{"istio": "ingressgateway"}, - expected: true, - }, - { - title: "gw selector matches lb selector partially", - gwSelector: map[string]string{"istio": "ingressgateway"}, - lbSelector: map[string]string{"release": "istio", "istio": "ingressgateway"}, - expected: true, - }, - { - title: "gw selector does not match lb selector", - gwSelector: map[string]string{"app": "mytest"}, - lbSelector: map[string]string{"istio": "ingressgateway"}, - expected: false, - }, - } { - t.Run(ti.title, func(t *testing.T) { - require.Equal(t, ti.expected, MatchesServiceSelector(ti.gwSelector, ti.lbSelector)) - }) - } -} - func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress, gwList []fakeGatewayConfig) (*virtualServiceSource, error) { fakeKubernetesClient := fake.NewClientset() fakeIstioClient := istiofake.NewSimpleClientset() diff --git a/source/utils.go b/source/utils.go index 5858fddb8..4b5d37354 100644 --- a/source/utils.go +++ b/source/utils.go @@ -56,15 +56,3 @@ func ParseIngress(ingress string) (string, string, error) { return namespace, name, err } - -// MatchesServiceSelector checks if all key-value pairs in the selector map -// are present and match the corresponding key-value pairs in the svcSelector map. -// It returns true if all pairs match, otherwise it returns false. -func MatchesServiceSelector(selector, svcSelector map[string]string) bool { - for k, v := range selector { - if lbl, ok := svcSelector[k]; !ok || lbl != v { - return false - } - } - return true -} diff --git a/source/utils_test.go b/source/utils_test.go index ba6447c2e..a6c78b9f4 100644 --- a/source/utils_test.go +++ b/source/utils_test.go @@ -102,50 +102,3 @@ func TestParseIngress(t *testing.T) { }) } } - -func TestSelectorMatchesService(t *testing.T) { - tests := []struct { - name string - selector map[string]string - svcSelector map[string]string - expected bool - }{ - { - name: "all key-value pairs match", - selector: map[string]string{"app": "nginx", "env": "prod"}, - svcSelector: map[string]string{"app": "nginx", "env": "prod"}, - expected: true, - }, - { - name: "one key-value pair does not match", - selector: map[string]string{"app": "nginx", "env": "prod"}, - svcSelector: map[string]string{"app": "nginx", "env": "dev"}, - expected: false, - }, - { - name: "key not present in svcSelector", - selector: map[string]string{"app": "nginx", "env": "prod"}, - svcSelector: map[string]string{"app": "nginx"}, - expected: false, - }, - { - name: "empty selector", - selector: map[string]string{}, - svcSelector: map[string]string{"app": "nginx", "env": "prod"}, - expected: true, - }, - { - name: "empty svcSelector", - selector: map[string]string{"app": "nginx", "env": "prod"}, - svcSelector: map[string]string{}, - expected: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := MatchesServiceSelector(tt.selector, tt.svcSelector) - assert.Equal(t, tt.expected, result) - }) - } -}