From 2b7d236734abac32f1bd7a8ef59ca07dbd77a030 Mon Sep 17 00:00:00 2001 From: ivan katliarchuk Date: Sun, 25 May 2025 13:47:16 +0100 Subject: [PATCH] chore(source): move cache informer to dedicated folder Signed-off-by: ivan katliarchuk --- source/ambassador_host.go | 18 +++-- source/contour_httpproxy.go | 11 ++- source/f5_transportserver.go | 8 +- source/f5_virtualserver.go | 7 +- source/gateway.go | 25 +++--- source/informers/informers.go | 72 +++++++++++++++++ source/informers/informers_test.go | 126 +++++++++++++++++++++++++++++ source/ingress.go | 4 +- source/istio_gateway.go | 8 +- source/istio_virtualservice.go | 5 +- source/kong_tcpingress.go | 7 +- source/node.go | 3 +- source/openshift_route.go | 3 +- source/pod.go | 6 +- source/service.go | 4 +- source/source.go | 44 ---------- source/source_test.go | 88 ++++++++++++-------- source/traefik_proxy.go | 21 ++--- 18 files changed, 323 insertions(+), 137 deletions(-) create mode 100644 source/informers/informers.go create mode 100644 source/informers/informers_test.go diff --git a/source/ambassador_host.go b/source/ambassador_host.go index 077a3f582..3de5d5bbe 100644 --- a/source/ambassador_host.go +++ b/source/ambassador_host.go @@ -32,20 +32,22 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/informers" ) -// ambHostAnnotation is the annotation in the Host that maps to a Service -const ambHostAnnotation = "external-dns.ambassador-service" - -// groupName is the group name for the Ambassador API -const groupName = "getambassador.io" +const ( + // ambHostAnnotation is the annotation in the Host that maps to a Service + ambHostAnnotation = "external-dns.ambassador-service" + // groupName is the group name for the Ambassador API + groupName = "getambassador.io" +) var schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"} @@ -59,7 +61,7 @@ type ambassadorHostSource struct { kubeClient kubernetes.Interface namespace string annotationFilter string - ambassadorHostInformer informers.GenericInformer + ambassadorHostInformer kubeinformers.GenericInformer unstructuredConverter *unstructuredConverter labelSelector labels.Selector } @@ -91,7 +93,7 @@ func NewAmbassadorHostSource( informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/contour_httpproxy.go b/source/contour_httpproxy.go index 7977ea840..b0a57171c 100644 --- a/source/contour_httpproxy.go +++ b/source/contour_httpproxy.go @@ -30,13 +30,13 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/external-dns/source/fqdn" - "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) // HTTPProxySource is an implementation of Source for ProjectContour HTTPProxy objects. @@ -49,7 +49,7 @@ type httpProxySource struct { fqdnTemplate *template.Template combineFQDNAnnotation bool ignoreHostnameAnnotation bool - httpProxyInformer informers.GenericInformer + httpProxyInformer kubeinformers.GenericInformer unstructuredConverter *UnstructuredConverter } @@ -84,7 +84,7 @@ func NewContourHTTPProxySource( informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err } @@ -113,7 +113,6 @@ func (sc *httpProxySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, return nil, err } - // Convert to []*projectcontour.HTTPProxy var httpProxies []*projectcontour.HTTPProxy for _, hp := range hps { unstructuredHP, ok := hp.(*unstructured.Unstructured) diff --git a/source/f5_transportserver.go b/source/f5_transportserver.go index c9417d5bf..70b5736e9 100644 --- a/source/f5_transportserver.go +++ b/source/f5_transportserver.go @@ -30,13 +30,15 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" f5 "github.com/F5Networks/k8s-bigip-ctlr/v2/config/apis/cis/v1" + "sigs.k8s.io/external-dns/source/informers" + "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" ) @@ -50,7 +52,7 @@ var f5TransportServerGVR = schema.GroupVersionResource{ // transportServerSource is an implementation of Source for F5 TransportServer objects. type f5TransportServerSource struct { dynamicKubeClient dynamic.Interface - transportServerInformer informers.GenericInformer + transportServerInformer kubeinformers.GenericInformer kubeClient kubernetes.Interface annotationFilter string namespace string @@ -77,7 +79,7 @@ func NewF5TransportServerSource( informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/f5_virtualserver.go b/source/f5_virtualserver.go index 9ff80b90b..febc5dd6f 100644 --- a/source/f5_virtualserver.go +++ b/source/f5_virtualserver.go @@ -31,7 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/informers" ) var f5VirtualServerGVR = schema.GroupVersionResource{ @@ -51,7 +52,7 @@ var f5VirtualServerGVR = schema.GroupVersionResource{ // virtualServerSource is an implementation of Source for F5 VirtualServer objects. type f5VirtualServerSource struct { dynamicKubeClient dynamic.Interface - virtualServerInformer informers.GenericInformer + virtualServerInformer kubeinformers.GenericInformer kubeClient kubernetes.Interface annotationFilter string namespace string @@ -78,7 +79,7 @@ func NewF5VirtualServerSource( informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/gateway.go b/source/gateway.go index 48315bbe2..6b72c56f4 100644 --- a/source/gateway.go +++ b/source/gateway.go @@ -32,16 +32,17 @@ import ( "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" - cache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache" v1 "sigs.k8s.io/gateway-api/apis/v1" - v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + "sigs.k8s.io/gateway-api/apis/v1beta1" gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" - informers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions" + gwinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions" informers_v1beta1 "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1beta1" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) const ( @@ -64,25 +65,25 @@ type gatewayRoute interface { RouteStatus() v1.RouteStatus } -type newGatewayRouteInformerFunc func(informers.SharedInformerFactory) gatewayRouteInformer +type newGatewayRouteInformerFunc func(gwinformers.SharedInformerFactory) gatewayRouteInformer type gatewayRouteInformer interface { List(namespace string, selector labels.Selector) ([]gatewayRoute, error) Informer() cache.SharedIndexInformer } -func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) informers.SharedInformerFactory { - var opts []informers.SharedInformerOption +func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) gwinformers.SharedInformerFactory { + var opts []gwinformers.SharedInformerOption if namespace != "" { - opts = append(opts, informers.WithNamespace(namespace)) + opts = append(opts, gwinformers.WithNamespace(namespace)) } if labelSelector != nil && !labelSelector.Empty() { lbls := labelSelector.String() - opts = append(opts, informers.WithTweakListOptions(func(o *metav1.ListOptions) { + opts = append(opts, gwinformers.WithTweakListOptions(func(o *metav1.ListOptions) { o.LabelSelector = lbls })) } - return informers.NewSharedInformerFactoryWithOptions(client, 0, opts...) + return gwinformers.NewSharedInformerFactoryWithOptions(client, 0, opts...) } type gatewayRouteSource struct { @@ -154,14 +155,14 @@ func newGatewayRouteSource(clients ClientGenerator, config *Config, kind string, if rtInformerFactory != informerFactory { rtInformerFactory.Start(wait.NeverStop) - if err := waitForCacheSync(ctx, rtInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, rtInformerFactory); err != nil { return nil, err } } - if err := waitForCacheSync(ctx, informerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil { return nil, err } - if err := waitForCacheSync(ctx, kubeInformerFactory); err != nil { + if err := informers.WaitForCacheSync(ctx, kubeInformerFactory); err != nil { return nil, err } diff --git a/source/informers/informers.go b/source/informers/informers.go new file mode 100644 index 000000000..c0fef473a --- /dev/null +++ b/source/informers/informers.go @@ -0,0 +1,72 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "context" + "fmt" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +const ( + defaultRequestTimeout = 60 +) + +type informerFactory interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + +func WaitForCacheSync(ctx context.Context, factory informerFactory) error { + timeout := defaultRequestTimeout * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for typ, done := range factory.WaitForCacheSync(ctx.Done()) { + if !done { + select { + case <-ctx.Done(): + return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout) + default: + return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout) + } + } + } + return nil +} + +type dynamicInformerFactory interface { + WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool +} + +func WaitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error { + timeout := defaultRequestTimeout * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for typ, done := range factory.WaitForCacheSync(ctx.Done()) { + if !done { + select { + case <-ctx.Done(): + return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout) + default: + return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout) + } + } + } + return nil +} diff --git a/source/informers/informers_test.go b/source/informers/informers_test.go new file mode 100644 index 000000000..2268efb38 --- /dev/null +++ b/source/informers/informers_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type mockInformerFactory struct { + syncResults map[reflect.Type]bool +} + +func (m *mockInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[reflect.Type]bool { + return m.syncResults +} + +type mockDynamicInformerFactory struct { + syncResults map[schema.GroupVersionResource]bool +} + +func (m *mockDynamicInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[schema.GroupVersionResource]bool { + return m.syncResults +} + +func TestWaitForCacheSync(t *testing.T) { + tests := []struct { + name string + syncResults map[reflect.Type]bool + expectError bool + errorMsg string + }{ + { + name: "all caches synced", + syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true}, + }, + { + name: "some caches not synced", + syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, + expectError: true, + errorMsg: "failed to sync string with timeout 1m0s", + }, + { + name: "context timeout", + syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, + expectError: true, + errorMsg: "failed to sync string with timeout 1m0s", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + factory := &mockInformerFactory{syncResults: tt.syncResults} + err := WaitForCacheSync(ctx, factory) + + if tt.expectError { + assert.Error(t, err) + assert.Errorf(t, err, tt.errorMsg) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestWaitForDynamicCacheSync(t *testing.T) { + tests := []struct { + name string + syncResults map[schema.GroupVersionResource]bool + expectError bool + errorMsg string + }{ + { + name: "all caches synced", + syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: true}, + }, + { + name: "some caches not synced", + syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false}, + expectError: true, + errorMsg: "failed to sync string with timeout 1m0s", + }, + { + name: "context timeout", + syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false}, + expectError: true, + errorMsg: "failed to sync string with timeout 1m0s", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + factory := &mockDynamicInformerFactory{syncResults: tt.syncResults} + err := WaitForDynamicCacheSync(ctx, factory) + + if tt.expectError { + assert.Error(t, err) + assert.Errorf(t, err, tt.errorMsg) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/source/ingress.go b/source/ingress.go index f00c170f4..03070bc46 100644 --- a/source/ingress.go +++ b/source/ingress.go @@ -33,6 +33,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/external-dns/source/informers" + "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/fqdn" @@ -102,7 +104,7 @@ func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, name informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/istio_gateway.go b/source/istio_gateway.go index ceed678c9..72f49f4f3 100644 --- a/source/istio_gateway.go +++ b/source/istio_gateway.go @@ -35,10 +35,10 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/external-dns/source/fqdn" - "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) // IstioGatewayIngressSource is the annotation used to determine if the gateway is implemented by an Ingress object @@ -104,10 +104,10 @@ func NewIstioGatewaySource( istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } - if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { return nil, err } diff --git a/source/istio_virtualservice.go b/source/istio_virtualservice.go index 1e0a75067..007021b4a 100644 --- a/source/istio_virtualservice.go +++ b/source/istio_virtualservice.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) // IstioMeshGateway is the built in gateway for all sidecars @@ -114,10 +115,10 @@ func NewIstioVirtualServiceSource( istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } - if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil { return nil, err } diff --git a/source/kong_tcpingress.go b/source/kong_tcpingress.go index f65e178e3..6bdc5886b 100644 --- a/source/kong_tcpingress.go +++ b/source/kong_tcpingress.go @@ -31,13 +31,14 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/informers" ) var kongGroupdVersionResource = schema.GroupVersionResource{ @@ -51,7 +52,7 @@ type kongTCPIngressSource struct { annotationFilter string ignoreHostnameAnnotation bool dynamicKubeClient dynamic.Interface - kongTCPIngressInformer informers.GenericInformer + kongTCPIngressInformer kubeinformers.GenericInformer kubeClient kubernetes.Interface namespace string unstructuredConverter *unstructuredConverter @@ -77,7 +78,7 @@ func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Inte informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/node.go b/source/node.go index 38ecb5457..33e7ea69e 100644 --- a/source/node.go +++ b/source/node.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) const warningMsg = "The default behavior of exposing internal IPv6 addresses will change in the next minor version. Use --no-expose-internal-ipv6 flag to opt-in to the new behavior." @@ -70,7 +71,7 @@ func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotat informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/openshift_route.go b/source/openshift_route.go index 1168d054d..40e12fec8 100644 --- a/source/openshift_route.go +++ b/source/openshift_route.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/fqdn" + "sigs.k8s.io/external-dns/source/informers" ) // ocpRouteSource is an implementation of Source for OpenShift Route objects. @@ -87,7 +88,7 @@ func NewOcpRouteSource( informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/pod.go b/source/pod.go index f9326371a..22a150a3c 100644 --- a/source/pod.go +++ b/source/pod.go @@ -19,8 +19,6 @@ package source import ( "context" - "sigs.k8s.io/external-dns/endpoint" - log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -29,7 +27,9 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/informers" ) type podSource struct { @@ -64,7 +64,7 @@ func NewPodSource(ctx context.Context, kubeClient kubernetes.Interface, namespac informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/service.go b/source/service.go index 96764d347..ae9cb8df8 100644 --- a/source/service.go +++ b/source/service.go @@ -33,6 +33,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/external-dns/source/informers" + "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/endpoint" @@ -112,7 +114,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. - if err := waitForCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil { return nil, err } diff --git a/source/source.go b/source/source.go index a18f85ff0..aaa2d1dc1 100644 --- a/source/source.go +++ b/source/source.go @@ -18,14 +18,10 @@ package source import ( "context" - "fmt" - "reflect" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" @@ -84,43 +80,3 @@ type eventHandlerFunc func() func (fn eventHandlerFunc) OnAdd(obj interface{}, isInInitialList bool) { fn() } func (fn eventHandlerFunc) OnUpdate(oldObj, newObj interface{}) { fn() } func (fn eventHandlerFunc) OnDelete(obj interface{}) { fn() } - -type informerFactory interface { - WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool -} - -func waitForCacheSync(ctx context.Context, factory informerFactory) error { - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - for typ, done := range factory.WaitForCacheSync(ctx.Done()) { - if !done { - select { - case <-ctx.Done(): - return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err()) - default: - return fmt.Errorf("failed to sync %v", typ) - } - } - } - return nil -} - -type dynamicInformerFactory interface { - WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool -} - -func waitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error { - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - for typ, done := range factory.WaitForCacheSync(ctx.Done()) { - if !done { - select { - case <-ctx.Done(): - return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err()) - default: - return fmt.Errorf("failed to sync %v", typ) - } - } - } - return nil -} diff --git a/source/source_test.go b/source/source_test.go index fcc735747..0eba3f94f 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -17,58 +17,76 @@ limitations under the License. package source import ( - "context" - "reflect" "testing" - "time" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/labels" ) -type mockInformerFactory struct { - syncResults map[reflect.Type]bool -} - -func (m *mockInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { - return m.syncResults -} - -func TestWaitForCacheSync(t *testing.T) { +func TestGetLabelSelector(t *testing.T) { tests := []struct { - name string - syncResults map[reflect.Type]bool - expectError bool + name string + annotationFilter string + expectError bool + expectedSelector string }{ { - name: "all caches synced", - syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true}, - expectError: false, + name: "Valid label selector", + annotationFilter: "key1=value1,key2=value2", + expectedSelector: "key1=value1,key2=value2", }, { - name: "some caches not synced", - syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, - expectError: true, + name: "Invalid label selector", + annotationFilter: "key1==value1", + expectedSelector: "key1=value1", }, { - name: "context timeout", - syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, - expectError: true, + name: "Empty label selector", + annotationFilter: "", + expectedSelector: "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - - factory := &mockInformerFactory{syncResults: tt.syncResults} - err := waitForCacheSync(ctx, factory) - - if tt.expectError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } + selector, err := getLabelSelector(tt.annotationFilter) + assert.NoError(t, err) + assert.Equal(t, tt.expectedSelector, selector.String()) + }) + } +} + +func TestMatchLabelSelector(t *testing.T) { + tests := []struct { + name string + selector labels.Selector + srcAnnotations map[string]string + expectedMatch bool + }{ + { + name: "Matching label selector", + selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}), + srcAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + expectedMatch: true, + }, + { + name: "Non-matching label selector", + selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}), + srcAnnotations: map[string]string{"key2": "value2"}, + expectedMatch: false, + }, + { + name: "Empty label selector", + selector: labels.NewSelector(), + srcAnnotations: map[string]string{"key1": "value1"}, + expectedMatch: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchLabelSelector(tt.selector, tt.srcAnnotations) + assert.Equal(t, tt.expectedMatch, result) }) } } diff --git a/source/traefik_proxy.go b/source/traefik_proxy.go index 212b604c4..48fb2335d 100644 --- a/source/traefik_proxy.go +++ b/source/traefik_proxy.go @@ -32,13 +32,14 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/source/annotations" + "sigs.k8s.io/external-dns/source/informers" ) var ( @@ -83,12 +84,12 @@ type traefikSource struct { annotationFilter string ignoreHostnameAnnotation bool dynamicKubeClient dynamic.Interface - ingressRouteInformer informers.GenericInformer - ingressRouteTcpInformer informers.GenericInformer - ingressRouteUdpInformer informers.GenericInformer - oldIngressRouteInformer informers.GenericInformer - oldIngressRouteTcpInformer informers.GenericInformer - oldIngressRouteUdpInformer informers.GenericInformer + ingressRouteInformer kubeinformers.GenericInformer + ingressRouteTcpInformer kubeinformers.GenericInformer + ingressRouteUdpInformer kubeinformers.GenericInformer + oldIngressRouteInformer kubeinformers.GenericInformer + oldIngressRouteTcpInformer kubeinformers.GenericInformer + oldIngressRouteUdpInformer kubeinformers.GenericInformer kubeClient kubernetes.Interface namespace string unstructuredConverter *unstructuredConverter @@ -98,8 +99,8 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface, // Use shared informer to listen for add/update/delete of Host in the specified namespace. // Set resync period to 0, to prevent processing when nothing has changed. informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil) - var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer informers.GenericInformer - var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer informers.GenericInformer + var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer kubeinformers.GenericInformer + var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer kubeinformers.GenericInformer // Add default resource event handlers to properly initialize informers. if !disableNew { @@ -146,7 +147,7 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface, informerFactory.Start((ctx.Done())) // wait for the local cache to be populated. - if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { + if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err }