From d9bf8fb32998cac65527cf0d0cd8bc5f513079db Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Fri, 6 Aug 2021 15:02:22 -0700 Subject: [PATCH 1/5] CRD source: add event-handler support When the --events flag is passed at startup, Source.AddEventHandler() is called on each configured source. Most sources provide AddEventHandler() implementations that invoke the reconciliation loop when the configured source changes, but the CRD source had a no-op implementation. I.e. when a custom resource was created, updated, or deleted, external-dns remained unware, and the reconciliation loop would not fire until the configured interval had passed. This change adds an informer (on the CRD specified by --crd-source-apiversion and --crd-source-kind=DNSEndpoint), and a Source.AddEventHandler() implementation that calls Informer.AddEventHandler(). Now when a custom resource is created, updated, or deleted, the reconciliation loop is invoked. --- source/crd.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/source/crd.go b/source/crd.go index 705d770bc..b4d8cf3c6 100644 --- a/source/crd.go +++ b/source/crd.go @@ -19,6 +19,9 @@ package source import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" "os" "strings" @@ -44,6 +47,7 @@ type crdSource struct { codec runtime.ParameterCodec annotationFilter string labelSelector labels.Selector + informer *cache.SharedInformer } func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error { @@ -104,17 +108,51 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS // NewCRDSource creates a new crdSource with the given config. func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) { - return &crdSource{ + sourceCrd := crdSource{ crdResource: strings.ToLower(kind) + "s", namespace: namespace, annotationFilter: annotationFilter, labelSelector: labelSelector, crdClient: crdClient, codec: runtime.NewParameterCodec(scheme), - }, nil + } + // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any + // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return sourceCrd.List(context.TODO(), &lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return sourceCrd.watch(context.TODO(), &lo) + }, + }, + &endpoint.DNSEndpoint{}, + 0) + sourceCrd.informer = &informer + go informer.Run(wait.NeverStop) + return &sourceCrd, nil } func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { + log.Debug("Adding event handler for CRD") + + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + informer := *cs.informer + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handler() + }, + UpdateFunc: func(old interface{}, new interface{}) { + handler() + }, + DeleteFunc: func(obj interface{}) { + handler() + }, + }, + ) } // Endpoints returns endpoint objects. @@ -189,6 +227,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en } } +func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return cs.crdClient.Get(). + Namespace(cs.namespace). + Resource(cs.crdResource). + VersionedParams(opts, cs.codec). + Watch(ctx) +} + func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) { result = &endpoint.DNSEndpointList{} err = cs.crdClient.Get(). From 5146dab6fa739be3860b30881d495ea83b77acb7 Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Thu, 12 Aug 2021 12:32:15 -0700 Subject: [PATCH 2/5] ran goimports on crd.go to fix linter warning --- source/crd.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/crd.go b/source/crd.go index b4d8cf3c6..c15227592 100644 --- a/source/crd.go +++ b/source/crd.go @@ -19,11 +19,12 @@ package source import ( "context" "fmt" + "os" + "strings" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" - "os" - "strings" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" From 585d752ca4921ad02a1fd54e203672ecc1dc1a1c Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Fri, 13 Aug 2021 11:45:22 -0700 Subject: [PATCH 3/5] Don't run CRD informer during tests This change disables the CRD source's informer during tests. I made the mistake of not running `make test` before the previous commit, and thus didn't realize that leaving the informer enabled during the tests introduced a race condition: WARNING: DATA RACE Write at 0x00c0005aa130 by goroutine 59: k8s.io/client-go/rest/fake.(*RESTClient).do() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:113 +0x69 k8s.io/client-go/rest/fake.(*RESTClient).do-fm() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:109 +0x64 k8s.io/client-go/rest/fake.roundTripperFunc.RoundTrip() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:43 +0x3d net/http.send() /usr/local/go/src/net/http/client.go:251 +0x6da net/http.(*Client).send() /usr/local/go/src/net/http/client.go:175 +0x1d5 net/http.(*Client).do() /usr/local/go/src/net/http/client.go:717 +0x2cb net/http.(*Client).Do() /usr/local/go/src/net/http/client.go:585 +0x68b k8s.io/client-go/rest.(*Request).request() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/request.go:855 +0x209 k8s.io/client-go/rest.(*Request).Do() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/request.go:928 +0xf0 sigs.k8s.io/external-dns/source.(*crdSource).List() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd.go:250 +0x28c sigs.k8s.io/external-dns/source.NewCRDSource.func1() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd.go:125 +0x10a k8s.io/client-go/tools/cache.(*ListWatch).List() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/listwatch.go:106 +0x94 k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1.1.2() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:233 +0xf4 k8s.io/client-go/tools/pager.SimplePageFunc.func1() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/pager/pager.go:40 +0x94 k8s.io/client-go/tools/pager.(*ListPager).List() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/pager/pager.go:91 +0x1f4 k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1.1() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:258 +0x2b7 Previous write at 0x00c0005aa130 by goroutine 37: k8s.io/client-go/rest/fake.(*RESTClient).do() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:113 +0x69 k8s.io/client-go/rest/fake.(*RESTClient).do-fm() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:109 +0x64 k8s.io/client-go/rest/fake.roundTripperFunc.RoundTrip() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/fake/fake.go:43 +0x3d net/http.send() /usr/local/go/src/net/http/client.go:251 +0x6da net/http.(*Client).send() /usr/local/go/src/net/http/client.go:175 +0x1d5 net/http.(*Client).do() /usr/local/go/src/net/http/client.go:717 +0x2cb net/http.(*Client).Do() /usr/local/go/src/net/http/client.go:585 +0x68b k8s.io/client-go/rest.(*Request).request() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/request.go:855 +0x209 k8s.io/client-go/rest.(*Request).Do() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/rest/request.go:928 +0xf0 sigs.k8s.io/external-dns/source.(*crdSource).List() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd.go:250 +0x28c sigs.k8s.io/external-dns/source.(*crdSource).Endpoints() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd.go:171 +0x13c4 sigs.k8s.io/external-dns/source.testCRDSourceEndpoints.func1() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd_test.go:388 +0x4f6 testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 Goroutine 59 (running) created at: k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:224 +0x36f k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:316 +0x1ab k8s.io/client-go/tools/cache.(*Reflector).Run.func1() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:177 +0x4a k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1() /Users/erath/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:155 +0x75 k8s.io/apimachinery/pkg/util/wait.BackoffUntil() /Users/erath/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:156 +0xba k8s.io/client-go/tools/cache.(*Reflector).Run() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:176 +0xee k8s.io/client-go/tools/cache.(*Reflector).Run-fm() /Users/erath/go/pkg/mod/k8s.io/client-go@v0.18.8/tools/cache/reflector.go:174 +0x54 k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1() /Users/erath/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:56 +0x45 k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1() /Users/erath/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:73 +0x6d Goroutine 37 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:1238 +0x5d7 sigs.k8s.io/external-dns/source.testCRDSourceEndpoints() /Users/erath/go/src/github.com/ericrrath/external-dns/source/crd_test.go:376 +0x1fcf testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 It looks like client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used with informers: . None of the CRD tests _depend_ on the informer yet, so disabling the informer at least allows the existing tests to pass without race conditions. I'll look into further changes that 1) test the new event-handler behavior, and 2) allow all tests to pass without race conditions. --- source/crd.go | 67 ++++++++++++++++++++++++---------------------- source/crd_test.go | 2 +- source/store.go | 2 +- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/source/crd.go b/source/crd.go index c15227592..b32b93e6e 100644 --- a/source/crd.go +++ b/source/crd.go @@ -108,7 +108,7 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS } // NewCRDSource creates a new crdSource with the given config. -func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) { +func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) { sourceCrd := crdSource{ crdResource: strings.ToLower(kind) + "s", namespace: namespace, @@ -117,43 +117,46 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFi crdClient: crdClient, codec: runtime.NewParameterCodec(scheme), } - // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any - // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. - informer := cache.NewSharedInformer( - &cache.ListWatch{ - ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { - return sourceCrd.List(context.TODO(), &lo) + if startInformer { + // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any + // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return sourceCrd.List(context.TODO(), &lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return sourceCrd.watch(context.TODO(), &lo) + }, }, - WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { - return sourceCrd.watch(context.TODO(), &lo) - }, - }, - &endpoint.DNSEndpoint{}, - 0) - sourceCrd.informer = &informer - go informer.Run(wait.NeverStop) + &endpoint.DNSEndpoint{}, + 0) + sourceCrd.informer = &informer + go informer.Run(wait.NeverStop) + } return &sourceCrd, nil } func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { - log.Debug("Adding event handler for CRD") - - // Right now there is no way to remove event handler from informer, see: - // https://github.com/kubernetes/kubernetes/issues/79610 - informer := *cs.informer - informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - handler() + if cs.informer != nil { + log.Debug("Adding event handler for CRD") + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + informer := *cs.informer + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handler() + }, + UpdateFunc: func(old interface{}, new interface{}) { + handler() + }, + DeleteFunc: func(obj interface{}) { + handler() + }, }, - UpdateFunc: func(old interface{}, new interface{}) { - handler() - }, - DeleteFunc: func(obj interface{}) { - handler() - }, - }, - ) + ) + } } // Endpoints returns endpoint objects. diff --git a/source/crd_test.go b/source/crd_test.go index a88fb4616..55b375f21 100644 --- a/source/crd_test.go +++ b/source/crd_test.go @@ -387,7 +387,7 @@ func testCRDSourceEndpoints(t *testing.T) { labelSelector, err := labels.Parse(ti.labelFilter) require.NoError(t, err) - cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme) + cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, false) require.NoError(t, err) receivedEndpoints, err := cs.Endpoints(context.Background()) diff --git a/source/store.go b/source/store.go index 942400aef..3f15572d0 100644 --- a/source/store.go +++ b/source/store.go @@ -270,7 +270,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg if err != nil { return nil, err } - return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme) + return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, true) case "skipper-routegroup": apiServerURL := cfg.APIServerURL tokenPath := "" From 56a8d60fff0555443e965f26505070f832d4e071 Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Thu, 2 Sep 2021 09:21:21 -0700 Subject: [PATCH 4/5] Review feedback njuettner suggested using a var instead of boolean literals for the startInformer arg to NewCRDSource; good idea. --- source/crd_test.go | 7 ++++++- source/store.go | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/crd_test.go b/source/crd_test.go index 55b375f21..e285a80da 100644 --- a/source/crd_test.go +++ b/source/crd_test.go @@ -387,7 +387,12 @@ func testCRDSourceEndpoints(t *testing.T) { labelSelector, err := labels.Parse(ti.labelFilter) require.NoError(t, err) - cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, false) + // At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used + // with informers: https://github.com/kubernetes/kubernetes/issues/95372 + // So don't start the informer during testing. + startInformer := false + + cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, startInformer) require.NoError(t, err) receivedEndpoints, err := cs.Endpoints(context.Background()) diff --git a/source/store.go b/source/store.go index 3f15572d0..da7ce2a65 100644 --- a/source/store.go +++ b/source/store.go @@ -270,7 +270,8 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg if err != nil { return nil, err } - return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, true) + startInformer := true + return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, startInformer) case "skipper-routegroup": apiServerURL := cfg.APIServerURL tokenPath := "" From 929e618935c6d1c8a751b0d61f7fdb69fe6ef886 Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Mon, 27 Feb 2023 12:34:16 -0800 Subject: [PATCH 5/5] --events controls CRD informer creation mgruener suggested that the --events flag could be wired to control whether or not the CRD source created and started its informer. This commit makes that change; good idea! --- main.go | 1 + source/store.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 3ef760edb..4347c546f 100644 --- a/main.go +++ b/main.go @@ -133,6 +133,7 @@ func main() { RequestTimeout: cfg.RequestTimeout, DefaultTargets: cfg.DefaultTargets, OCPRouterName: cfg.OCPRouterName, + UpdateEvents: cfg.UpdateEvents, } // Lookup all the selected sources by names and pass them the desired configuration. diff --git a/source/store.go b/source/store.go index da7ce2a65..9194030b2 100644 --- a/source/store.go +++ b/source/store.go @@ -69,6 +69,7 @@ type Config struct { RequestTimeout time.Duration DefaultTargets []string OCPRouterName string + UpdateEvents bool } // ClientGenerator provides clients @@ -270,8 +271,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg if err != nil { return nil, err } - startInformer := true - return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, startInformer) + return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, cfg.UpdateEvents) case "skipper-routegroup": apiServerURL := cfg.APIServerURL tokenPath := ""