mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2025-08-05 17:16:59 +02:00
chore(source): move cache informer to dedicated folder
Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
parent
36bc7d6bc4
commit
2b7d236734
@ -32,20 +32,22 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
const (
|
||||
// ambHostAnnotation is the annotation in the Host that maps to a Service
|
||||
const ambHostAnnotation = "external-dns.ambassador-service"
|
||||
|
||||
ambHostAnnotation = "external-dns.ambassador-service"
|
||||
// groupName is the group name for the Ambassador API
|
||||
const groupName = "getambassador.io"
|
||||
groupName = "getambassador.io"
|
||||
)
|
||||
|
||||
var schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"}
|
||||
|
||||
@ -59,7 +61,7 @@ type ambassadorHostSource struct {
|
||||
kubeClient kubernetes.Interface
|
||||
namespace string
|
||||
annotationFilter string
|
||||
ambassadorHostInformer informers.GenericInformer
|
||||
ambassadorHostInformer kubeinformers.GenericInformer
|
||||
unstructuredConverter *unstructuredConverter
|
||||
labelSelector labels.Selector
|
||||
}
|
||||
@ -91,7 +93,7 @@ func NewAmbassadorHostSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -30,13 +30,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
// HTTPProxySource is an implementation of Source for ProjectContour HTTPProxy objects.
|
||||
@ -49,7 +49,7 @@ type httpProxySource struct {
|
||||
fqdnTemplate *template.Template
|
||||
combineFQDNAnnotation bool
|
||||
ignoreHostnameAnnotation bool
|
||||
httpProxyInformer informers.GenericInformer
|
||||
httpProxyInformer kubeinformers.GenericInformer
|
||||
unstructuredConverter *UnstructuredConverter
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ func NewContourHTTPProxySource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -113,7 +113,6 @@ func (sc *httpProxySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert to []*projectcontour.HTTPProxy
|
||||
var httpProxies []*projectcontour.HTTPProxy
|
||||
for _, hp := range hps {
|
||||
unstructuredHP, ok := hp.(*unstructured.Unstructured)
|
||||
|
@ -30,13 +30,15 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
f5 "github.com/F5Networks/k8s-bigip-ctlr/v2/config/apis/cis/v1"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
)
|
||||
@ -50,7 +52,7 @@ var f5TransportServerGVR = schema.GroupVersionResource{
|
||||
// transportServerSource is an implementation of Source for F5 TransportServer objects.
|
||||
type f5TransportServerSource struct {
|
||||
dynamicKubeClient dynamic.Interface
|
||||
transportServerInformer informers.GenericInformer
|
||||
transportServerInformer kubeinformers.GenericInformer
|
||||
kubeClient kubernetes.Interface
|
||||
annotationFilter string
|
||||
namespace string
|
||||
@ -77,7 +79,7 @@ func NewF5TransportServerSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
@ -40,6 +40,7 @@ import (
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
var f5VirtualServerGVR = schema.GroupVersionResource{
|
||||
@ -51,7 +52,7 @@ var f5VirtualServerGVR = schema.GroupVersionResource{
|
||||
// virtualServerSource is an implementation of Source for F5 VirtualServer objects.
|
||||
type f5VirtualServerSource struct {
|
||||
dynamicKubeClient dynamic.Interface
|
||||
virtualServerInformer informers.GenericInformer
|
||||
virtualServerInformer kubeinformers.GenericInformer
|
||||
kubeClient kubernetes.Interface
|
||||
annotationFilter string
|
||||
namespace string
|
||||
@ -78,7 +79,7 @@ func NewF5VirtualServerSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -32,16 +32,17 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
cache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
"sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
|
||||
informers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
|
||||
gwinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
|
||||
informers_v1beta1 "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1beta1"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -64,25 +65,25 @@ type gatewayRoute interface {
|
||||
RouteStatus() v1.RouteStatus
|
||||
}
|
||||
|
||||
type newGatewayRouteInformerFunc func(informers.SharedInformerFactory) gatewayRouteInformer
|
||||
type newGatewayRouteInformerFunc func(gwinformers.SharedInformerFactory) gatewayRouteInformer
|
||||
|
||||
type gatewayRouteInformer interface {
|
||||
List(namespace string, selector labels.Selector) ([]gatewayRoute, error)
|
||||
Informer() cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) informers.SharedInformerFactory {
|
||||
var opts []informers.SharedInformerOption
|
||||
func newGatewayInformerFactory(client gateway.Interface, namespace string, labelSelector labels.Selector) gwinformers.SharedInformerFactory {
|
||||
var opts []gwinformers.SharedInformerOption
|
||||
if namespace != "" {
|
||||
opts = append(opts, informers.WithNamespace(namespace))
|
||||
opts = append(opts, gwinformers.WithNamespace(namespace))
|
||||
}
|
||||
if labelSelector != nil && !labelSelector.Empty() {
|
||||
lbls := labelSelector.String()
|
||||
opts = append(opts, informers.WithTweakListOptions(func(o *metav1.ListOptions) {
|
||||
opts = append(opts, gwinformers.WithTweakListOptions(func(o *metav1.ListOptions) {
|
||||
o.LabelSelector = lbls
|
||||
}))
|
||||
}
|
||||
return informers.NewSharedInformerFactoryWithOptions(client, 0, opts...)
|
||||
return gwinformers.NewSharedInformerFactoryWithOptions(client, 0, opts...)
|
||||
}
|
||||
|
||||
type gatewayRouteSource struct {
|
||||
@ -154,14 +155,14 @@ func newGatewayRouteSource(clients ClientGenerator, config *Config, kind string,
|
||||
if rtInformerFactory != informerFactory {
|
||||
rtInformerFactory.Start(wait.NeverStop)
|
||||
|
||||
if err := waitForCacheSync(ctx, rtInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, rtInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err := waitForCacheSync(ctx, informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := waitForCacheSync(ctx, kubeInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, kubeInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
72
source/informers/informers.go
Normal file
72
source/informers/informers.go
Normal file
@ -0,0 +1,72 @@
|
||||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package informers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultRequestTimeout = 60
|
||||
)
|
||||
|
||||
type informerFactory interface {
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||
}
|
||||
|
||||
func WaitForCacheSync(ctx context.Context, factory informerFactory) error {
|
||||
timeout := defaultRequestTimeout * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
|
||||
if !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout)
|
||||
default:
|
||||
return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type dynamicInformerFactory interface {
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
func WaitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error {
|
||||
timeout := defaultRequestTimeout * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
|
||||
if !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("failed to sync %v: %w with timeout %s", typ, ctx.Err(), timeout)
|
||||
default:
|
||||
return fmt.Errorf("failed to sync %v with timeout %s", typ, timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
126
source/informers/informers_test.go
Normal file
126
source/informers/informers_test.go
Normal file
@ -0,0 +1,126 @@
|
||||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package informers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
type mockInformerFactory struct {
|
||||
syncResults map[reflect.Type]bool
|
||||
}
|
||||
|
||||
func (m *mockInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[reflect.Type]bool {
|
||||
return m.syncResults
|
||||
}
|
||||
|
||||
type mockDynamicInformerFactory struct {
|
||||
syncResults map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
func (m *mockDynamicInformerFactory) WaitForCacheSync(_ <-chan struct{}) map[schema.GroupVersionResource]bool {
|
||||
return m.syncResults
|
||||
}
|
||||
|
||||
func TestWaitForCacheSync(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
syncResults map[reflect.Type]bool
|
||||
expectError bool
|
||||
errorMsg string
|
||||
}{
|
||||
{
|
||||
name: "all caches synced",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true},
|
||||
},
|
||||
{
|
||||
name: "some caches not synced",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
|
||||
expectError: true,
|
||||
errorMsg: "failed to sync string with timeout 1m0s",
|
||||
},
|
||||
{
|
||||
name: "context timeout",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
|
||||
expectError: true,
|
||||
errorMsg: "failed to sync string with timeout 1m0s",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
factory := &mockInformerFactory{syncResults: tt.syncResults}
|
||||
err := WaitForCacheSync(ctx, factory)
|
||||
|
||||
if tt.expectError {
|
||||
assert.Error(t, err)
|
||||
assert.Errorf(t, err, tt.errorMsg)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForDynamicCacheSync(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
syncResults map[schema.GroupVersionResource]bool
|
||||
expectError bool
|
||||
errorMsg string
|
||||
}{
|
||||
{
|
||||
name: "all caches synced",
|
||||
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: true},
|
||||
},
|
||||
{
|
||||
name: "some caches not synced",
|
||||
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false},
|
||||
expectError: true,
|
||||
errorMsg: "failed to sync string with timeout 1m0s",
|
||||
},
|
||||
{
|
||||
name: "context timeout",
|
||||
syncResults: map[schema.GroupVersionResource]bool{schema.GroupVersionResource{}: false},
|
||||
expectError: true,
|
||||
errorMsg: "failed to sync string with timeout 1m0s",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
factory := &mockDynamicInformerFactory{syncResults: tt.syncResults}
|
||||
err := WaitForDynamicCacheSync(ctx, factory)
|
||||
|
||||
if tt.expectError {
|
||||
assert.Error(t, err)
|
||||
assert.Errorf(t, err, tt.errorMsg)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -33,6 +33,8 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
@ -102,7 +104,7 @@ func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, name
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -35,10 +35,10 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
// IstioGatewayIngressSource is the annotation used to determine if the gateway is implemented by an Ingress object
|
||||
@ -104,10 +104,10 @@ func NewIstioGatewaySource(
|
||||
istioInformerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@ import (
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
// IstioMeshGateway is the built in gateway for all sidecars
|
||||
@ -114,10 +115,10 @@ func NewIstioVirtualServiceSource(
|
||||
istioInformerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := waitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -31,13 +31,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
var kongGroupdVersionResource = schema.GroupVersionResource{
|
||||
@ -51,7 +52,7 @@ type kongTCPIngressSource struct {
|
||||
annotationFilter string
|
||||
ignoreHostnameAnnotation bool
|
||||
dynamicKubeClient dynamic.Interface
|
||||
kongTCPIngressInformer informers.GenericInformer
|
||||
kongTCPIngressInformer kubeinformers.GenericInformer
|
||||
kubeClient kubernetes.Interface
|
||||
namespace string
|
||||
unstructuredConverter *unstructuredConverter
|
||||
@ -77,7 +78,7 @@ func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Inte
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
const warningMsg = "The default behavior of exposing internal IPv6 addresses will change in the next minor version. Use --no-expose-internal-ipv6 flag to opt-in to the new behavior."
|
||||
@ -70,7 +71,7 @@ func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotat
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/fqdn"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
// ocpRouteSource is an implementation of Source for OpenShift Route objects.
|
||||
@ -87,7 +88,7 @@ func NewOcpRouteSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,6 @@ package source
|
||||
import (
|
||||
"context"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@ -29,7 +27,9 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
type podSource struct {
|
||||
@ -64,7 +64,7 @@ func NewPodSource(ctx context.Context, kubeClient kubernetes.Interface, namespac
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,8 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
@ -112,7 +114,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -18,14 +18,10 @@ package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
@ -84,43 +80,3 @@ type eventHandlerFunc func()
|
||||
func (fn eventHandlerFunc) OnAdd(obj interface{}, isInInitialList bool) { fn() }
|
||||
func (fn eventHandlerFunc) OnUpdate(oldObj, newObj interface{}) { fn() }
|
||||
func (fn eventHandlerFunc) OnDelete(obj interface{}) { fn() }
|
||||
|
||||
type informerFactory interface {
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||
}
|
||||
|
||||
func waitForCacheSync(ctx context.Context, factory informerFactory) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
|
||||
if !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err())
|
||||
default:
|
||||
return fmt.Errorf("failed to sync %v", typ)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type dynamicInformerFactory interface {
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
}
|
||||
|
||||
func waitForDynamicCacheSync(ctx context.Context, factory dynamicInformerFactory) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
for typ, done := range factory.WaitForCacheSync(ctx.Done()) {
|
||||
if !done {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("failed to sync %v: %w", typ, ctx.Err())
|
||||
default:
|
||||
return fmt.Errorf("failed to sync %v", typ)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -17,58 +17,76 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
)
|
||||
|
||||
type mockInformerFactory struct {
|
||||
syncResults map[reflect.Type]bool
|
||||
}
|
||||
|
||||
func (m *mockInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
|
||||
return m.syncResults
|
||||
}
|
||||
|
||||
func TestWaitForCacheSync(t *testing.T) {
|
||||
func TestGetLabelSelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
syncResults map[reflect.Type]bool
|
||||
annotationFilter string
|
||||
expectError bool
|
||||
expectedSelector string
|
||||
}{
|
||||
{
|
||||
name: "all caches synced",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): true},
|
||||
expectError: false,
|
||||
name: "Valid label selector",
|
||||
annotationFilter: "key1=value1,key2=value2",
|
||||
expectedSelector: "key1=value1,key2=value2",
|
||||
},
|
||||
{
|
||||
name: "some caches not synced",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
|
||||
expectError: true,
|
||||
name: "Invalid label selector",
|
||||
annotationFilter: "key1==value1",
|
||||
expectedSelector: "key1=value1",
|
||||
},
|
||||
{
|
||||
name: "context timeout",
|
||||
syncResults: map[reflect.Type]bool{reflect.TypeOf(""): false},
|
||||
expectError: true,
|
||||
name: "Empty label selector",
|
||||
annotationFilter: "",
|
||||
expectedSelector: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
factory := &mockInformerFactory{syncResults: tt.syncResults}
|
||||
err := waitForCacheSync(ctx, factory)
|
||||
|
||||
if tt.expectError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
selector, err := getLabelSelector(tt.annotationFilter)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
assert.Equal(t, tt.expectedSelector, selector.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchLabelSelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
selector labels.Selector
|
||||
srcAnnotations map[string]string
|
||||
expectedMatch bool
|
||||
}{
|
||||
{
|
||||
name: "Matching label selector",
|
||||
selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}),
|
||||
srcAnnotations: map[string]string{"key1": "value1", "key2": "value2"},
|
||||
expectedMatch: true,
|
||||
},
|
||||
{
|
||||
name: "Non-matching label selector",
|
||||
selector: labels.SelectorFromSet(labels.Set{"key1": "value1"}),
|
||||
srcAnnotations: map[string]string{"key2": "value2"},
|
||||
expectedMatch: false,
|
||||
},
|
||||
{
|
||||
name: "Empty label selector",
|
||||
selector: labels.NewSelector(),
|
||||
srcAnnotations: map[string]string{"key1": "value1"},
|
||||
expectedMatch: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := matchLabelSelector(tt.selector, tt.srcAnnotations)
|
||||
assert.Equal(t, tt.expectedMatch, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -32,13 +32,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -83,12 +84,12 @@ type traefikSource struct {
|
||||
annotationFilter string
|
||||
ignoreHostnameAnnotation bool
|
||||
dynamicKubeClient dynamic.Interface
|
||||
ingressRouteInformer informers.GenericInformer
|
||||
ingressRouteTcpInformer informers.GenericInformer
|
||||
ingressRouteUdpInformer informers.GenericInformer
|
||||
oldIngressRouteInformer informers.GenericInformer
|
||||
oldIngressRouteTcpInformer informers.GenericInformer
|
||||
oldIngressRouteUdpInformer informers.GenericInformer
|
||||
ingressRouteInformer kubeinformers.GenericInformer
|
||||
ingressRouteTcpInformer kubeinformers.GenericInformer
|
||||
ingressRouteUdpInformer kubeinformers.GenericInformer
|
||||
oldIngressRouteInformer kubeinformers.GenericInformer
|
||||
oldIngressRouteTcpInformer kubeinformers.GenericInformer
|
||||
oldIngressRouteUdpInformer kubeinformers.GenericInformer
|
||||
kubeClient kubernetes.Interface
|
||||
namespace string
|
||||
unstructuredConverter *unstructuredConverter
|
||||
@ -98,8 +99,8 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface,
|
||||
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
|
||||
// Set resync period to 0, to prevent processing when nothing has changed.
|
||||
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
|
||||
var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer informers.GenericInformer
|
||||
var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer informers.GenericInformer
|
||||
var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer kubeinformers.GenericInformer
|
||||
var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer kubeinformers.GenericInformer
|
||||
|
||||
// Add default resource event handlers to properly initialize informers.
|
||||
if !disableNew {
|
||||
@ -146,7 +147,7 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface,
|
||||
informerFactory.Start((ctx.Done()))
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user