feat(source/istio): speed up processing with indexers

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
ivan katliarchuk 2025-08-01 13:21:40 +01:00
parent 468fb66758
commit 8c7af680c0
No known key found for this signature in database
GPG Key ID: 601CDBBBB76E47BE
15 changed files with 493 additions and 147 deletions

View File

@ -28,6 +28,7 @@ import (
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"
v1alpha3 "istio.io/api/networking/v1alpha3" v1alpha3 "istio.io/api/networking/v1alpha3"
istiov1a "istio.io/client-go/pkg/apis/networking/v1" istiov1a "istio.io/client-go/pkg/apis/networking/v1"
@ -92,14 +93,9 @@ func svcInformerWithServices(toLookup, underTest int) (coreinformers.ServiceInfo
svcInformer := informerFactory.Core().V1().Services() svcInformer := informerFactory.Core().V1().Services()
ctx := context.Background() ctx := context.Background()
_, err := svcInformer.Informer().AddEventHandler( err := svcInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to add event handler: %w", err) return nil, fmt.Errorf("failed to add indexer: %w", err)
} }
services := fixturesSvcWithLabels(toLookup, underTest) services := fixturesSvcWithLabels(toLookup, underTest)

View File

@ -16,10 +16,12 @@ package source
import ( import (
"fmt" "fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/informers"
) )
// EndpointsForHostname returns the endpoint objects for each host-target combination. // EndpointsForHostname returns the endpoint objects for each host-target combination.
@ -84,14 +86,15 @@ func EndpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) { func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{} targets := endpoint.Targets{}
services, err := svcInformer.Lister().Services(namespace).List(labels.Everything()) services, err := svcInformer.Informer().GetIndexer().ByIndex(informers.IndexWithSpecSelector, informers.ToSHA(labels.Set(selector).String()))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err) return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err)
} }
for _, service := range services { for _, svc := range services {
if !MatchesServiceSelector(selector, service.Spec.Selector) { service, ok := svc.(*corev1.Service)
if !ok {
continue continue
} }

View File

@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
) )
@ -247,6 +248,8 @@ func TestEndpointTargetsFromServices(t *testing.T) {
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0,
kubeinformers.WithNamespace(tt.namespace)) kubeinformers.WithNamespace(tt.namespace))
serviceInformer := informerFactory.Core().V1().Services() serviceInformer := informerFactory.Core().V1().Services()
err := serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
assert.NoError(t, err)
for _, svc := range tt.services { for _, svc := range tt.services {
_, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{}) _, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{})

View File

@ -15,6 +15,9 @@ package informers
import ( import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
corev1lister "k8s.io/client-go/listers/core/v1" corev1lister "k8s.io/client-go/listers/core/v1"
discoveryv1lister "k8s.io/client-go/listers/discovery/v1" discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -58,3 +61,49 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
func (f *FakeNodeInformer) Lister() corev1lister.NodeLister { func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
return corev1lister.NewNodeLister(f.Informer().GetIndexer()) return corev1lister.NewNodeLister(f.Informer().GetIndexer())
} }
func fakeService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-service",
Namespace: "ns",
Labels: map[string]string{"env": "prod", "team": "devops"},
Annotations: map[string]string{"description": "Enriched service object"},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"app": "demo"},
ExternalIPs: []string{"1.2.3.4"},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt32(8080),
Protocol: corev1.ProtocolTCP,
},
{
Name: "https",
Port: 443,
TargetPort: intstr.FromInt32(8443),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeLoadBalancer,
},
Status: corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{
{IP: "5.6.7.8", Hostname: "lb.example.com"},
},
},
Conditions: []metav1.Condition{
{
Type: "Available",
Status: metav1.ConditionTrue,
Reason: "MinimumReplicasAvailable",
Message: "Service is available",
LastTransitionTime: metav1.Now(),
},
},
},
}
}

View File

@ -16,6 +16,7 @@ package informers
import ( import (
"fmt" "fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -25,7 +26,8 @@ import (
) )
const ( const (
IndexWithSelectors = "withSelectors" IndexWithSelectors = "withSelectors"
IndexWithSpecSelector = "spec.selector"
) )
type IndexSelectorOptions struct { type IndexSelectorOptions struct {
@ -95,6 +97,23 @@ func IndexerWithOptions[T metav1.Object](optFns ...func(options *IndexSelectorOp
} }
} }
// IndexerSpecSelector returns a cache.Indexers map that indexes Kubernetes Service objects
// by a hash of their .spec.selector field. This enables efficient lookups of Services
// sharing the same selector. The function is generic over metav1.Object, but only operates
// on *corev1.Service objects at the moment. If the object is not a Service, it does not index.
func IndexerSpecSelector[T metav1.Object]() cache.Indexers {
return cache.Indexers{
IndexWithSpecSelector: func(obj interface{}) ([]string, error) {
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
key := ToSHA(labels.Set(entity.Spec.Selector).String())
return []string{key}, nil
},
}
}
// GetByKey retrieves an object of type T (metav1.Object) from the given cache.Indexer by its key. // GetByKey retrieves an object of type T (metav1.Object) from the given cache.Indexer by its key.
// It returns the object and an error if the retrieval or type assertion fails. // It returns the object and an error if the retrieval or type assertion fails.
// If the object does not exist, it returns the zero value of T and nil. // If the object does not exist, it returns the zero value of T and nil.

View File

@ -183,3 +183,34 @@ func TestGetByKey_TypeAssertionFailure(t *testing.T) {
assert.Contains(t, err.Error(), "object is not of type") assert.Contains(t, err.Error(), "object is not of type")
assert.Nil(t, result) assert.Nil(t, result)
} }
func TestIndexerSpecSelector_Service(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{"app": "demo", "tier": "backend"}
keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}
func TestIndexerSpecSelector_NonService(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
obj := "not-a-service"
keys, err := indexers[IndexWithSpecSelector](obj)
assert.NoError(t, err)
assert.Nil(t, keys)
}
func TestIndexerSpecSelector_EmptySelector(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{}
keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
type TransformOptions struct {
specSelector bool
specExternalIps bool
statusLb bool
}
func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc {
options := TransformOptions{}
for _, fn := range optFns {
fn(&options)
}
return func(obj any) (any, error) {
// only transform if the object is a Service at the moment
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: entity.Name,
Namespace: entity.Namespace,
DeletionTimestamp: entity.DeletionTimestamp,
},
Spec: corev1.ServiceSpec{},
Status: corev1.ServiceStatus{},
}
if options.specSelector {
svc.Spec.Selector = entity.Spec.Selector
}
if options.specExternalIps {
svc.Spec.ExternalIPs = entity.Spec.ExternalIPs
}
if options.statusLb {
svc.Status.LoadBalancer = entity.Status.LoadBalancer
}
return svc, nil
}
}
// TransformWithSpecSelector enables copying the Service's .spec.selector field.
func TransformWithSpecSelector() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specSelector = true
}
}
// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field.
func TransformWithSpecExternalIPs() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specExternalIps = true
}
}
// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field.
func TransformWithStatusLoadBalancer() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.statusLb = true
}
}

View File

@ -0,0 +1,176 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)
func TestTransformerWithOptions_Service(t *testing.T) {
base := fakeService()
tests := []struct {
name string
options []func(*TransformOptions)
asserts func(any)
}{
{
name: "minimalistic object",
options: nil,
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.Empty(t, svc.UID)
assert.NotEmpty(t, svc.Name)
assert.NotEmpty(t, svc.Namespace)
},
},
{
name: "with selector",
options: []func(*TransformOptions){TransformWithSpecSelector()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "with selector",
options: []func(*TransformOptions){TransformWithSpecSelector()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "with loadBalancer",
options: []func(*TransformOptions){TransformWithStatusLoadBalancer()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.Empty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "all options",
options: []func(*TransformOptions){
TransformWithSpecSelector(),
TransformWithSpecExternalIPs(),
TransformWithStatusLoadBalancer(),
},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.NotEmpty(t, svc.Spec.ExternalIPs)
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
transform := TransformerWithOptions[*corev1.Service](tt.options...)
got, err := transform(base)
require.NoError(t, err)
tt.asserts(got)
})
}
t.Run("non-service input", func(t *testing.T) {
transform := TransformerWithOptions[*corev1.Service]()
out, err := transform("not-a-service")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if out != nil {
t.Errorf("expected nil output for non-service input, got %v", out)
}
})
}
func TestTransformer_Service_WithFakeClient(t *testing.T) {
t.Run("with transformer", func(t *testing.T) {
ctx := t.Context()
svc := fakeService()
fakeClient := fake.NewClientset()
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
require.NoError(t, err)
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
serviceInformer := factory.Core().V1().Services()
err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service](
TransformWithSpecSelector(),
TransformWithSpecExternalIPs(),
TransformWithStatusLoadBalancer(),
))
require.NoError(t, err)
factory.Start(ctx.Done())
err = WaitForCacheSync(ctx, factory)
require.NoError(t, err)
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
require.NoError(t, err)
assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
assert.NotEqual(t, svc.Annotations, got.Annotations)
assert.NotEqual(t, svc.Labels, got.Labels)
})
t.Run("without transformer", func(t *testing.T) {
ctx := t.Context()
svc := fakeService()
fakeClient := fake.NewClientset()
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
require.NoError(t, err)
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
serviceInformer := factory.Core().V1().Services()
err = serviceInformer.Informer().GetIndexer().Add(svc)
require.NoError(t, err)
factory.Start(ctx.Done())
err = WaitForCacheSync(ctx, factory)
require.NoError(t, err)
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
require.NoError(t, err)
// assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
// assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
assert.Equal(t, svc.Annotations, got.Annotations)
assert.Equal(t, svc.Labels, got.Labels)
})
}

30
source/informers/utils.go Normal file
View File

@ -0,0 +1,30 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"crypto/sha1"
"encoding/hex"
)
// ToSHA returns the SHA1 hash of the input string as a hex string.
// Using a SHA1 hash of the label selector string (as in ToSHA(labels.Set(selector).String())) is useful:
// - It provides a consistent and compact representation of the selector.
// - It allows for efficient indexing and lookup in Kubernetes informers.
// - It avoids issues with long label selector strings that could exceed index length limits.
func ToSHA(s string) string {
h := sha1.New()
h.Write([]byte(s))
return hex.EncodeToString(h.Sum(nil))
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
func TestToSHA(t *testing.T) {
tests := []struct {
input string
expected string
}{
{
input: "test",
expected: "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3",
},
{
input: "",
expected: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
},
{
input: labels.Set(map[string]string{
"app": "test",
"env": "production",
}).String(),
expected: "29eda95ee609e3186afe17e3bf988a654bc5b739",
},
{
input: labels.Set(map[string]string{
"app": "test",
"env": "production",
"version": "v1",
"component": "frontend",
}).String(),
expected: "446f9bdf6ba5c7edf324a07482bcd5c3b6c6ce38",
},
}
for _, tt := range tests {
got := ToSHA(tt.input)
assert.Equal(t, tt.expected, got)
}
}

View File

@ -28,12 +28,12 @@ import (
istioclient "istio.io/client-go/pkg/clientset/versioned" istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions" istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
@ -79,35 +79,35 @@ func NewIstioGatewaySource(
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace. // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed // Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0) istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0)
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
// Add default resource event handlers to properly initialize informer. serviceInformer := informerFactory.Core().V1().Services()
_, _ = serviceInformer.Informer().AddEventHandler( err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
cache.ResourceEventHandlerFuncs{ if err != nil {
AddFunc: func(obj interface{}) { return nil, err
log.Debug("service added") }
}, err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
}, informers.TransformWithSpecSelector(),
) informers.TransformWithSpecExternalIPs(),
informers.TransformWithStatusLoadBalancer(),
))
if err != nil {
return nil, err
}
_, _ = gatewayInformer.Informer().AddEventHandler( // Add default resource event handlers to properly initialize informer.
cache.ResourceEventHandlerFuncs{ _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
AddFunc: func(obj interface{}) { _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
log.Debug("gateway added")
},
},
)
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
return nil, err return nil, err
} }
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -30,13 +30,13 @@ import (
istioclient "istio.io/client-go/pkg/clientset/versioned" istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions" istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1" networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
@ -82,35 +82,28 @@ func NewIstioVirtualServiceSource(
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace. // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed // Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(istioClient, 0, istioinformers.WithNamespace(namespace)) istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(istioClient, 0, istioinformers.WithNamespace(namespace))
virtualServiceInformer := istioInformerFactory.Networking().V1beta1().VirtualServices() virtualServiceInformer := istioInformerFactory.Networking().V1beta1().VirtualServices()
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways() gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
serviceInformer := informerFactory.Core().V1().Services()
err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
if err != nil {
return nil, err
}
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
informers.TransformWithSpecSelector(),
informers.TransformWithSpecExternalIPs(),
informers.TransformWithStatusLoadBalancer(),
))
if err != nil {
return nil, err
}
// Add default resource event handlers to properly initialize informer. // Add default resource event handlers to properly initialize informer.
_, _ = serviceInformer.Informer().AddEventHandler( _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
cache.ResourceEventHandlerFuncs{ _, _ = virtualServiceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
AddFunc: func(obj interface{}) { _, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
log.Debug("service added")
},
},
)
_, _ = virtualServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("virtual service added")
},
},
)
_, _ = gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("gateway added")
},
},
)
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done())

View File

@ -139,7 +139,6 @@ func TestVirtualService(t *testing.T) {
t.Run("virtualServiceBindsToGateway", testVirtualServiceBindsToGateway) t.Run("virtualServiceBindsToGateway", testVirtualServiceBindsToGateway)
t.Run("endpointsFromVirtualServiceConfig", testEndpointsFromVirtualServiceConfig) t.Run("endpointsFromVirtualServiceConfig", testEndpointsFromVirtualServiceConfig)
t.Run("Endpoints", testVirtualServiceEndpoints) t.Run("Endpoints", testVirtualServiceEndpoints)
t.Run("gatewaySelectorMatchesService", testGatewaySelectorMatchesService)
} }
func TestNewIstioVirtualServiceSource(t *testing.T) { func TestNewIstioVirtualServiceSource(t *testing.T) {
@ -2008,38 +2007,6 @@ func testVirtualServiceEndpoints(t *testing.T) {
} }
} }
func testGatewaySelectorMatchesService(t *testing.T) {
for _, ti := range []struct {
title string
gwSelector map[string]string
lbSelector map[string]string
expected bool
}{
{
title: "gw selector matches lb selector",
gwSelector: map[string]string{"istio": "ingressgateway"},
lbSelector: map[string]string{"istio": "ingressgateway"},
expected: true,
},
{
title: "gw selector matches lb selector partially",
gwSelector: map[string]string{"istio": "ingressgateway"},
lbSelector: map[string]string{"release": "istio", "istio": "ingressgateway"},
expected: true,
},
{
title: "gw selector does not match lb selector",
gwSelector: map[string]string{"app": "mytest"},
lbSelector: map[string]string{"istio": "ingressgateway"},
expected: false,
},
} {
t.Run(ti.title, func(t *testing.T) {
require.Equal(t, ti.expected, MatchesServiceSelector(ti.gwSelector, ti.lbSelector))
})
}
}
func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress, gwList []fakeGatewayConfig) (*virtualServiceSource, error) { func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress, gwList []fakeGatewayConfig) (*virtualServiceSource, error) {
fakeKubernetesClient := fake.NewClientset() fakeKubernetesClient := fake.NewClientset()
fakeIstioClient := istiofake.NewSimpleClientset() fakeIstioClient := istiofake.NewSimpleClientset()

View File

@ -56,15 +56,3 @@ func ParseIngress(ingress string) (string, string, error) {
return namespace, name, err return namespace, name, err
} }
// MatchesServiceSelector checks if all key-value pairs in the selector map
// are present and match the corresponding key-value pairs in the svcSelector map.
// It returns true if all pairs match, otherwise it returns false.
func MatchesServiceSelector(selector, svcSelector map[string]string) bool {
for k, v := range selector {
if lbl, ok := svcSelector[k]; !ok || lbl != v {
return false
}
}
return true
}

View File

@ -102,50 +102,3 @@ func TestParseIngress(t *testing.T) {
}) })
} }
} }
func TestSelectorMatchesService(t *testing.T) {
tests := []struct {
name string
selector map[string]string
svcSelector map[string]string
expected bool
}{
{
name: "all key-value pairs match",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx", "env": "prod"},
expected: true,
},
{
name: "one key-value pair does not match",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx", "env": "dev"},
expected: false,
},
{
name: "key not present in svcSelector",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx"},
expected: false,
},
{
name: "empty selector",
selector: map[string]string{},
svcSelector: map[string]string{"app": "nginx", "env": "prod"},
expected: true,
},
{
name: "empty svcSelector",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := MatchesServiceSelector(tt.selector, tt.svcSelector)
assert.Equal(t, tt.expected, result)
})
}
}