Merge pull request #5466 from gofogo/chore-informers

chore(source): move cache informer to dedicated folder
This commit is contained in:
Kubernetes Prow Robot 2025-05-28 13:18:24 -07:00 committed by GitHub
commit 60bfa0754e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 323 additions and 137 deletions

View File

@ -32,20 +32,22 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"
) )
const (
// ambHostAnnotation is the annotation in the Host that maps to a Service // ambHostAnnotation is the annotation in the Host that maps to a Service
const ambHostAnnotation = "external-dns.ambassador-service" ambHostAnnotation = "external-dns.ambassador-service"
// groupName is the group name for the Ambassador API // groupName is the group name for the Ambassador API
const groupName = "getambassador.io" groupName = "getambassador.io"
)
var schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"} var schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"}
@ -59,7 +61,7 @@ type ambassadorHostSource struct {
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
namespace string namespace string
annotationFilter string annotationFilter string
ambassadorHostInformer informers.GenericInformer ambassadorHostInformer kubeinformers.GenericInformer
unstructuredConverter *unstructuredConverter unstructuredConverter *unstructuredConverter
labelSelector labels.Selector labelSelector labels.Selector
} }
@ -91,7 +93,7 @@ func NewAmbassadorHostSource(
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -30,13 +30,13 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
// HTTPProxySource is an implementation of Source for ProjectContour HTTPProxy objects. // HTTPProxySource is an implementation of Source for ProjectContour HTTPProxy objects.
@ -49,7 +49,7 @@ type httpProxySource struct {
fqdnTemplate *template.Template fqdnTemplate *template.Template
combineFQDNAnnotation bool combineFQDNAnnotation bool
ignoreHostnameAnnotation bool ignoreHostnameAnnotation bool
httpProxyInformer informers.GenericInformer httpProxyInformer kubeinformers.GenericInformer
unstructuredConverter *UnstructuredConverter unstructuredConverter *UnstructuredConverter
} }
@ -84,7 +84,7 @@ func NewContourHTTPProxySource(
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }
@ -113,7 +113,6 @@ func (sc *httpProxySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint,
return nil, err return nil, err
} }
// Convert to []*projectcontour.HTTPProxy
var httpProxies []*projectcontour.HTTPProxy var httpProxies []*projectcontour.HTTPProxy
for _, hp := range hps { for _, hp := range hps {
unstructuredHP, ok := hp.(*unstructured.Unstructured) unstructuredHP, ok := hp.(*unstructured.Unstructured)

View File

@ -30,13 +30,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
f5 "github.com/F5Networks/k8s-bigip-ctlr/v2/config/apis/cis/v1" f5 "github.com/F5Networks/k8s-bigip-ctlr/v2/config/apis/cis/v1"
"sigs.k8s.io/external-dns/source/informers"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
) )
@ -50,7 +52,7 @@ var f5TransportServerGVR = schema.GroupVersionResource{
// transportServerSource is an implementation of Source for F5 TransportServer objects. // transportServerSource is an implementation of Source for F5 TransportServer objects.
type f5TransportServerSource struct { type f5TransportServerSource struct {
dynamicKubeClient dynamic.Interface dynamicKubeClient dynamic.Interface
transportServerInformer informers.GenericInformer transportServerInformer kubeinformers.GenericInformer
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
annotationFilter string annotationFilter string
namespace string namespace string
@ -77,7 +79,7 @@ func NewF5TransportServerSource(
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -31,7 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -40,6 +40,7 @@ import (
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"
) )
var f5VirtualServerGVR = schema.GroupVersionResource{ var f5VirtualServerGVR = schema.GroupVersionResource{
@ -51,7 +52,7 @@ var f5VirtualServerGVR = schema.GroupVersionResource{
// virtualServerSource is an implementation of Source for F5 VirtualServer objects. // virtualServerSource is an implementation of Source for F5 VirtualServer objects.
type f5VirtualServerSource struct { type f5VirtualServerSource struct {
dynamicKubeClient dynamic.Interface dynamicKubeClient dynamic.Interface
virtualServerInformer informers.GenericInformer virtualServerInformer kubeinformers.GenericInformer
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
annotationFilter string annotationFilter string
namespace string namespace string
@ -78,7 +79,7 @@ func NewF5VirtualServerSource(
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -32,16 +32,17 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
cache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
v1 "sigs.k8s.io/gateway-api/apis/v1" v1 "sigs.k8s.io/gateway-api/apis/v1"
v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" "sigs.k8s.io/gateway-api/apis/v1beta1"
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
informers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions" gwinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
informers_v1beta1 "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1beta1" informers_v1beta1 "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1beta1"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn" "sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
const ( const (
@ -64,25 +65,25 @@ type gatewayRoute interface {
RouteStatus() v1.RouteStatus RouteStatus() v1.RouteStatus
} }
type newGatewayRouteInformerFunc func(informers.SharedInformerFactory) gatewayRouteInformer type newGatewayRouteInformerFunc func(gwinformers.SharedInformerFactory) gatewayRouteInformer
type gatewayRouteInformer interface { type gatewayRouteInformer interface {
List(namespace string, selector labels.Selector) ([]gatewayRoute, error) List(namespace string, selector labels.Selector) ([]gatewayRoute, error)
Informer() cache.SharedIndexInformer Informer() cache.SharedIndexInformer
} }
func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) informers.SharedInformerFactory { func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) gwinformers.SharedInformerFactory {
var opts []informers.SharedInformerOption var opts []gwinformers.SharedInformerOption
if namespace != "" { if namespace != "" {
opts = append(opts, informers.WithNamespace(namespace)) opts = append(opts, gwinformers.WithNamespace(namespace))
} }
if labelSelector != nil && !labelSelector.Empty() { if labelSelector != nil && !labelSelector.Empty() {
lbls := labelSelector.String() lbls := labelSelector.String()
opts = append(opts, informers.WithTweakListOptions(func(o *metav1.ListOptions) { opts = append(opts, gwinformers.WithTweakListOptions(func(o *metav1.ListOptions) {
o.LabelSelector = lbls o.LabelSelector = lbls
})) }))
} }
return informers.NewSharedInformerFactoryWithOptions(client, 0, opts...) return gwinformers.NewSharedInformerFactoryWithOptions(client, 0, opts...)
} }
type gatewayRouteSource struct { type gatewayRouteSource struct {
@ -154,14 +155,14 @@ func newGatewayRouteSource(clients ClientGenerator, config *Config, kind string,
if rtInformerFactory != informerFactory { if rtInformerFactory != informerFactory {
rtInformerFactory.Start(wait.NeverStop) rtInformerFactory.Start(wait.NeverStop)
if err := waitForCacheSync(ctx, rtInformerFactory); err != nil { if err := informers.WaitForCacheSync(ctx, rtInformerFactory); err != nil {
return nil, err return nil, err
} }
} }
if err := waitForCacheSync(ctx, informerFactory); err != nil { if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
return nil, err return nil, err
} }
if err := waitForCacheSync(ctx, kubeInformerFactory); err != nil { if err := informers.WaitForCacheSync(ctx, kubeInformerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -0,0 +1,72 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"context"
"fmt"
"reflect"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const (
defaultRequestTimeout = 60
)
type informerFactory interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
func WaitForCacheSync(ctx context.Context, factory informerFactory) error {
timeout := defaultRequestTimeout * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
if !done {
select {
case <-ctx.Done():
return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout)
default:
return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout)
}
}
}
return nil
}
type dynamicInformerFactory interface {
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
}
func WaitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error {
timeout := defaultRequestTimeout * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
if !done {
select {
case <-ctx.Done():
return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout)
default:
return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout)
}
}
}
return nil
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"context"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type mockInformerFactory struct {
syncResults map[reflect.Type]bool
}
func (m *mockInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[reflect.Type]bool {
return m.syncResults
}
type mockDynamicInformerFactory struct {
syncResults map[schema.GroupVersionResource]bool
}
func (m *mockDynamicInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[schema.GroupVersionResource]bool {
return m.syncResults
}
func TestWaitForCacheSync(t *testing.T) {
tests := []struct {
name string
syncResults map[reflect.Type]bool
expectError bool
errorMsg string
}{
{
name: "all caches synced",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true},
},
{
name: "some caches not synced",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
expectError: true,
errorMsg: "failed to sync string with timeout 1m0s",
},
{
name: "context timeout",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
expectError: true,
errorMsg: "failed to sync string with timeout 1m0s",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
factory := &mockInformerFactory{syncResults: tt.syncResults}
err := WaitForCacheSync(ctx, factory)
if tt.expectError {
assert.Error(t, err)
assert.Errorf(t, err, tt.errorMsg)
} else {
assert.NoError(t, err)
}
})
}
}
func TestWaitForDynamicCacheSync(t *testing.T) {
tests := []struct {
name string
syncResults map[schema.GroupVersionResource]bool
expectError bool
errorMsg string
}{
{
name: "all caches synced",
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: true},
},
{
name: "some caches not synced",
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false},
expectError: true,
errorMsg: "failed to sync string with timeout 1m0s",
},
{
name: "context timeout",
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false},
expectError: true,
errorMsg: "failed to sync string with timeout 1m0s",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
factory := &mockDynamicInformerFactory{syncResults: tt.syncResults}
err := WaitForDynamicCacheSync(ctx, factory)
if tt.expectError {
assert.Error(t, err)
assert.Errorf(t, err, tt.errorMsg)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@ -33,6 +33,8 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/source/informers"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn" "sigs.k8s.io/external-dns/source/fqdn"
@ -102,7 +104,7 @@ func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, name
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -35,10 +35,10 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
// IstioGatewayIngressSource is the annotation used to determine if the gateway is implemented by an Ingress object // IstioGatewayIngressSource is the annotation used to determine if the gateway is implemented by an Ingress object
@ -104,10 +104,10 @@ func NewIstioGatewaySource(
istioInformerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }
if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -40,6 +40,7 @@ import (
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn" "sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
// IstioMeshGateway is the built in gateway for all sidecars // IstioMeshGateway is the built in gateway for all sidecars
@ -114,10 +115,10 @@ func NewIstioVirtualServiceSource(
istioInformerFactory.Start(ctx.Done()) istioInformerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }
if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -31,13 +31,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"
) )
var kongGroupdVersionResource = schema.GroupVersionResource{ var kongGroupdVersionResource = schema.GroupVersionResource{
@ -51,7 +52,7 @@ type kongTCPIngressSource struct {
annotationFilter string annotationFilter string
ignoreHostnameAnnotation bool ignoreHostnameAnnotation bool
dynamicKubeClient dynamic.Interface dynamicKubeClient dynamic.Interface
kongTCPIngressInformer informers.GenericInformer kongTCPIngressInformer kubeinformers.GenericInformer
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
namespace string namespace string
unstructuredConverter *unstructuredConverter unstructuredConverter *unstructuredConverter
@ -77,7 +78,7 @@ func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Inte
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -32,6 +32,7 @@ import (
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn" "sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
const warningMsg = "The default behavior of exposing internal IPv6 addresses will change in the next minor version. Use --no-expose-internal-ipv6 flag to opt-in to the new behavior." const warningMsg = "The default behavior of exposing internal IPv6 addresses will change in the next minor version. Use --no-expose-internal-ipv6 flag to opt-in to the new behavior."
@ -70,7 +71,7 @@ func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotat
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -35,6 +35,7 @@ import (
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/fqdn" "sigs.k8s.io/external-dns/source/fqdn"
"sigs.k8s.io/external-dns/source/informers"
) )
// ocpRouteSource is an implementation of Source for OpenShift Route objects. // ocpRouteSource is an implementation of Source for OpenShift Route objects.
@ -87,7 +88,7 @@ func NewOcpRouteSource(
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -19,8 +19,6 @@ package source
import ( import (
"context" "context"
"sigs.k8s.io/external-dns/endpoint"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -29,7 +27,9 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"
) )
type podSource struct { type podSource struct {
@ -64,7 +64,7 @@ func NewPodSource(ctx context.Context, kubeClient kubernetes.Interface, namespac
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -33,6 +33,8 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/source/informers"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
@ -112,7 +114,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }

View File

@ -18,14 +18,10 @@ package source
import ( import (
"context" "context"
"fmt"
"reflect"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
@ -84,43 +80,3 @@ type eventHandlerFunc func()
func (fn eventHandlerFunc) OnAdd(obj interface{}, isInInitialList bool) { fn() } func (fn eventHandlerFunc) OnAdd(obj interface{}, isInInitialList bool) { fn() }
func (fn eventHandlerFunc) OnUpdate(oldObj, newObj interface{}) { fn() } func (fn eventHandlerFunc) OnUpdate(oldObj, newObj interface{}) { fn() }
func (fn eventHandlerFunc) OnDelete(obj interface{}) { fn() } func (fn eventHandlerFunc) OnDelete(obj interface{}) { fn() }
type informerFactory interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
func waitForCacheSync(ctx context.Context, factory informerFactory) error {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
if !done {
select {
case <-ctx.Done():
return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err())
default:
return fmt.Errorf("failed to sync %v", typ)
}
}
}
return nil
}
type dynamicInformerFactory interface {
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
}
func waitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
if !done {
select {
case <-ctx.Done():
return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err())
default:
return fmt.Errorf("failed to sync %v", typ)
}
}
}
return nil
}

View File

@ -17,58 +17,76 @@ limitations under the License.
package source package source
import ( import (
"context"
"reflect"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
) )
type mockInformerFactory struct { func TestGetLabelSelector(t *testing.T) {
syncResults map[reflect.Type]bool
}
func (m *mockInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
return m.syncResults
}
func TestWaitForCacheSync(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
syncResults map[reflect.Type]bool annotationFilter string
expectError bool expectError bool
expectedSelector string
}{ }{
{ {
name: "all caches synced", name: "Valid label selector",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true}, annotationFilter: "key1=value1,key2=value2",
expectError: false, expectedSelector: "key1=value1,key2=value2",
}, },
{ {
name: "some caches not synced", name: "Invalid label selector",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, annotationFilter: "key1==value1",
expectError: true, expectedSelector: "key1=value1",
}, },
{ {
name: "context timeout", name: "Empty label selector",
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false}, annotationFilter: "",
expectError: true, expectedSelector: "",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) selector, err := getLabelSelector(tt.annotationFilter)
defer cancel()
factory := &mockInformerFactory{syncResults: tt.syncResults}
err := waitForCacheSync(ctx, factory)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err) assert.NoError(t, err)
} assert.Equal(t, tt.expectedSelector, selector.String())
})
}
}
func TestMatchLabelSelector(t *testing.T) {
tests := []struct {
name string
selector labels.Selector
srcAnnotations map[string]string
expectedMatch bool
}{
{
name: "Matching label selector",
selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}),
srcAnnotations: map[string]string{"key1": "value1", "key2": "value2"},
expectedMatch: true,
},
{
name: "Non-matching label selector",
selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}),
srcAnnotations: map[string]string{"key2": "value2"},
expectedMatch: false,
},
{
name: "Empty label selector",
selector: labels.NewSelector(),
srcAnnotations: map[string]string{"key1": "value1"},
expectedMatch: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matchLabelSelector(tt.selector, tt.srcAnnotations)
assert.Equal(t, tt.expectedMatch, result)
}) })
} }
} }

View File

@ -32,13 +32,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"
) )
var ( var (
@ -83,12 +84,12 @@ type traefikSource struct {
annotationFilter string annotationFilter string
ignoreHostnameAnnotation bool ignoreHostnameAnnotation bool
dynamicKubeClient dynamic.Interface dynamicKubeClient dynamic.Interface
ingressRouteInformer informers.GenericInformer ingressRouteInformer kubeinformers.GenericInformer
ingressRouteTcpInformer informers.GenericInformer ingressRouteTcpInformer kubeinformers.GenericInformer
ingressRouteUdpInformer informers.GenericInformer ingressRouteUdpInformer kubeinformers.GenericInformer
oldIngressRouteInformer informers.GenericInformer oldIngressRouteInformer kubeinformers.GenericInformer
oldIngressRouteTcpInformer informers.GenericInformer oldIngressRouteTcpInformer kubeinformers.GenericInformer
oldIngressRouteUdpInformer informers.GenericInformer oldIngressRouteUdpInformer kubeinformers.GenericInformer
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
namespace string namespace string
unstructuredConverter *unstructuredConverter unstructuredConverter *unstructuredConverter
@ -98,8 +99,8 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface,
// Use shared informer to listen for add/update/delete of Host in the specified namespace. // Use shared informer to listen for add/update/delete of Host in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed. // Set resync period to 0, to prevent processing when nothing has changed.
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil) informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer informers.GenericInformer var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer kubeinformers.GenericInformer
var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer informers.GenericInformer var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer kubeinformers.GenericInformer
// Add default resource event handlers to properly initialize informers. // Add default resource event handlers to properly initialize informers.
if !disableNew { if !disableNew {
@ -146,7 +147,7 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface,
informerFactory.Start((ctx.Done())) informerFactory.Start((ctx.Done()))
// wait for the local cache to be populated. // wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err return nil, err
} }