diff --git a/controller/execute.go b/controller/execute.go index 06175743a..2c61240e6 100644 --- a/controller/execute.go +++ b/controller/execute.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "k8s.io/klog/v2" + crlog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/pkg/apis/externaldns" @@ -74,6 +75,10 @@ func Execute() { klog.SetLogger(logr.Discard()) } + // controller-runtime prints a stack trace warning if its logger is never initialized. + // external-dns uses logrus for all logging, so controller-runtime's logr output is discarded here. + crlog.SetLogger(logr.Discard()) + log.Info(externaldns.Banner()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/source/crd.go b/source/crd.go index 8af13cdf9..48519a585 100644 --- a/source/crd.go +++ b/source/crd.go @@ -19,30 +19,21 @@ package source import ( "context" "fmt" - "os" "strings" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + crcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1" + "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/pkg/events" "sigs.k8s.io/external-dns/source/annotations" "sigs.k8s.io/external-dns/source/informers" "sigs.k8s.io/external-dns/source/types" - - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - - apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1" - "sigs.k8s.io/external-dns/endpoint" ) // crdSource is an implementation of Source that provides endpoints by listing @@ -58,147 +49,68 @@ import ( // +externaldns:source:events=true // +externaldns:source:provider-specific=true type crdSource struct { - crdClient rest.Interface - namespace string - crdResource string - codec runtime.ParameterCodec - annotationFilter string - labelSelector labels.Selector - informer cache.SharedInformer + crReader client.Reader + crWriter client.Client // status writes + informer crcache.Informer + listOpts []client.ListOption } -// NewCRDClientForAPIVersionKind return rest client for the given apiVersion and kind of the CRD -func NewCRDClientForAPIVersionKind( - client kubernetes.Interface, - cfg *Config, -) (*rest.RESTClient, *runtime.Scheme, error) { - kubeConfig := cfg.KubeConfig - if kubeConfig == "" { - if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil { - kubeConfig = clientcmd.RecommendedHomeFile - } - } - - // TODO: GetRestConfig logic is duplicated from store.go, refactor to avoid duplication - config, err := clientcmd.BuildConfigFromFlags(cfg.APIServerURL, kubeConfig) +// NewCRDSource creates a new crdSource backed by a controller-runtime cache. +// It builds the scheme, cache, and status-write client from restConfig and cfg. +func NewCRDSource(ctx context.Context, restConfig *rest.Config, cfg *Config) (Source, error) { + annotationSelector, err := annotations.ParseFilter(cfg.AnnotationFilter) if err != nil { - return nil, nil, err + return nil, err } - - groupVersion, err := schema.ParseGroupVersion(cfg.CRDSourceAPIVersion) + opts, err := buildCacheOptions(cfg.Namespace, cfg.LabelFilter, annotationSelector) if err != nil { - return nil, nil, err + return nil, err } - apiResourceList, err := client.Discovery().ServerResourcesForGroupVersion(groupVersion.String()) + + c, err := crcache.New(restConfig, opts) if err != nil { - return nil, nil, fmt.Errorf("error listing resources in GroupVersion %q: %w", groupVersion.String(), err) + return nil, err } - var crdAPIResource *metav1.APIResource - for _, apiResource := range apiResourceList.APIResources { - if apiResource.Kind == cfg.CRDSourceKind { - crdAPIResource = &apiResource - break - } - } - if crdAPIResource == nil { - return nil, nil, fmt.Errorf("unable to find Resource Kind %q in GroupVersion %q", cfg.CRDSourceKind, cfg.CRDSourceAPIVersion) - } - - scheme := runtime.NewScheme() - _ = apiv1alpha1.AddToScheme(scheme) - - config.GroupVersion = &groupVersion - config.APIPath = "/apis" - config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)} - - crdClient, err := rest.UnversionedRESTClientFor(config) + // crWriter is used exclusively for status writes; reads come from the cache. + crWriter, err := client.New(restConfig, client.Options{Scheme: opts.Scheme}) if err != nil { - return nil, nil, err + return nil, err } - return crdClient, scheme, nil -} -// NewCRDSource creates a new crdSource with the given config. -func NewCRDSource( - crdClient rest.Interface, - cfg *Config, - scheme *runtime.Scheme) (Source, error) { - sourceCrd := crdSource{ - crdResource: strings.ToLower(cfg.CRDSourceKind) + "s", - namespace: cfg.Namespace, - annotationFilter: cfg.AnnotationFilter, - labelSelector: cfg.LabelFilter, - crdClient: crdClient, - codec: runtime.NewParameterCodec(scheme), - } - if cfg.UpdateEvents { - // external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any - // missed or dropped events are handled. specify resync period 0 to avoid unnecessary sync handler invocations. - sourceCrd.informer = cache.NewSharedInformer( - &cache.ListWatch{ - ListWithContextFunc: func(ctx context.Context, lo metav1.ListOptions) (runtime.Object, error) { - return sourceCrd.List(ctx, &lo) - }, - WatchFuncWithContext: func(ctx context.Context, lo metav1.ListOptions) (watch.Interface, error) { - return sourceCrd.watch(ctx, &lo) - }, - }, - &apiv1alpha1.DNSEndpoint{}, - 0) - go sourceCrd.informer.Run(wait.NeverStop) - } - return &sourceCrd, nil + return newCrdSource(ctx, c, crWriter, cfg.Namespace, cfg.LabelFilter) } func (cs *crdSource) AddEventHandler(_ context.Context, handler func()) { - if cs.informer != nil { - log.Debug("Adding event handler for CRD") - // Right now there is no way to remove event handler from informer, see: - // https://github.com/kubernetes/kubernetes/issues/79610 - informers.MustAddEventHandler(cs.informer, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ any) { - handler() - }, - UpdateFunc: func(_ any, _ any) { - handler() - }, - DeleteFunc: func(_ any) { - handler() - }, - }, - ) - } + log.Debug("crd: adding event handler") + // Right now there is no way to remove event handler from informer, see: + // https://github.com/kubernetes/kubernetes/issues/79610 + _, _ = cs.informer.AddEventHandler(eventHandlerFunc(handler)) } -// Endpoints returns endpoint objects. +// Endpoints returns endpoint objects for all DNSEndpoint resources visible to +// this source. Namespace, label, and annotation filtering are handled at the +// cache level via buildCacheOptions; target-format validation is applied here. func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) { - endpoints := []*endpoint.Endpoint{} - - var ( - result *apiv1alpha1.DNSEndpointList - err error - ) - - result, err = cs.List(ctx, &metav1.ListOptions{LabelSelector: cs.labelSelector.String()}) - if err != nil { + list := &apiv1alpha1.DNSEndpointList{} + if err := cs.crReader.List(ctx, list, cs.listOpts...); err != nil { return nil, err } - itemPtrs := make([]*apiv1alpha1.DNSEndpoint, len(result.Items)) - for i := range result.Items { - itemPtrs[i] = &result.Items[i] - } - - filtered, err := annotations.Filter(itemPtrs, cs.annotationFilter) - if err != nil { - return nil, err - } - - for _, dnsEndpoint := range filtered { + endpoints := make([]*endpoint.Endpoint, 0, len(list.Items)) + for i := range list.Items { + dnsEndpoint := &list.Items[i] var crdEndpoints []*endpoint.Endpoint for _, ep := range dnsEndpoint.Spec.Endpoints { + if ep == nil { + log.Debugf( + "Skipping nil endpoint in DNSEndpoint %s/%s at spec.endpoints", + dnsEndpoint.Namespace, + dnsEndpoint.Name, + ) + continue + } + if (ep.RecordType == endpoint.RecordTypeCNAME || ep.RecordType == endpoint.RecordTypeA || ep.RecordType == endpoint.RecordTypeAAAA) && len(ep.Targets) < 1 { log.Debugf("Endpoint %s with DNSName %s has an empty list of targets, allowing it to pass through for default-targets processing", dnsEndpoint.Name, ep.DNSName) } @@ -206,7 +118,7 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error for _, target := range ep.Targets { switch ep.RecordType { case endpoint.RecordTypeTXT, endpoint.RecordTypeMX: - continue // TXT records allow arbitrary text, skip validation; MX records can have trailing dot but it's not required, skip validation + continue // no format constraint on targets case endpoint.RecordTypeCNAME: continue // RFC 1035 §5.1: trailing dot denotes an absolute FQDN in zone file notation; both forms are valid } @@ -215,9 +127,9 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error switch ep.RecordType { case endpoint.RecordTypeNAPTR: - illegalTarget = !hasDot // Must have trailing dot + illegalTarget = !hasDot default: - illegalTarget = hasDot // Must NOT have trailing dot + illegalTarget = hasDot } if illegalTarget { @@ -235,7 +147,6 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error } ep.WithLabel(endpoint.ResourceLabelKey, fmt.Sprintf("crd/%s/%s", dnsEndpoint.Namespace, dnsEndpoint.Name)) - crdEndpoints = append(crdEndpoints, ep) } @@ -247,9 +158,7 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error } dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation - // Update the ObservedGeneration - _, err = cs.UpdateStatus(ctx, dnsEndpoint) - if err != nil { + if err := cs.crWriter.Status().Update(ctx, dnsEndpoint); err != nil { log.Warnf("Could not update ObservedGeneration of the CRD: %v", err) } } @@ -257,33 +166,85 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error return MergeEndpoints(endpoints), nil } -func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) { - opts.Watch = true - return cs.crdClient.Get(). - Namespace(cs.namespace). - Resource(cs.crdResource). - VersionedParams(opts, cs.codec). - Watch(ctx) +// newCrdSource wires a cache and writer into a running crdSource. +func newCrdSource( + ctx context.Context, + c crcache.Cache, + crWriter client.Client, + namespace string, + labelSelector labels.Selector) (*crdSource, error) { + inf, err := c.GetInformer(ctx, &apiv1alpha1.DNSEndpoint{}) + if err != nil { + return nil, err + } + + _, _ = inf.AddEventHandler(informers.DefaultEventHandler()) + + listOpts := []client.ListOption{client.InNamespace(namespace)} + if labelSelector != nil && !labelSelector.Empty() { + listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector}) + } + + cs := &crdSource{ + crReader: c, + crWriter: crWriter, + informer: inf, + listOpts: listOpts, + } + + if err := startAndSync(ctx, c); err != nil { + return nil, err + } + + return cs, nil } -func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (*apiv1alpha1.DNSEndpointList, error) { - result := &apiv1alpha1.DNSEndpointList{} - return result, cs.crdClient.Get(). - Namespace(cs.namespace). - Resource(cs.crdResource). - VersionedParams(opts, cs.codec). - Do(ctx). - Into(result) +// startAndSync starts the cache in a goroutine and waits for it to sync. +// Returns an error if the cache fails to start or sync. +func startAndSync(ctx context.Context, c crcache.Cache) error { + errCh := make(chan error, 1) + go func() { errCh <- c.Start(ctx) }() + if !c.WaitForCacheSync(ctx) { + select { + case err := <-errCh: + if err != nil { + return fmt.Errorf("cache failed to sync: %w", err) + } + return fmt.Errorf("cache failed to sync") + case <-ctx.Done(): + return fmt.Errorf("cache failed to sync: %w", ctx.Err()) + } + } + return nil } -func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *apiv1alpha1.DNSEndpoint) (*apiv1alpha1.DNSEndpoint, error) { - result := &apiv1alpha1.DNSEndpoint{} - return result, cs.crdClient.Put(). - Namespace(dnsEndpoint.Namespace). - Resource(cs.crdResource). - Name(dnsEndpoint.Name). - SubResource("status"). - Body(dnsEndpoint). - Do(ctx). - Into(result) +// buildCacheOptions constructs the controller-runtime cache options for the +// given namespace and label selector. Extracted so the namespace/label scoping +// logic can be unit-tested without a running API server. +func buildCacheOptions(namespace string, labelFilter, annotationSelector labels.Selector) (crcache.Options, error) { + scheme := runtime.NewScheme() + if err := apiv1alpha1.AddToScheme(scheme); err != nil { + return crcache.Options{}, err + } + + nsMap := map[string]crcache.Config{ + namespace: {}, // "" == NamespaceAll + } + byObj := crcache.ByObject{ + Namespaces: nsMap, + Transform: informers.TransformerWithOptions[*apiv1alpha1.DNSEndpoint]( + informers.TransformRemoveManagedFields(), + informers.TransformRemoveLastAppliedConfig(), + informers.TransformRequireAnnotation(annotationSelector), + ), + } + if labelFilter != nil && !labelFilter.Empty() { + byObj.Label = labelFilter + } + return crcache.Options{ + Scheme: scheme, + ByObject: map[client.Object]crcache.ByObject{ + &apiv1alpha1.DNSEndpoint{}: byObj, + }, + }, nil } diff --git a/source/crd_test.go b/source/crd_test.go index 151f7a396..9e72f0eea 100644 --- a/source/crd_test.go +++ b/source/crd_test.go @@ -17,189 +17,144 @@ limitations under the License. package source import ( - "bytes" + "context" "encoding/json" "fmt" - "io" "math/rand" "net/http" + "net/http/httptest" "strings" "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" - "k8s.io/client-go/rest/fake" - "k8s.io/client-go/tools/cache" + toolscache "k8s.io/client-go/tools/cache" cachetesting "k8s.io/client-go/tools/cache/testing" - - "sigs.k8s.io/external-dns/source/types" + crcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" log "github.com/sirupsen/logrus" - k8stypes "k8s.io/apimachinery/pkg/types" apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/internal/testutils" logtest "sigs.k8s.io/external-dns/internal/testutils/log" + "sigs.k8s.io/external-dns/source/types" ) -type CRDSuite struct { - suite.Suite -} +var ( + _ Source = &crdSource{} +) -func (suite *CRDSuite) SetupTest() { -} - -func defaultHeader() http.Header { - header := http.Header{} - header.Set("Content-Type", runtime.ContentTypeJSON) - return header -} - -func objBody(codec runtime.Encoder, obj runtime.Object) io.ReadCloser { - return io.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) -} - -func fakeRESTClient(endpoints []*endpoint.Endpoint, apiVersion, kind, namespace, name string, annotations map[string]string, labels map[string]string, _ *testing.T) rest.Interface { - groupVersion, _ := schema.ParseGroupVersion(apiVersion) - scheme := runtime.NewScheme() - _ = apiv1alpha1.AddToScheme(scheme) - - dnsEndpointList := apiv1alpha1.DNSEndpointList{} - dnsEndpoint := &apiv1alpha1.DNSEndpoint{ - TypeMeta: metav1.TypeMeta{ - APIVersion: apiVersion, - Kind: kind, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Annotations: annotations, - Labels: labels, - Generation: 1, - }, - Spec: apiv1alpha1.DNSEndpointSpec{ - Endpoints: endpoints, - }, +// dnsEndpointByObj extracts the single ByObject entry for DNSEndpoint from +// cache options. The map key is a pointer so we cannot look it up directly — +// we iterate instead. +func dnsEndpointByObj(t *testing.T, opts crcache.Options) crcache.ByObject { + t.Helper() + for obj, bo := range opts.ByObject { + if _, ok := obj.(*apiv1alpha1.DNSEndpoint); ok { + return bo + } } + t.Fatal("no ByObject entry for DNSEndpoint") + panic("unreachable") +} - codecFactory := serializer.WithoutConversionCodecFactory{ - CodecFactory: serializer.NewCodecFactory(scheme), - } - client := &fake.RESTClient{ - GroupVersion: groupVersion, - VersionedAPIPath: "/apis/" + apiVersion, - NegotiatedSerializer: codecFactory, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - codec := codecFactory.LegacyCodec(groupVersion) - switch p, m := req.URL.Path, req.Method; { - case p == "/apis/"+apiVersion+"/"+strings.ToLower(kind)+"s" && m == http.MethodGet: - fallthrough - case p == "/apis/"+apiVersion+"/namespaces/"+namespace+"/"+strings.ToLower(kind)+"s" && m == http.MethodGet: - dnsEndpointList.Items = dnsEndpointList.Items[:0] - dnsEndpointList.Items = append(dnsEndpointList.Items, *dnsEndpoint) - return &http.Response{StatusCode: http.StatusOK, Header: defaultHeader(), Body: objBody(codec, &dnsEndpointList)}, nil - case strings.HasPrefix(p, "/apis/"+apiVersion+"/namespaces/") && strings.HasSuffix(p, strings.ToLower(kind)+"s") && m == http.MethodGet: - return &http.Response{StatusCode: http.StatusOK, Header: defaultHeader(), Body: objBody(codec, &dnsEndpointList)}, nil - case p == "/apis/"+apiVersion+"/namespaces/"+namespace+"/"+strings.ToLower(kind)+"s/"+name+"/status" && m == http.MethodPut: - decoder := json.NewDecoder(req.Body) +func TestBuildCacheOptions(t *testing.T) { + t.Run("all namespaces when namespace is empty", func(t *testing.T) { + opts, err := buildCacheOptions("", nil, nil) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + require.Contains(t, byObj.Namespaces, "", "empty string key means NamespaceAll") + require.Nil(t, byObj.Label, "no label filter expected") + }) - var body apiv1alpha1.DNSEndpoint - err := decoder.Decode(&body) - if err != nil { - return nil, err - } - dnsEndpoint.Status.ObservedGeneration = body.Status.ObservedGeneration - return &http.Response{StatusCode: http.StatusOK, Header: defaultHeader(), Body: objBody(codec, dnsEndpoint)}, nil - default: - return nil, fmt.Errorf("unexpected request: %#v\n%#v", req.URL, req) - } - }), - } + t.Run("single namespace", func(t *testing.T) { + opts, err := buildCacheOptions("my-ns", nil, nil) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + require.Contains(t, byObj.Namespaces, "my-ns") + require.NotContains(t, byObj.Namespaces, "") + }) - return client + t.Run("label filter applied", func(t *testing.T) { + sel := labels.SelectorFromSet(labels.Set{"app": "foo"}) + opts, err := buildCacheOptions("", sel, nil) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + require.NotNil(t, byObj.Label) + require.True(t, byObj.Label.Matches(labels.Set{"app": "foo"})) + require.False(t, byObj.Label.Matches(labels.Set{"app": "bar"})) + }) + + t.Run("empty label selector not applied", func(t *testing.T) { + opts, err := buildCacheOptions("", labels.Everything(), nil) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + require.Nil(t, byObj.Label) + }) + + t.Run("transform keeps object matching annotation filter", func(t *testing.T) { + opts, err := buildCacheOptions("", nil, labels.SelectorFromSet(labels.Set{"env": "prod"})) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + + obj := &apiv1alpha1.DNSEndpoint{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"env": "prod"}}} + got, err := byObj.Transform(obj) + require.NoError(t, err) + require.NotNil(t, got) + }) + + t.Run("transform drops object not matching annotation filter", func(t *testing.T) { + opts, err := buildCacheOptions("", nil, labels.SelectorFromSet(labels.Set{"env": "prod"})) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, opts) + + obj := &apiv1alpha1.DNSEndpoint{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"env": "staging"}}} + got, err := byObj.Transform(obj) + require.NoError(t, err) + require.Nil(t, got) + }) } func TestCRDSource(t *testing.T) { - suite.Run(t, new(CRDSuite)) - t.Run("Interface", testCRDSourceImplementsSource) t.Run("Endpoints", testCRDSourceEndpoints) } -// testCRDSourceImplementsSource tests that crdSource is a valid Source. -func testCRDSourceImplementsSource(t *testing.T) { - require.Implements(t, (*Source)(nil), new(crdSource)) -} - // testCRDSourceEndpoints tests various scenarios of using CRD source. +// +// Namespace and label filtering are handled by the controller-runtime cache via +// ByObject at construction time — not inside Endpoints(). Tests mirror this by +// only adding objects to the fake cache that the real cache would deliver: +// objects whose namespace and labels match the source configuration. +// Annotation filtering and target validation are performed inside Endpoints() +// and are tested with objects already present in the fake cache. func testCRDSourceEndpoints(t *testing.T) { for _, ti := range []struct { - title string - registeredNamespace string - namespace string - registeredAPIVersion string - apiVersion string - registeredKind string - kind string - endpoints []*endpoint.Endpoint - expectEndpoints bool - expectError bool - annotationFilter string - labelFilter string - annotations map[string]string - labels map[string]string + title string + namespaceFilter string + objectNamespace string + endpoints []*endpoint.Endpoint + expectEndpoints bool + annotationSelector labels.Selector + labelSelector labels.Selector + annotations map[string]string + labels map[string]string }{ { - title: "invalid crd api version", - registeredAPIVersion: "test.k8s.io/v1alpha1", - apiVersion: "blah.k8s.io/v1alpha1", - registeredKind: "DNSEndpoint", - kind: "DNSEndpoint", - endpoints: []*endpoint.Endpoint{ - { - DNSName: "abc.example.org", - Targets: endpoint.Targets{"1.2.3.4"}, - RecordType: endpoint.RecordTypeA, - RecordTTL: 180, - }, - }, - expectEndpoints: false, - expectError: true, - }, - { - title: "invalid crd kind", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: "JustEndpoint", - endpoints: []*endpoint.Endpoint{ - { - DNSName: "abc.example.org", - Targets: endpoint.Targets{"1.2.3.4"}, - RecordType: endpoint.RecordTypeA, - RecordTTL: 180, - }, - }, - expectEndpoints: false, - expectError: true, - }, - { - title: "endpoints within a specific namespace", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", + title: "endpoints within a specific namespace", + namespaceFilter: "foo", + objectNamespace: "foo", endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -209,16 +164,11 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "no endpoints within a specific namespace", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "bar", + title: "no endpoints within a specific namespace", + namespaceFilter: "foo", + objectNamespace: "bar", endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -227,17 +177,11 @@ func testCRDSourceEndpoints(t *testing.T) { RecordTTL: 180, }, }, - expectEndpoints: false, - expectError: false, }, { - title: "valid crd with no targets (relies on default-targets)", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", + title: "valid crd with no targets (relies on default-targets)", + namespaceFilter: "foo", + objectNamespace: "foo", endpoints: []*endpoint.Endpoint{ { DNSName: "no-targets.example.org", @@ -247,16 +191,11 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "valid crd gvk with single endpoint", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", + title: "valid crd gvk with single endpoint", + namespaceFilter: "foo", + objectNamespace: "foo", endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -266,16 +205,11 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "valid crd gvk with multiple endpoints", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", + title: "valid crd gvk with multiple endpoints", + namespaceFilter: "foo", + objectNamespace: "foo", endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -291,18 +225,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "valid crd gvk with annotation and non matching annotation filter", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - annotations: map[string]string{"test": "that"}, - annotationFilter: "test=filter_something_else", + title: "valid crd gvk with annotation and non matching annotation filter", + namespaceFilter: "foo", + objectNamespace: "foo", + annotations: map[string]string{"test": "that"}, + annotationSelector: labels.SelectorFromSet(labels.Set{"test": "filter_something_else"}), endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -311,19 +240,13 @@ func testCRDSourceEndpoints(t *testing.T) { RecordTTL: 180, }, }, - expectEndpoints: false, - expectError: false, }, { - title: "valid crd gvk with annotation and matching annotation filter", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - annotations: map[string]string{"test": "that"}, - annotationFilter: "test=that", + title: "valid crd gvk with annotation and matching annotation filter", + namespaceFilter: "foo", + objectNamespace: "foo", + annotations: map[string]string{"test": "that"}, + annotationSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -333,18 +256,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "valid crd gvk with label and non matching label filter", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=filter_something_else", + title: "valid crd gvk with label and non matching label filter", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "filter_something_else"}), endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -353,19 +271,13 @@ func testCRDSourceEndpoints(t *testing.T) { RecordTTL: 180, }, }, - expectEndpoints: false, - expectError: false, }, { - title: "valid crd gvk with label and matching label filter", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "valid crd gvk with label and matching label filter", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -375,18 +287,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "Create NS record", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "Create NS record", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "abc.example.org", @@ -396,18 +303,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "Create SRV record", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "Create SRV record", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "_svc._tcp.example.org", @@ -417,18 +319,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "Create NAPTR record", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "Create NAPTR record", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -438,18 +335,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "CNAME target with trailing dot (RFC 1035 §5.1 absolute FQDN) is valid", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "CNAME target with trailing dot (RFC 1035 §5.1 absolute FQDN) is valid", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -461,15 +353,11 @@ func testCRDSourceEndpoints(t *testing.T) { expectEndpoints: true, }, { - title: "CNAME target without trailing dot (relative name)", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "CNAME target without trailing dot (relative name)", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "internal.example.com", @@ -481,15 +369,11 @@ func testCRDSourceEndpoints(t *testing.T) { expectEndpoints: true, }, { - title: "illegal target NAPTR", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "illegal target NAPTR", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -498,19 +382,13 @@ func testCRDSourceEndpoints(t *testing.T) { RecordTTL: 180, }, }, - expectEndpoints: false, - expectError: false, }, { - title: "valid target TXT", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "valid target TXT", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -520,18 +398,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "illegal target A", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "illegal target A", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -540,19 +413,13 @@ func testCRDSourceEndpoints(t *testing.T) { RecordTTL: 180, }, }, - expectEndpoints: false, - expectError: false, }, { - title: "MX Record allowing trailing dot in target", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "MX Record allowing trailing dot in target", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -562,18 +429,13 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "MX Record without trailing dot in target", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", - labels: map[string]string{"test": "that"}, - labelFilter: "test=that", + title: "MX Record without trailing dot in target", + namespaceFilter: "foo", + objectNamespace: "foo", + labels: map[string]string{"test": "that"}, + labelSelector: labels.SelectorFromSet(labels.Set{"test": "that"}), endpoints: []*endpoint.Endpoint{ { DNSName: "example.org", @@ -583,16 +445,11 @@ func testCRDSourceEndpoints(t *testing.T) { }, }, expectEndpoints: true, - expectError: false, }, { - title: "provider-specific properties are passed through from DNSEndpoint spec", - registeredAPIVersion: apiv1alpha1.GroupVersion.String(), - apiVersion: apiv1alpha1.GroupVersion.String(), - registeredKind: apiv1alpha1.DNSEndpointKind, - kind: apiv1alpha1.DNSEndpointKind, - namespace: "foo", - registeredNamespace: "foo", + title: "provider-specific properties are passed through from DNSEndpoint spec", + namespaceFilter: "bar", + objectNamespace: "bar", endpoints: []*endpoint.Endpoint{ { DNSName: "subdomain.example.org", @@ -613,50 +470,36 @@ func testCRDSourceEndpoints(t *testing.T) { t.Run(ti.title, func(t *testing.T) { t.Parallel() - restClient := fakeRESTClient(ti.endpoints, ti.registeredAPIVersion, ti.registeredKind, ti.registeredNamespace, "test", ti.annotations, ti.labels, t) - groupVersion, err := schema.ParseGroupVersion(ti.apiVersion) - require.NoError(t, err) - require.NotNil(t, groupVersion) + obj := &apiv1alpha1.DNSEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: ti.objectNamespace, + Annotations: ti.annotations, + Labels: ti.labels, + Generation: 1, + }, + Spec: apiv1alpha1.DNSEndpointSpec{ + Endpoints: ti.endpoints, + }, + } - scheme := runtime.NewScheme() - err = apiv1alpha1.AddToScheme(scheme) - require.NoError(t, err) - - labelSelector, err := labels.Parse(ti.labelFilter) - require.NoError(t, err) - - // At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used - // with informers: https://github.com/kubernetes/kubernetes/issues/95372 - // So don't start the informer during testing. - cs, err := NewCRDSource(restClient, &Config{ - Namespace: ti.namespace, - AnnotationFilter: ti.annotationFilter, - LabelFilter: labelSelector, - CRDSourceKind: ti.kind, - UpdateEvents: false, - }, scheme) + fakeCache := newFakeCRDCache(t, nil, fakeCRDCacheFilter{ + ti.namespaceFilter, ti.labelSelector, ti.annotationSelector}, obj) + cs, err := newCrdSource(t.Context(), fakeCache, fakeCache.Client, ti.namespaceFilter, ti.labelSelector) require.NoError(t, err) receivedEndpoints, err := cs.Endpoints(t.Context()) - if ti.expectError { - require.Errorf(t, err, "Received err %v", err) - } else { - require.NoErrorf(t, err, "Received err %v", err) - } + require.NoError(t, err) - if len(receivedEndpoints) == 0 && !ti.expectEndpoints { + if !ti.expectEndpoints { + require.Empty(t, receivedEndpoints) return } - if err == nil { - validateCRDResource(t, cs, ti.expectError) - } - - // Validate received endpoints against expected endpoints. + validateCRDResource(t, fakeCache.Client, ti.objectNamespace, "test") testutils.ValidateEndpoints(t, receivedEndpoints, ti.endpoints) for _, e := range receivedEndpoints { - // TODO: at the moment not all sources apply ResourceLabelKey require.GreaterOrEqual(t, len(e.Labels), 1, "endpoint must have at least one label") require.Contains(t, e.Labels, endpoint.ResourceLabelKey, "endpoint must include the ResourceLabelKey label") } @@ -710,18 +553,17 @@ func TestCRDSourceIllegalTargetWarnings(t *testing.T) { t.Run(ti.title, func(t *testing.T) { hook := logtest.LogsUnderTestWithLogLevel(log.WarnLevel, t) - restClient := fakeRESTClient(ti.endpoints, apiv1alpha1.GroupVersion.String(), apiv1alpha1.DNSEndpointKind, "foo", "test", nil, nil, t) + obj := &apiv1alpha1.DNSEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "foo", + Generation: 1, + }, + Spec: apiv1alpha1.DNSEndpointSpec{Endpoints: ti.endpoints}, + } - scheme := runtime.NewScheme() - require.NoError(t, apiv1alpha1.AddToScheme(scheme)) - - cs, err := NewCRDSource(restClient, &Config{ - Namespace: "foo", - AnnotationFilter: "", - LabelFilter: labels.Everything(), - CRDSourceKind: apiv1alpha1.DNSEndpointKind, - UpdateEvents: false, - }, scheme) + fakeCache := newFakeCRDCache(t, nil, fakeCRDCacheFilter{}, obj) + cs, err := newCrdSource(t.Context(), fakeCache, fakeCache.Client, "", nil) require.NoError(t, err) _, err = cs.Endpoints(t.Context()) @@ -736,150 +578,119 @@ func TestCRDSourceIllegalTargetWarnings(t *testing.T) { } } -func TestCRDSource_NoInformer(t *testing.T) { - cs := &crdSource{informer: nil} - called := false +func TestCRDSource_Endpoints_ObservedGenerationUpdateFailure(t *testing.T) { + hook := logtest.LogsUnderTestWithLogLevel(log.WarnLevel, t) - cs.AddEventHandler(t.Context(), func() { called = true }) - require.False(t, called, "handler must not be called when informer is nil") -} - -func TestCRDSource_AddEventHandler_Add(t *testing.T) { - ctx := t.Context() - watcher, cs := helperCreateWatcherWithInformer(t) - - var counter atomic.Int32 - cs.AddEventHandler(ctx, func() { - counter.Add(1) - }) - - obj := &unstructured.Unstructured{} - obj.SetName("test") - - watcher.Add(obj) - - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, 2*time.Second, 10*time.Millisecond) -} - -func TestCRDSource_AddEventHandler_Update(t *testing.T) { - ctx := t.Context() - watcher, cs := helperCreateWatcherWithInformer(t) - - var counter atomic.Int32 - cs.AddEventHandler(ctx, func() { - counter.Add(1) - }) - - obj := unstructured.Unstructured{} - obj.SetName("test") - obj.SetNamespace("default") - obj.SetUID("9be5b64e-3ee9-11f0-88ee-1eb95c6fd730") - - watcher.Add(&obj) - - require.Eventually(t, func() bool { - return len(watcher.Items) == 1 - }, 2*time.Second, 10*time.Millisecond) - - modified := obj.DeepCopy() - modified.SetLabels(map[string]string{"new-label": "this"}) - watcher.Modify(modified) - - require.Eventually(t, func() bool { - return len(watcher.Items) == 1 - }, 2*time.Second, 10*time.Millisecond) - - require.Eventually(t, func() bool { - return counter.Load() == 2 - }, 2*time.Second, 10*time.Millisecond) -} - -func TestCRDSource_AddEventHandler_Delete(t *testing.T) { - ctx := t.Context() - watcher, cs := helperCreateWatcherWithInformer(t) - - var counter atomic.Int32 - cs.AddEventHandler(ctx, func() { - counter.Add(1) - }) - - obj := &unstructured.Unstructured{} - obj.SetName("test") - - watcher.Delete(obj) - - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, 2*time.Second, 10*time.Millisecond) -} - -func TestCRDSource_Watch(t *testing.T) { - scheme := runtime.NewScheme() - err := apiv1alpha1.AddToScheme(scheme) - require.NoError(t, err) - - var watchCalled bool - - codecFactory := serializer.WithoutConversionCodecFactory{ - CodecFactory: serializer.NewCodecFactory(scheme), + obj := &apiv1alpha1.DNSEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + Generation: 2, + }, + Status: apiv1alpha1.DNSEndpointStatus{ + ObservedGeneration: 1, // differs from Generation → update will be attempted + }, + Spec: apiv1alpha1.DNSEndpointSpec{ + Endpoints: []*endpoint.Endpoint{ + {DNSName: "example.org", Targets: endpoint.Targets{"1.2.3.4"}, RecordType: endpoint.RecordTypeA}, + }, + }, } - versionApiPath := fmt.Sprintf("/apis/%s", apiv1alpha1.GroupVersion.String()) + fakeCache := newFakeCRDCache(t, nil, fakeCRDCacheFilter{}, obj) - client := &fake.RESTClient{ - GroupVersion: apiv1alpha1.GroupVersion, - VersionedAPIPath: versionApiPath, - NegotiatedSerializer: codecFactory, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - if req.URL.Path == fmt.Sprintf("%s/namespaces/test-ns/dnsendpoints", versionApiPath) && - req.URL.Query().Get("watch") == "true" { - watchCalled = true - return &http.Response{ - StatusCode: http.StatusOK, - Header: make(http.Header), - }, nil + failWriter := interceptor.NewClient(fakeCache.Client.(client.WithWatch), interceptor.Funcs{ + SubResourceUpdate: func( + _ context.Context, + _ client.Client, + subResource string, + _ client.Object, + _ ...client.SubResourceUpdateOption) error { + if subResource == "status" { + return fmt.Errorf("status update forbidden") } - t.Errorf("unexpected request: %v", req.URL) - return nil, fmt.Errorf("unexpected request: %v", req.URL) - }), - } + return nil + }, + }) - cs := &crdSource{ - crdClient: client, - namespace: "test-ns", - crdResource: "dnsendpoints", - codec: runtime.NewParameterCodec(scheme), - } - - opts := &metav1.ListOptions{} - - _, err = cs.watch(t.Context(), opts) + cs, err := newCrdSource(t.Context(), fakeCache, failWriter, "", nil) require.NoError(t, err) - require.True(t, watchCalled) - require.True(t, opts.Watch) + + endpoints, err := cs.Endpoints(t.Context()) + require.NoError(t, err, "status update failure must not propagate as an error") + require.Len(t, endpoints, 1, "endpoints must still be returned despite the update failure") + + logtest.TestHelperLogContainsWithLogLevel("Could not update ObservedGeneration", log.WarnLevel, hook, t) } -func validateCRDResource(t *testing.T, src Source, expectError bool) { - t.Helper() - cs := src.(*crdSource) - result, err := cs.List(t.Context(), &metav1.ListOptions{}) - if expectError { - require.Errorf(t, err, "Received err %v", err) - } else { - require.NoErrorf(t, err, "Received err %v", err) +func TestCRDSource_AddEventHandler(t *testing.T) { + tests := []struct { + name string + inject func(t *testing.T, watcher *cachetesting.FakeControllerSource) + wantCount int32 + }{ + { + name: "Add", + inject: func(_ *testing.T, watcher *cachetesting.FakeControllerSource) { + obj := &unstructured.Unstructured{} + obj.SetName("test") + watcher.Add(obj) + }, + wantCount: 1, + }, + { + name: "Delete", + inject: func(_ *testing.T, watcher *cachetesting.FakeControllerSource) { + obj := &unstructured.Unstructured{} + obj.SetName("test") + watcher.Delete(obj) + }, + wantCount: 1, + }, + { + name: "Update", + inject: func(t *testing.T, watcher *cachetesting.FakeControllerSource) { + obj := unstructured.Unstructured{} + obj.SetName("test") + obj.SetNamespace("default") + obj.SetUID("9be5b64e-3ee9-11f0-88ee-1eb95c6fd730") + watcher.Add(&obj) + require.Eventually(t, func() bool { + return len(watcher.Items) == 1 + }, 2*time.Second, 10*time.Millisecond) + modified := obj.DeepCopy() + modified.SetLabels(map[string]string{"new-label": "this"}) + watcher.Modify(modified) + require.Eventually(t, func() bool { + return len(watcher.Items) == 1 + }, 2*time.Second, 10*time.Millisecond) + }, + wantCount: 2, + }, } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + watcher, cs := helperCreateWatcherWithInformer(t) + var counter atomic.Int32 + cs.AddEventHandler(t.Context(), func() { counter.Add(1) }) + tc.inject(t, watcher) + require.Eventually(t, func() bool { + return counter.Load() == tc.wantCount + }, 2*time.Second, 10*time.Millisecond) + }) + } +} - for _, dnsEndpoint := range result.Items { - if dnsEndpoint.Status.ObservedGeneration != dnsEndpoint.Generation { - require.Errorf(t, err, "Unexpected CRD resource result: ObservedGenerations <%v> is not equal to Generation<%v>", dnsEndpoint.Status.ObservedGeneration, dnsEndpoint.Generation) - } - } +func validateCRDResource(t *testing.T, fakeClient client.Client, namespace, name string) { + t.Helper() + updated := &apiv1alpha1.DNSEndpoint{} + err := fakeClient.Get(t.Context(), client.ObjectKey{Namespace: namespace, Name: name}, updated) + require.NoError(t, err) + require.Equal(t, updated.Generation, updated.Status.ObservedGeneration, + "ObservedGeneration should be updated to match Generation after Endpoints() is called") } func TestDNSEndpointsWithSetResourceLabels(t *testing.T) { - typeCounts := map[string]int{ endpoint.RecordTypeA: 3, endpoint.RecordTypeCNAME: 2, @@ -891,40 +702,15 @@ func TestDNSEndpointsWithSetResourceLabels(t *testing.T) { for _, crd := range crds.Items { for _, ep := range crd.Spec.Endpoints { - require.Empty(t, ep.Labels, "endpoint not have labels set") + require.Empty(t, ep.Labels, "endpoint should not have labels set") require.NotContains(t, ep.Labels, endpoint.ResourceLabelKey, "endpoint must not include the ResourceLabelKey label") } } - scheme := runtime.NewScheme() - err := apiv1alpha1.AddToScheme(scheme) + fakeCache := newFakeCRDCache(t, nil, fakeCRDCacheFilter{}, dnsEndpointListToObjects(crds.Items)...) + cs, err := newCrdSource(t.Context(), fakeCache, fakeCache.Client, "", nil) require.NoError(t, err) - codecFactory := serializer.WithoutConversionCodecFactory{ - CodecFactory: serializer.NewCodecFactory(scheme), - } - - client := &fake.RESTClient{ - GroupVersion: apiv1alpha1.GroupVersion, - VersionedAPIPath: fmt.Sprintf("/apis/%s", apiv1alpha1.GroupVersion.String()), - NegotiatedSerializer: codecFactory, - Client: fake.CreateHTTPClient(func(_ *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: make(http.Header), - Body: objBody(codecFactory.LegacyCodec(apiv1alpha1.GroupVersion), &crds), - }, nil - }), - } - - cs := &crdSource{ - crdClient: client, - namespace: "test-ns", - crdResource: "dnsendpoints", - codec: runtime.NewParameterCodec(scheme), - labelSelector: labels.Everything(), - } - res, err := cs.Endpoints(t.Context()) require.NoError(t, err) @@ -941,60 +727,39 @@ func TestProcessEndpoint_CRD_RefObjectExist(t *testing.T) { elements := generateTestFixtureDNSEndpointsByType("test-ns", typeCounts) - scheme := runtime.NewScheme() - err := apiv1alpha1.AddToScheme(scheme) + fakeCache := newFakeCRDCache(t, nil, fakeCRDCacheFilter{}, dnsEndpointListToObjects(elements.Items)...) + cs, err := newCrdSource(t.Context(), fakeCache, fakeCache.Client, "", nil) require.NoError(t, err) - codecFactory := serializer.WithoutConversionCodecFactory{ - CodecFactory: serializer.NewCodecFactory(scheme), - } - - // TODO: reduce duplication and move to pkg/client/fakes - client := &fake.RESTClient{ - GroupVersion: apiv1alpha1.GroupVersion, - VersionedAPIPath: fmt.Sprintf("/apis/%s", apiv1alpha1.GroupVersion.String()), - NegotiatedSerializer: codecFactory, - Client: fake.CreateHTTPClient(func(_ *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Header: make(http.Header), - Body: objBody(codecFactory.LegacyCodec(apiv1alpha1.GroupVersion), &elements), - }, nil - }), - } - - cs := &crdSource{ - crdClient: client, - namespace: "test-ns", - crdResource: "dnsendpoints", - codec: runtime.NewParameterCodec(scheme), - labelSelector: labels.Everything(), - } - endpoints, err := cs.Endpoints(t.Context()) require.NoError(t, err) testutils.AssertEndpointsHaveRefObject(t, endpoints, types.CRD, len(elements.Items)) } -func helperCreateWatcherWithInformer(t *testing.T) (*cachetesting.FakeControllerSource, crdSource) { +// helperCreateWatcherWithInformer creates a FakeControllerSource-backed informer, +// wires it into a fakeCRDCache, and returns a crdSource so tests can inject +// events and verify handler invocations. +// +// toolscache.NewSharedIndexInformer is used (rather than NewSharedInformer) because +// SharedIndexInformer satisfies the controller-runtime cache.Informer interface, +// which additionally requires AddIndexers. +func helperCreateWatcherWithInformer(t *testing.T) (*cachetesting.FakeControllerSource, *crdSource) { t.Helper() ctx := t.Context() watcher := cachetesting.NewFakeControllerSource() - - informer := cache.NewSharedInformer(watcher, &unstructured.Unstructured{}, 0) + informer := toolscache.NewSharedIndexInformer(watcher, &unstructured.Unstructured{}, 0, toolscache.Indexers{}) go informer.RunWithContext(ctx) - require.Eventually(t, func() bool { - return cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) + return toolscache.WaitForCacheSync(ctx.Done(), informer.HasSynced) }, 2*time.Second, 10*time.Millisecond) - cs := &crdSource{ - informer: informer, - } + fakeCache := newFakeCRDCache(t, informer, fakeCRDCacheFilter{}) + cs, err := newCrdSource(ctx, fakeCache, fakeCache.Client, "", nil) + require.NoError(t, err) - return watcher, *cs + return watcher, cs } // generateTestFixtureDNSEndpointsByType generates DNSEndpoint CRDs according to the provided counts per RecordType. @@ -1023,8 +788,6 @@ func generateTestFixtureDNSEndpointsByType(namespace string, typeCounts map[stri idx++ } } - // Shuffle the result to ensure randomness in the order. - rand.New(rand.NewSource(time.Now().UnixNano())) rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] }) @@ -1033,3 +796,259 @@ func generateTestFixtureDNSEndpointsByType(namespace string, typeCounts map[stri Items: result, } } + +func TestStartAndSync(t *testing.T) { + tests := []struct { + name string + startErr error + syncOK bool + blockStart bool + cancelCtx bool + wantErr string + }{ + { + name: "success", + syncOK: true, + }, + { + name: "sync fails, start error available", + startErr: fmt.Errorf("connection refused"), + syncOK: false, + wantErr: "cache failed to sync: connection refused", + }, + { + name: "sync fails, no start error", + syncOK: false, + wantErr: "cache failed to sync", + }, + { + name: "sync fails, context cancelled before start returns", + syncOK: false, + blockStart: true, + cancelCtx: true, + wantErr: "cache failed to sync: context canceled", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := t.Context() + if tc.cancelCtx { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + cancel() + } + c := &startSyncFakeCache{startErr: tc.startErr, syncOK: tc.syncOK, blockStart: tc.blockStart} + err := startAndSync(ctx, c) + if tc.wantErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.wantErr) + } + }) + } +} + +// TestNewCRDSource covers the error paths inside NewCRDSource that cannot be +// reached through newCrdSource (which uses fake caches and writers directly). +func TestNewCRDSource(t *testing.T) { + tests := []struct { + name string + annotationFilter string + makeRestCfg func(t *testing.T) *rest.Config + ctxTimeout time.Duration // 0 → use t.Context() as-is + wantErrContains string + }{ + { + name: "annotation filter parse error", + annotationFilter: "!!!invalid", + makeRestCfg: func(_ *testing.T) *rest.Config { return &rest.Config{Host: "http://ignored"} }, + wantErrContains: "couldn't parse the selector string", + }, + { + // crcache.New and client.New share the same restConfig and the same + // HTTP-client construction path, so they can't be isolated: any config + // that would make client.New fail makes crcache.New fail first. + name: "cache construction fails: bad TLS cert", + makeRestCfg: func(_ *testing.T) *rest.Config { + return &rest.Config{ + Host: "https://127.0.0.1:1", + TLSClientConfig: rest.TLSClientConfig{CAData: []byte("not-a-pem-cert")}, + } + }, + wantErrContains: "unable to load root certificates", + }, + { + // A fake discovery server lets crcache.New succeed; returning 500 for + // all LIST calls prevents the informer from ever syncing. + name: "cache fails to sync: context deadline exceeded", + makeRestCfg: func(t *testing.T) *rest.Config { return &rest.Config{Host: newFakeDiscoveryServer(t).URL} }, + ctxTimeout: 3 * time.Second, + wantErrContains: "cache failed to sync", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := t.Context() + if tc.ctxTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, tc.ctxTimeout) + t.Cleanup(cancel) + } + _, err := NewCRDSource(ctx, tc.makeRestCfg(t), &Config{AnnotationFilter: tc.annotationFilter}) + require.ErrorContains(t, err, tc.wantErrContains) + }) + } +} + +// newFakeDiscoveryServer starts an httptest.Server that serves just enough of +// the Kubernetes discovery API for crcache.New + client.New to succeed and the +// DNSEndpoint informer to be registered. +func newFakeDiscoveryServer(t *testing.T) *httptest.Server { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + encode := func(v any) { + if err := json.NewEncoder(w).Encode(v); err != nil { + t.Errorf("fakeDiscoveryServer: json.Encode %s: %v", r.URL.Path, err) + } + } + switch r.URL.Path { + case "/api": + encode(metav1.APIVersions{ + TypeMeta: metav1.TypeMeta{Kind: "APIVersions", APIVersion: "v1"}, + Versions: []string{"v1"}, + }) + case "/apis": + encode(metav1.APIGroupList{ + TypeMeta: metav1.TypeMeta{Kind: "APIGroupList", APIVersion: "v1"}, + Groups: []metav1.APIGroup{{ + Name: "externaldns.k8s.io", + Versions: []metav1.GroupVersionForDiscovery{{GroupVersion: "externaldns.k8s.io/v1alpha1", Version: "v1alpha1"}}, + PreferredVersion: metav1.GroupVersionForDiscovery{GroupVersion: "externaldns.k8s.io/v1alpha1", Version: "v1alpha1"}, + }}, + }) + case "/apis/externaldns.k8s.io/v1alpha1": + encode(metav1.APIResourceList{ + TypeMeta: metav1.TypeMeta{Kind: "APIResourceList", APIVersion: "v1"}, + GroupVersion: "externaldns.k8s.io/v1alpha1", + APIResources: []metav1.APIResource{{ + Name: "dnsendpoints", + Namespaced: true, + Kind: "DNSEndpoint", + Verbs: metav1.Verbs{"list", "watch"}, + }}, + }) + default: + // Causes the informer's LIST to fail so the cache never syncs. + http.Error(w, `{"kind":"Status","apiVersion":"v1","status":"Failure","code":500}`, http.StatusInternalServerError) + } + })) + t.Cleanup(srv.Close) + return srv +} + +// startSyncFakeCache is a minimal crcache.Cache stub for TestStartAndSync. +// It embeds fakeCRDCache to satisfy the full interface; only Start and +// WaitForCacheSync are overridden. +type startSyncFakeCache struct { + fakeCRDCache + startErr error + syncOK bool + blockStart bool +} + +func (f *startSyncFakeCache) Start(ctx context.Context) error { + if f.blockStart { + <-ctx.Done() + } + return f.startErr +} +func (f *startSyncFakeCache) WaitForCacheSync(_ context.Context) bool { return f.syncOK } + +// fakeCRDCache implements crCache.Cache for unit tests. +// Reads are backed by a fake.Client so test fixtures are populated with +// WithObjects; cache lifecycle methods are no-ops that return immediately. +type fakeCRDCache struct { + client.Client // provides Get + List from fake.Client + informer toolscache.SharedIndexInformer +} + +func (f *fakeCRDCache) GetInformer(_ context.Context, _ client.Object, _ ...crcache.InformerGetOption) (crcache.Informer, error) { + return f.informer, nil +} + +func (f *fakeCRDCache) GetInformerForKind(_ context.Context, _ schema.GroupVersionKind, _ ...crcache.InformerGetOption) (crcache.Informer, error) { + return f.informer, nil +} + +func (*fakeCRDCache) RemoveInformer(_ context.Context, _ client.Object) error { return nil } +func (*fakeCRDCache) Start(_ context.Context) error { return nil } +func (*fakeCRDCache) WaitForCacheSync(_ context.Context) bool { return true } +func (*fakeCRDCache) IndexField(_ context.Context, _ client.Object, _ string, _ client.IndexerFunc) error { + return nil +} + +// fakeCRDCacheFilter holds the admission criteria applied by the real controller-runtime +// cache (namespace, label selector, annotation selector). Zero value means no filtering. +type fakeCRDCacheFilter struct { + namespace string + labelSelector labels.Selector + annotationSelector labels.Selector +} + +// newFakeCRDCache builds a test cache backed by the given objects. +// Annotation filtering is applied via the transform (mirroring buildCacheOptions). +// Namespace and label filtering are applied at read time by the fake client, mirroring +// the crReader.List options used in Endpoints(). +// When informer is nil a real SharedIndexInformer backed by a FakeControllerSource +// is created to satisfy newCrdSource's GetInformer call; it is not started. +func newFakeCRDCache(t *testing.T, informer toolscache.SharedIndexInformer, filter fakeCRDCacheFilter, objs ...client.Object) *fakeCRDCache { + t.Helper() + if informer == nil { + informer = toolscache.NewSharedIndexInformer( + cachetesting.NewFakeControllerSource(), + &apiv1alpha1.DNSEndpoint{}, + 0, + toolscache.Indexers{}, + ) + } + if len(objs) > 0 { + cacheOpts, err := buildCacheOptions(filter.namespace, filter.labelSelector, filter.annotationSelector) + require.NoError(t, err) + byObj := dnsEndpointByObj(t, cacheOpts) + var admitted []client.Object + for _, obj := range objs { + got, err := byObj.Transform(obj) + require.NoError(t, err) + if got != nil { + admitted = append(admitted, obj) + } + } + objs = admitted + } + fc := fake.NewClientBuilder(). + WithScheme(newCRDTestScheme(t)). + WithStatusSubresource(&apiv1alpha1.DNSEndpoint{}). + WithObjects(objs...). + Build() + return &fakeCRDCache{Client: fc, informer: informer} +} + +// newCRDTestScheme returns a scheme with the external-dns API types registered. +func newCRDTestScheme(t *testing.T) *runtime.Scheme { + t.Helper() + scheme := runtime.NewScheme() + require.NoError(t, apiv1alpha1.AddToScheme(scheme)) + return scheme +} + +// dnsEndpointListToObjects converts a DNSEndpointList slice into a []client.Object slice. +func dnsEndpointListToObjects(items []apiv1alpha1.DNSEndpoint) []client.Object { + objs := make([]client.Object, len(items)) + for i := range items { + objs[i] = &items[i] + } + return objs +} diff --git a/source/informers/transformers.go b/source/informers/transformers.go index c723a872b..f0caf0e3e 100644 --- a/source/informers/transformers.go +++ b/source/informers/transformers.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" @@ -32,10 +33,11 @@ import ( // All options operate on the metav1.Object interface (or via reflection for Status // fields) and are therefore applicable to any Kubernetes resource type. type TransformOptions struct { - removeManagedFields bool - removeLastAppliedConfig bool - removeStatusConditions bool - keepAnnotationPrefixes []string + removeManagedFields bool + removeLastAppliedConfig bool + removeStatusConditions bool + keepAnnotationPrefixes []string + requireAnnotationSelector labels.Selector } // TransformRemoveManagedFields strips managedFields from the object's metadata. @@ -72,6 +74,16 @@ func TransformKeepAnnotationPrefix(prefixes ...string) func(*TransformOptions) { } } +// TransformRequireAnnotation is a local guard against annotation mutation: +// the Kubernetes API does not support annotation selectors in List/Watch. +// Do not use when an indexer handles annotation filtering. +// A nil or empty selector is a no-op. +func TransformRequireAnnotation(selector labels.Selector) func(*TransformOptions) { + return func(o *TransformOptions) { + o.requireAnnotationSelector = selector + } +} + // TransformerWithOptions returns a cache.TransformFunc that modifies objects of type T // in place to reduce the memory footprint of the informer cache. All options operate // on the metav1.Object interface or via reflection, making the transformer applicable @@ -104,6 +116,11 @@ func TransformerWithOptions[T interface { if !ok { return nil, nil } + if sel := options.requireAnnotationSelector; sel != nil && !sel.Empty() { + if !sel.Matches(labels.Set(entity.GetAnnotations())) { + return nil, nil + } + } populateGVK(entity) if options.removeManagedFields { entity.SetManagedFields(nil) @@ -138,10 +155,7 @@ func MustSetTransform(informer cache.SharedInformer, fn cache.TransformFunc) { } } -// populateGVK sets TypeMeta (Kind/APIVersion) on obj if it is missing. -// Kubernetes informers strip TypeMeta from cached objects because the client already -// knows what type it requested. Populating it here makes cached objects self-describing -// for templates and logging without any per-reconciliation overhead. +// populateGVK restores TypeMeta (Kind/APIVersion) stripped by the Kubernetes informer. func populateGVK(obj runtime.Object) { gvk := obj.GetObjectKind().GroupVersionKind() if gvk.Kind != "" { diff --git a/source/informers/transformers_test.go b/source/informers/transformers_test.go index 5e9ad9111..498e6a2bc 100644 --- a/source/informers/transformers_test.go +++ b/source/informers/transformers_test.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -184,6 +185,65 @@ func TestTransformKeepAnnotationPrefix(t *testing.T) { }) } +func TestTransformRequireAnnotation(t *testing.T) { + t.Run("matching selector keeps object", func(t *testing.T) { + svc := fakeService() // annotations include external-dns.alpha.kubernetes.io/hostname=example.com + sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=example.com") + require.NoError(t, err) + + transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel)) + got, err := transform(svc) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, svc.Name, got.(*corev1.Service).Name) + }) + + t.Run("non-matching selector drops object", func(t *testing.T) { + svc := fakeService() + sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=other.com") + require.NoError(t, err) + + transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel)) + got, err := transform(svc) + require.NoError(t, err) + assert.Nil(t, got) + }) + + t.Run("nil selector is a no-op", func(t *testing.T) { + svc := fakeService() + transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(nil)) + got, err := transform(svc) + require.NoError(t, err) + assert.NotNil(t, got) + }) + + t.Run("empty selector is a no-op", func(t *testing.T) { + svc := fakeService() + transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(labels.Everything())) + got, err := transform(svc) + require.NoError(t, err) + assert.NotNil(t, got) + }) + + t.Run("drops object after annotation mutation (simulates MODIFIED event)", func(t *testing.T) { + sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=example.com") + require.NoError(t, err) + transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel)) + + // First call: annotation matches, object is admitted. + svc := fakeService() // annotations include external-dns.alpha.kubernetes.io/hostname=example.com + got, err := transform(svc) + require.NoError(t, err) + require.NotNil(t, got) + + // Annotation mutates — simulate MODIFIED event with new value. + svc.Annotations["external-dns.alpha.kubernetes.io/hostname"] = "other.com" + got, err = transform(svc) + require.NoError(t, err) + assert.Nil(t, got, "mutated object must be dropped as a local guard") + }) +} + func TestTransformerWithOptions_Combined(t *testing.T) { svc := fakeService() diff --git a/source/store.go b/source/store.go index 8afd3e17d..c6e771109 100644 --- a/source/store.go +++ b/source/store.go @@ -615,19 +615,13 @@ func buildOpenShiftRouteSource(ctx context.Context, p ClientGenerator, cfg *Conf return NewOcpRouteSource(ctx, ocpClient, cfg) } -// buildCRDSource creates a CRD source for exposing custom resources as DNS records. -// Uses a specialized CRD client created via NewCRDClientForAPIVersionKind. -// Parameter order: crdClient, namespace, kind, annotationFilter, labelFilter, scheme, updateEvents -func buildCRDSource(_ context.Context, p ClientGenerator, cfg *Config) (Source, error) { - client, err := p.KubeClient() +// buildCRDSource creates a CRD source for exposing DNSEndpoint custom resources as DNS records. +func buildCRDSource(ctx context.Context, p ClientGenerator, cfg *Config) (Source, error) { + restConfig, err := p.RESTConfig() if err != nil { return nil, err } - crdClient, scheme, err := NewCRDClientForAPIVersionKind(client, cfg) - if err != nil { - return nil, err - } - return NewCRDSource(crdClient, cfg, scheme) + return NewCRDSource(ctx, restConfig, cfg) } // buildSkipperRouteGroupSource creates a Skipper RouteGroup source for exposing route groups as DNS records. diff --git a/source/store_test.go b/source/store_test.go index 8a9fc47ec..14964e954 100644 --- a/source/store_test.go +++ b/source/store_test.go @@ -140,6 +140,7 @@ func (suite *ByNamesTestSuite) TestSourceNotFound() { func (suite *ByNamesTestSuite) TestKubeClientFails() { mockClientGenerator := new(testutils.MockClientGenerator) mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo")) + mockClientGenerator.On("RESTConfig").Return(nil, errors.New("foo")) sourceUnderTest := []string{ types.Node, types.Service, types.Ingress, types.Pod, types.IstioGateway, types.IstioVirtualService,