From d9bf8fb32998cac65527cf0d0cd8bc5f513079db Mon Sep 17 00:00:00 2001 From: "Eric R. Rath" Date: Fri, 6 Aug 2021 15:02:22 -0700 Subject: [PATCH] CRD source: add event-handler support When the --events flag is passed at startup, Source.AddEventHandler() is called on each configured source. Most sources provide AddEventHandler() implementations that invoke the reconciliation loop when the configured source changes, but the CRD source had a no-op implementation. I.e. when a custom resource was created, updated, or deleted, external-dns remained unware, and the reconciliation loop would not fire until the configured interval had passed. This change adds an informer (on the CRD specified by --crd-source-apiversion and --crd-source-kind=DNSEndpoint), and a Source.AddEventHandler() implementation that calls Informer.AddEventHandler(). Now when a custom resource is created, updated, or deleted, the reconciliation loop is invoked. --- source/crd.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/source/crd.go b/source/crd.go index 705d770bc..b4d8cf3c6 100644 --- a/source/crd.go +++ b/source/crd.go @@ -19,6 +19,9 @@ package source import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" "os" "strings" @@ -44,6 +47,7 @@ type crdSource struct { codec runtime.ParameterCodec annotationFilter string labelSelector labels.Selector + informer *cache.SharedInformer } func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error { @@ -104,17 +108,51 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS // NewCRDSource creates a new crdSource with the given config. func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) { - return &crdSource{ + sourceCrd := crdSource{ crdResource: strings.ToLower(kind) + "s", namespace: namespace, annotationFilter: annotationFilter, labelSelector: labelSelector, crdClient: crdClient, codec: runtime.NewParameterCodec(scheme), - }, nil + } + // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any + // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return sourceCrd.List(context.TODO(), &lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return sourceCrd.watch(context.TODO(), &lo) + }, + }, + &endpoint.DNSEndpoint{}, + 0) + sourceCrd.informer = &informer + go informer.Run(wait.NeverStop) + return &sourceCrd, nil } func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { + log.Debug("Adding event handler for CRD") + + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + informer := *cs.informer + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handler() + }, + UpdateFunc: func(old interface{}, new interface{}) { + handler() + }, + DeleteFunc: func(obj interface{}) { + handler() + }, + }, + ) } // Endpoints returns endpoint objects. @@ -189,6 +227,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en } } +func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return cs.crdClient.Get(). + Namespace(cs.namespace). + Resource(cs.crdResource). + VersionedParams(opts, cs.codec). + Watch(ctx) +} + func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) { result = &endpoint.DNSEndpointList{} err = cs.crdClient.Get().