CRD source: add event-handler support

When the --events flag is passed at startup, Source.AddEventHandler() is called
on each configured source.  Most sources provide AddEventHandler()
implementations that invoke the reconciliation loop when the configured source
changes, but the CRD source had a no-op implementation.  I.e. when a custom
resource was created, updated, or deleted, external-dns remained unware, and the
reconciliation loop would not fire until the configured interval had passed.

This change adds an informer (on the CRD specified by --crd-source-apiversion
and --crd-source-kind=DNSEndpoint), and a Source.AddEventHandler()
implementation that calls Informer.AddEventHandler().  Now when a custom
resource is created, updated, or deleted, the reconciliation loop is invoked.
This commit is contained in:
Eric R. Rath 2021-08-06 15:02:22 -07:00
parent 261bcadd7c
commit d9bf8fb329

View File

@ -19,6 +19,9 @@ package source
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"os"
"strings"
@ -44,6 +47,7 @@ type crdSource struct {
codec runtime.ParameterCodec
annotationFilter string
labelSelector labels.Selector
informer *cache.SharedInformer
}
func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
@ -104,17 +108,51 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
// NewCRDSource creates a new crdSource with the given config.
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) {
return &crdSource{
sourceCrd := crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
annotationFilter: annotationFilter,
labelSelector: labelSelector,
crdClient: crdClient,
codec: runtime.NewParameterCodec(scheme),
}, nil
}
// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
// missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations.
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
return sourceCrd.List(context.TODO(), &lo)
},
WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
return sourceCrd.watch(context.TODO(), &lo)
},
},
&endpoint.DNSEndpoint{},
0)
sourceCrd.informer = &informer
go informer.Run(wait.NeverStop)
return &sourceCrd, nil
}
func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
log.Debug("Adding event handler for CRD")
// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
informer := *cs.informer
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
}
// Endpoints returns endpoint objects.
@ -189,6 +227,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en
}
}
func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
return cs.crdClient.Get().
Namespace(cs.namespace).
Resource(cs.crdResource).
VersionedParams(opts, cs.codec).
Watch(ctx)
}
func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) {
result = &endpoint.DNSEndpointList{}
err = cs.crdClient.Get().