diff --git a/main.go b/main.go index 5b34b7ba7..c408c37ee 100644 --- a/main.go +++ b/main.go @@ -140,6 +140,7 @@ func main() { RequestTimeout: cfg.RequestTimeout, DefaultTargets: cfg.DefaultTargets, OCPRouterName: cfg.OCPRouterName, + UpdateEvents: cfg.UpdateEvents, } // Lookup all the selected sources by names and pass them the desired configuration. diff --git a/source/crd.go b/source/crd.go index 705d770bc..b32b93e6e 100644 --- a/source/crd.go +++ b/source/crd.go @@ -22,6 +22,10 @@ import ( "os" "strings" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -44,6 +48,7 @@ type crdSource struct { codec runtime.ParameterCodec annotationFilter string labelSelector labels.Selector + informer *cache.SharedInformer } func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error { @@ -103,18 +108,55 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS } // NewCRDSource creates a new crdSource with the given config. -func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) { - return &crdSource{ +func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) { + sourceCrd := crdSource{ crdResource: strings.ToLower(kind) + "s", namespace: namespace, annotationFilter: annotationFilter, labelSelector: labelSelector, crdClient: crdClient, codec: runtime.NewParameterCodec(scheme), - }, nil + } + if startInformer { + // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any + // missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations. + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return sourceCrd.List(context.TODO(), &lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return sourceCrd.watch(context.TODO(), &lo) + }, + }, + &endpoint.DNSEndpoint{}, + 0) + sourceCrd.informer = &informer + go informer.Run(wait.NeverStop) + } + return &sourceCrd, nil } func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) { + if cs.informer != nil { + log.Debug("Adding event handler for CRD") + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + informer := *cs.informer + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handler() + }, + UpdateFunc: func(old interface{}, new interface{}) { + handler() + }, + DeleteFunc: func(obj interface{}) { + handler() + }, + }, + ) + } } // Endpoints returns endpoint objects. @@ -189,6 +231,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en } } +func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return cs.crdClient.Get(). + Namespace(cs.namespace). + Resource(cs.crdResource). + VersionedParams(opts, cs.codec). + Watch(ctx) +} + func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) { result = &endpoint.DNSEndpointList{} err = cs.crdClient.Get(). diff --git a/source/crd_test.go b/source/crd_test.go index aaada1802..4f26aacfa 100644 --- a/source/crd_test.go +++ b/source/crd_test.go @@ -398,7 +398,12 @@ func testCRDSourceEndpoints(t *testing.T) { labelSelector, err := labels.Parse(ti.labelFilter) require.NoError(t, err) - cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme) + // At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used + // with informers: https://github.com/kubernetes/kubernetes/issues/95372 + // So don't start the informer during testing. + startInformer := false + + cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, startInformer) require.NoError(t, err) receivedEndpoints, err := cs.Endpoints(context.Background()) diff --git a/source/store.go b/source/store.go index 9f5d24b00..2df28cf41 100644 --- a/source/store.go +++ b/source/store.go @@ -72,6 +72,7 @@ type Config struct { RequestTimeout time.Duration DefaultTargets []string OCPRouterName string + UpdateEvents bool } // ClientGenerator provides clients @@ -308,7 +309,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg if err != nil { return nil, err } - return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme) + return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, cfg.UpdateEvents) case "skipper-routegroup": apiServerURL := cfg.APIServerURL tokenPath := ""