diff --git a/source/crd.go b/source/crd.go index 705d770bc..b4d8cf3c6 100644 --- a/source/crd.go +++ b/source/crd.go @@ -19,6 +19,9 @@ package source import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" "os" "strings" @@ -44,6 +47,7 @@ type crdSource struct { codec runtime.ParameterCodec annotationFilter string labelSelector labels.Selector + informer *cache.SharedInformer } func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error { @@ -104,17 +108,51 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS // NewCRDSource creates a new crdSource with the given config. func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) { - return &crdSource{ + sourceCrd := crdSource{ crdResource: strings.ToLower(kind) + "s", namespace: namespace, annotationFilter: annotationFilter, labelSelector: labelSelector, crdClient: crdClient, codec: runtime.NewParameterCodec(scheme), - }, nil + } + // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any + // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return sourceCrd.List(context.TODO(), &lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return sourceCrd.watch(context.TODO(), &lo) + }, + }, + &endpoint.DNSEndpoint{}, + 0) + sourceCrd.informer = &informer + go informer.Run(wait.NeverStop) + return &sourceCrd, nil } func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { + log.Debug("Adding event handler for CRD") + + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + informer := *cs.informer + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handler() + }, + UpdateFunc: func(old interface{}, new interface{}) { + handler() + }, + DeleteFunc: func(obj interface{}) { + handler() + }, + }, + ) } // Endpoints returns endpoint objects. @@ -189,6 +227,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en } } +func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return cs.crdClient.Get(). + Namespace(cs.namespace). + Resource(cs.crdResource). + VersionedParams(opts, cs.codec). + Watch(ctx) +} + func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) { result = &endpoint.DNSEndpointList{} err = cs.crdClient.Get().