This commit is contained in:
Ivan Ka 2025-08-05 14:02:30 +01:00 committed by GitHub
commit 1e837315f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 503 additions and 155 deletions

View File

@ -28,6 +28,7 @@ import (
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"
v1alpha3 "istio.io/api/networking/v1alpha3"
istiov1a "istio.io/client-go/pkg/apis/networking/v1"
@ -92,14 +93,9 @@ func svcInformerWithServices(toLookup, underTest int) (coreinformers.ServiceInfo
svcInformer := informerFactory.Core().V1().Services()
ctx := context.Background()
_, err := svcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
err := svcInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
if err != nil {
return nil, fmt.Errorf("failed to add event handler: %w", err)
return nil, fmt.Errorf("failed to add indexer: %w", err)
}
services := fixturesSvcWithLabels(toLookup, underTest)

View File

@ -16,10 +16,12 @@ package source
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/informers"
)
// EndpointsForHostname returns the endpoint objects for each host-target combination.
@ -84,14 +86,15 @@ func EndpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{}
services, err := svcInformer.Lister().Services(namespace).List(labels.Everything())
services, err := svcInformer.Informer().GetIndexer().ByIndex(informers.IndexWithSpecSelector, informers.ToSHA(labels.Set(selector).String()))
if err != nil {
return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err)
}
for _, service := range services {
if !MatchesServiceSelector(selector, service.Spec.Selector) {
for _, svc := range services {
service, ok := svc.(*corev1.Service)
if !ok {
continue
}

View File

@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"
"sigs.k8s.io/external-dns/endpoint"
)
@ -247,6 +248,8 @@ func TestEndpointTargetsFromServices(t *testing.T) {
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0,
kubeinformers.WithNamespace(tt.namespace))
serviceInformer := informerFactory.Core().V1().Services()
err := serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
assert.NoError(t, err)
for _, svc := range tt.services {
_, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{})

View File

@ -15,6 +15,9 @@ package informers
import (
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
corev1lister "k8s.io/client-go/listers/core/v1"
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
@ -58,3 +61,49 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
}
func fakeService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-service",
Namespace: "ns",
Labels: map[string]string{"env": "prod", "team": "devops"},
Annotations: map[string]string{"description": "Enriched service object"},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"app": "demo"},
ExternalIPs: []string{"1.2.3.4"},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt32(8080),
Protocol: corev1.ProtocolTCP,
},
{
Name: "https",
Port: 443,
TargetPort: intstr.FromInt32(8443),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeLoadBalancer,
},
Status: corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{
{IP: "5.6.7.8", Hostname: "lb.example.com"},
},
},
Conditions: []metav1.Condition{
{
Type: "Available",
Status: metav1.ConditionTrue,
Reason: "MinimumReplicasAvailable",
Message: "Service is available",
LastTransitionTime: metav1.Now(),
},
},
},
}
}

View File

@ -16,6 +16,7 @@ package informers
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@ -25,7 +26,8 @@ import (
)
const (
IndexWithSelectors = "withSelectors"
IndexWithSelectors = "withSelectors"
IndexWithSpecSelector = "spec.selector"
)
type IndexSelectorOptions struct {
@ -95,6 +97,23 @@ func IndexerWithOptions[T metav1.Object](optFns ...func(options *IndexSelectorOp
}
}
// IndexerSpecSelector returns a cache.Indexers map that indexes Kubernetes Service objects
// by a hash of their .spec.selector field. This enables efficient lookups of Services
// sharing the same selector. The function is generic over metav1.Object, but only operates
// on *corev1.Service objects at the moment. If the object is not a Service, it does not index.
func IndexerSpecSelector[T metav1.Object]() cache.Indexers {
return cache.Indexers{
IndexWithSpecSelector: func(obj interface{}) ([]string, error) {
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
key := ToSHA(labels.Set(entity.Spec.Selector).String())
return []string{key}, nil
},
}
}
// GetByKey retrieves an object of type T (metav1.Object) from the given cache.Indexer by its key.
// It returns the object and an error if the retrieval or type assertion fails.
// If the object does not exist, it returns the zero value of T and nil.

View File

@ -183,3 +183,34 @@ func TestGetByKey_TypeAssertionFailure(t *testing.T) {
assert.Contains(t, err.Error(), "object is not of type")
assert.Nil(t, result)
}
func TestIndexerSpecSelector_Service(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{"app": "demo", "tier": "backend"}
keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}
func TestIndexerSpecSelector_NonService(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
obj := "not-a-service"
keys, err := indexers[IndexWithSpecSelector](obj)
assert.NoError(t, err)
assert.Nil(t, keys)
}
func TestIndexerSpecSelector_EmptySelector(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{}
keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
type TransformOptions struct {
specSelector bool
specExternalIps bool
statusLb bool
}
func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc {
options := TransformOptions{}
for _, fn := range optFns {
fn(&options)
}
return func(obj any) (any, error) {
// only transform if the object is a Service at the moment
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: entity.Name,
Namespace: entity.Namespace,
DeletionTimestamp: entity.DeletionTimestamp,
},
Spec: corev1.ServiceSpec{},
Status: corev1.ServiceStatus{},
}
if options.specSelector {
svc.Spec.Selector = entity.Spec.Selector
}
if options.specExternalIps {
svc.Spec.ExternalIPs = entity.Spec.ExternalIPs
}
if options.statusLb {
svc.Status.LoadBalancer = entity.Status.LoadBalancer
}
return svc, nil
}
}
// TransformWithSpecSelector enables copying the Service's .spec.selector field.
func TransformWithSpecSelector() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specSelector = true
}
}
// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field.
func TransformWithSpecExternalIPs() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specExternalIps = true
}
}
// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field.
func TransformWithStatusLoadBalancer() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.statusLb = true
}
}

View File

@ -0,0 +1,176 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)
func TestTransformerWithOptions_Service(t *testing.T) {
base := fakeService()
tests := []struct {
name string
options []func(*TransformOptions)
asserts func(any)
}{
{
name: "minimalistic object",
options: nil,
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.Empty(t, svc.UID)
assert.NotEmpty(t, svc.Name)
assert.NotEmpty(t, svc.Namespace)
},
},
{
name: "with selector",
options: []func(*TransformOptions){TransformWithSpecSelector()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "with selector",
options: []func(*TransformOptions){TransformWithSpecSelector()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "with loadBalancer",
options: []func(*TransformOptions){TransformWithStatusLoadBalancer()},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.Empty(t, svc.Spec.Selector)
assert.Empty(t, svc.Spec.ExternalIPs)
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
},
},
{
name: "all options",
options: []func(*TransformOptions){
TransformWithSpecSelector(),
TransformWithSpecExternalIPs(),
TransformWithStatusLoadBalancer(),
},
asserts: func(obj any) {
svc, ok := obj.(*corev1.Service)
assert.True(t, ok)
assert.NotEmpty(t, svc.Spec.Selector)
assert.NotEmpty(t, svc.Spec.ExternalIPs)
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
transform := TransformerWithOptions[*corev1.Service](tt.options...)
got, err := transform(base)
require.NoError(t, err)
tt.asserts(got)
})
}
t.Run("non-service input", func(t *testing.T) {
transform := TransformerWithOptions[*corev1.Service]()
out, err := transform("not-a-service")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if out != nil {
t.Errorf("expected nil output for non-service input, got %v", out)
}
})
}
func TestTransformer_Service_WithFakeClient(t *testing.T) {
t.Run("with transformer", func(t *testing.T) {
ctx := t.Context()
svc := fakeService()
fakeClient := fake.NewClientset()
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
require.NoError(t, err)
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
serviceInformer := factory.Core().V1().Services()
err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service](
TransformWithSpecSelector(),
TransformWithSpecExternalIPs(),
TransformWithStatusLoadBalancer(),
))
require.NoError(t, err)
factory.Start(ctx.Done())
err = WaitForCacheSync(ctx, factory)
require.NoError(t, err)
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
require.NoError(t, err)
assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
assert.NotEqual(t, svc.Annotations, got.Annotations)
assert.NotEqual(t, svc.Labels, got.Labels)
})
t.Run("without transformer", func(t *testing.T) {
ctx := t.Context()
svc := fakeService()
fakeClient := fake.NewClientset()
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
require.NoError(t, err)
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
serviceInformer := factory.Core().V1().Services()
err = serviceInformer.Informer().GetIndexer().Add(svc)
require.NoError(t, err)
factory.Start(ctx.Done())
err = WaitForCacheSync(ctx, factory)
require.NoError(t, err)
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
require.NoError(t, err)
// assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
// assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
assert.Equal(t, svc.Annotations, got.Annotations)
assert.Equal(t, svc.Labels, got.Labels)
})
}

30
source/informers/utils.go Normal file
View File

@ -0,0 +1,30 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"crypto/sha1"
"encoding/hex"
)
// ToSHA returns the SHA1 hash of the input string as a hex string.
// Using a SHA1 hash of the label selector string (as in ToSHA(labels.Set(selector).String())) is useful:
// - It provides a consistent and compact representation of the selector.
// - It allows for efficient indexing and lookup in Kubernetes informers.
// - It avoids issues with long label selector strings that could exceed index length limits.
func ToSHA(s string) string {
h := sha1.New()
h.Write([]byte(s))
return hex.EncodeToString(h.Sum(nil))
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
func TestToSHA(t *testing.T) {
tests := []struct {
input string
expected string
}{
{
input: "test",
expected: "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3",
},
{
input: "",
expected: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
},
{
input: labels.Set(map[string]string{
"app": "test",
"env": "production",
}).String(),
expected: "29eda95ee609e3186afe17e3bf988a654bc5b739",
},
{
input: labels.Set(map[string]string{
"app": "test",
"env": "production",
"version": "v1",
"component": "frontend",
}).String(),
expected: "446f9bdf6ba5c7edf324a07482bcd5c3b6c6ce38",
},
}
for _, tt := range tests {
got := ToSHA(tt.input)
assert.Equal(t, tt.expected, got)
}
}

View File

@ -28,12 +28,12 @@ import (
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations"
@ -79,35 +79,35 @@ func NewIstioGatewaySource(
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0)
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
// Add default resource event handlers to properly initialize informer.
_, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
serviceInformer := informerFactory.Core().V1().Services()
err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
if err != nil {
return nil, err
}
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
informers.TransformWithSpecSelector(),
informers.TransformWithSpecExternalIPs(),
informers.TransformWithStatusLoadBalancer(),
))
if err != nil {
return nil, err
}
_, _ = gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("gateway added")
},
},
)
// Add default resource event handlers to properly initialize informer.
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
_, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())
// wait for the local cache to be populated.
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
return nil, err
}
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
return nil, err
}
@ -191,7 +191,7 @@ func (sc *gatewaySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
endpoints = append(endpoints, gwEndpoints...)
}
// TODO: sort on endpoint creation
// TODO: sort on endpoint creation (performance)
for _, ep := range endpoints {
sort.Sort(ep.Targets)
}
@ -241,6 +241,7 @@ func (sc *gatewaySource) targetsFromIngress(ctx context.Context, ingressStr stri
targets := make(endpoint.Targets, 0)
// TODO: should be informer as currently this is make an API call for each gateway (performance)
ingress, err := sc.kubeClient.NetworkingV1().Ingresses(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Error(err)

View File

@ -1477,7 +1477,7 @@ func testGatewayEndpoints(t *testing.T) {
t.Run(ti.title, func(t *testing.T) {
t.Parallel()
fakeKubernetesClient := fake.NewSimpleClientset()
fakeKubernetesClient := fake.NewClientset()
for _, lb := range ti.lbServices {
service := lb.Service()
@ -1524,7 +1524,7 @@ func testGatewayEndpoints(t *testing.T) {
// gateway specific helper functions
func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress) (*gatewaySource, error) {
fakeKubernetesClient := fake.NewSimpleClientset()
fakeKubernetesClient := fake.NewClientset()
fakeIstioClient := istiofake.NewSimpleClientset()
for _, lb := range loadBalancerList {

View File

@ -30,13 +30,13 @@ import (
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations"
@ -82,44 +82,37 @@ func NewIstioVirtualServiceSource(
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(istioClient, 0, istioinformers.WithNamespace(namespace))
virtualServiceInformer := istioInformerFactory.Networking().V1beta1().VirtualServices()
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
serviceInformer := informerFactory.Core().V1().Services()
err = serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
if err != nil {
return nil, err
}
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
informers.TransformWithSpecSelector(),
informers.TransformWithSpecExternalIPs(),
informers.TransformWithStatusLoadBalancer(),
))
if err != nil {
return nil, err
}
// Add default resource event handlers to properly initialize informer.
_, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
_, _ = virtualServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("virtual service added")
},
},
)
_, _ = gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("gateway added")
},
},
)
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
_, _ = virtualServiceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
_, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())
// wait for the local cache to be populated.
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
return nil, err
}
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
return nil, err
}
@ -190,7 +183,7 @@ func (sc *virtualServiceSource) Endpoints(ctx context.Context) ([]*endpoint.Endp
endpoints = append(endpoints, gwEndpoints...)
}
// TODO: sort on endpoint creation
// TODO: sort on endpoint creation (performance)
for _, ep := range endpoints {
sort.Sort(ep.Targets)
}
@ -426,6 +419,7 @@ func (sc *virtualServiceSource) targetsFromIngress(ctx context.Context, ingressS
namespace = gateway.Namespace
}
// TODO: should be informer as currently this is making an API call for each gateway (performance)
ingress, err := sc.kubeClient.NetworkingV1().Ingresses(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Error(err)

View File

@ -49,7 +49,7 @@ type VirtualServiceSuite struct {
}
func (suite *VirtualServiceSuite) SetupTest() {
fakeKubernetesClient := fake.NewSimpleClientset()
fakeKubernetesClient := fake.NewClientset()
fakeIstioClient := istiofake.NewSimpleClientset()
var err error
@ -139,7 +139,6 @@ func TestVirtualService(t *testing.T) {
t.Run("virtualServiceBindsToGateway", testVirtualServiceBindsToGateway)
t.Run("endpointsFromVirtualServiceConfig", testEndpointsFromVirtualServiceConfig)
t.Run("Endpoints", testVirtualServiceEndpoints)
t.Run("gatewaySelectorMatchesService", testGatewaySelectorMatchesService)
}
func TestNewIstioVirtualServiceSource(t *testing.T) {
@ -189,7 +188,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) {
_, err := NewIstioVirtualServiceSource(
context.TODO(),
fake.NewSimpleClientset(),
fake.NewClientset(),
istiofake.NewSimpleClientset(),
"",
ti.annotationFilter,
@ -2008,38 +2007,6 @@ func testVirtualServiceEndpoints(t *testing.T) {
}
}
func testGatewaySelectorMatchesService(t *testing.T) {
for _, ti := range []struct {
title string
gwSelector map[string]string
lbSelector map[string]string
expected bool
}{
{
title: "gw selector matches lb selector",
gwSelector: map[string]string{"istio": "ingressgateway"},
lbSelector: map[string]string{"istio": "ingressgateway"},
expected: true,
},
{
title: "gw selector matches lb selector partially",
gwSelector: map[string]string{"istio": "ingressgateway"},
lbSelector: map[string]string{"release": "istio", "istio": "ingressgateway"},
expected: true,
},
{
title: "gw selector does not match lb selector",
gwSelector: map[string]string{"app": "mytest"},
lbSelector: map[string]string{"istio": "ingressgateway"},
expected: false,
},
} {
t.Run(ti.title, func(t *testing.T) {
require.Equal(t, ti.expected, MatchesServiceSelector(ti.gwSelector, ti.lbSelector))
})
}
}
func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress, gwList []fakeGatewayConfig) (*virtualServiceSource, error) {
fakeKubernetesClient := fake.NewClientset()
fakeIstioClient := istiofake.NewSimpleClientset()

View File

@ -56,15 +56,3 @@ func ParseIngress(ingress string) (string, string, error) {
return namespace, name, err
}
// MatchesServiceSelector checks if all key-value pairs in the selector map
// are present and match the corresponding key-value pairs in the svcSelector map.
// It returns true if all pairs match, otherwise it returns false.
func MatchesServiceSelector(selector, svcSelector map[string]string) bool {
for k, v := range selector {
if lbl, ok := svcSelector[k]; !ok || lbl != v {
return false
}
}
return true
}

View File

@ -102,50 +102,3 @@ func TestParseIngress(t *testing.T) {
})
}
}
func TestSelectorMatchesService(t *testing.T) {
tests := []struct {
name string
selector map[string]string
svcSelector map[string]string
expected bool
}{
{
name: "all key-value pairs match",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx", "env": "prod"},
expected: true,
},
{
name: "one key-value pair does not match",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx", "env": "dev"},
expected: false,
},
{
name: "key not present in svcSelector",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{"app": "nginx"},
expected: false,
},
{
name: "empty selector",
selector: map[string]string{},
svcSelector: map[string]string{"app": "nginx", "env": "prod"},
expected: true,
},
{
name: "empty svcSelector",
selector: map[string]string{"app": "nginx", "env": "prod"},
svcSelector: map[string]string{},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := MatchesServiceSelector(tt.selector, tt.svcSelector)
assert.Equal(t, tt.expected, result)
})
}
}