mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2026-05-04 22:26:11 +02:00
refactor(source/crd): migrate CRD source to controller-runtime cache (#6312)
* refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Co-authored-by: vflaux <38909103+vflaux@users.noreply.github.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Co-authored-by: vflaux <38909103+vflaux@users.noreply.github.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * refactor(source/crd): migrate CRD source to controller-runtime cache Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> --------- Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> Co-authored-by: vflaux <38909103+vflaux@users.noreply.github.com>
This commit is contained in:
parent
84198f906c
commit
ea4d2d1681
@ -29,6 +29,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"k8s.io/klog/v2"
|
||||
crlog "sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
|
||||
@ -74,6 +75,10 @@ func Execute() {
|
||||
klog.SetLogger(logr.Discard())
|
||||
}
|
||||
|
||||
// controller-runtime prints a stack trace warning if its logger is never initialized.
|
||||
// external-dns uses logrus for all logging, so controller-runtime's logr output is discarded here.
|
||||
crlog.SetLogger(logr.Discard())
|
||||
|
||||
log.Info(externaldns.Banner())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
295
source/crd.go
295
source/crd.go
@ -19,30 +19,21 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/rest"
|
||||
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1"
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/pkg/events"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
"sigs.k8s.io/external-dns/source/informers"
|
||||
"sigs.k8s.io/external-dns/source/types"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
|
||||
apiv1alpha1 "sigs.k8s.io/external-dns/apis/v1alpha1"
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
)
|
||||
|
||||
// crdSource is an implementation of Source that provides endpoints by listing
|
||||
@ -58,147 +49,68 @@ import (
|
||||
// +externaldns:source:events=true
|
||||
// +externaldns:source:provider-specific=true
|
||||
type crdSource struct {
|
||||
crdClient rest.Interface
|
||||
namespace string
|
||||
crdResource string
|
||||
codec runtime.ParameterCodec
|
||||
annotationFilter string
|
||||
labelSelector labels.Selector
|
||||
informer cache.SharedInformer
|
||||
crReader client.Reader
|
||||
crWriter client.Client // status writes
|
||||
informer crcache.Informer
|
||||
listOpts []client.ListOption
|
||||
}
|
||||
|
||||
// NewCRDClientForAPIVersionKind return rest client for the given apiVersion and kind of the CRD
|
||||
func NewCRDClientForAPIVersionKind(
|
||||
client kubernetes.Interface,
|
||||
cfg *Config,
|
||||
) (*rest.RESTClient, *runtime.Scheme, error) {
|
||||
kubeConfig := cfg.KubeConfig
|
||||
if kubeConfig == "" {
|
||||
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
|
||||
kubeConfig = clientcmd.RecommendedHomeFile
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: GetRestConfig logic is duplicated from store.go, refactor to avoid duplication
|
||||
config, err := clientcmd.BuildConfigFromFlags(cfg.APIServerURL, kubeConfig)
|
||||
// NewCRDSource creates a new crdSource backed by a controller-runtime cache.
|
||||
// It builds the scheme, cache, and status-write client from restConfig and cfg.
|
||||
func NewCRDSource(ctx context.Context, restConfig *rest.Config, cfg *Config) (Source, error) {
|
||||
annotationSelector, err := annotations.ParseFilter(cfg.AnnotationFilter)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
groupVersion, err := schema.ParseGroupVersion(cfg.CRDSourceAPIVersion)
|
||||
opts, err := buildCacheOptions(cfg.Namespace, cfg.LabelFilter, annotationSelector)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
apiResourceList, err := client.Discovery().ServerResourcesForGroupVersion(groupVersion.String())
|
||||
|
||||
c, err := crcache.New(restConfig, opts)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error listing resources in GroupVersion %q: %w", groupVersion.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var crdAPIResource *metav1.APIResource
|
||||
for _, apiResource := range apiResourceList.APIResources {
|
||||
if apiResource.Kind == cfg.CRDSourceKind {
|
||||
crdAPIResource = &apiResource
|
||||
break
|
||||
}
|
||||
}
|
||||
if crdAPIResource == nil {
|
||||
return nil, nil, fmt.Errorf("unable to find Resource Kind %q in GroupVersion %q", cfg.CRDSourceKind, cfg.CRDSourceAPIVersion)
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
_ = apiv1alpha1.AddToScheme(scheme)
|
||||
|
||||
config.GroupVersion = &groupVersion
|
||||
config.APIPath = "/apis"
|
||||
config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
|
||||
|
||||
crdClient, err := rest.UnversionedRESTClientFor(config)
|
||||
// crWriter is used exclusively for status writes; reads come from the cache.
|
||||
crWriter, err := client.New(restConfig, client.Options{Scheme: opts.Scheme})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
return crdClient, scheme, nil
|
||||
}
|
||||
|
||||
// NewCRDSource creates a new crdSource with the given config.
|
||||
func NewCRDSource(
|
||||
crdClient rest.Interface,
|
||||
cfg *Config,
|
||||
scheme *runtime.Scheme) (Source, error) {
|
||||
sourceCrd := crdSource{
|
||||
crdResource: strings.ToLower(cfg.CRDSourceKind) + "s",
|
||||
namespace: cfg.Namespace,
|
||||
annotationFilter: cfg.AnnotationFilter,
|
||||
labelSelector: cfg.LabelFilter,
|
||||
crdClient: crdClient,
|
||||
codec: runtime.NewParameterCodec(scheme),
|
||||
}
|
||||
if cfg.UpdateEvents {
|
||||
// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
|
||||
// missed or dropped events are handled. specify resync period 0 to avoid unnecessary sync handler invocations.
|
||||
sourceCrd.informer = cache.NewSharedInformer(
|
||||
&cache.ListWatch{
|
||||
ListWithContextFunc: func(ctx context.Context, lo metav1.ListOptions) (runtime.Object, error) {
|
||||
return sourceCrd.List(ctx, &lo)
|
||||
},
|
||||
WatchFuncWithContext: func(ctx context.Context, lo metav1.ListOptions) (watch.Interface, error) {
|
||||
return sourceCrd.watch(ctx, &lo)
|
||||
},
|
||||
},
|
||||
&apiv1alpha1.DNSEndpoint{},
|
||||
0)
|
||||
go sourceCrd.informer.Run(wait.NeverStop)
|
||||
}
|
||||
return &sourceCrd, nil
|
||||
return newCrdSource(ctx, c, crWriter, cfg.Namespace, cfg.LabelFilter)
|
||||
}
|
||||
|
||||
func (cs *crdSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
if cs.informer != nil {
|
||||
log.Debug("Adding event handler for CRD")
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
// https://github.com/kubernetes/kubernetes/issues/79610
|
||||
informers.MustAddEventHandler(cs.informer,
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(_ any) {
|
||||
handler()
|
||||
},
|
||||
UpdateFunc: func(_ any, _ any) {
|
||||
handler()
|
||||
},
|
||||
DeleteFunc: func(_ any) {
|
||||
handler()
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
log.Debug("crd: adding event handler")
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
// https://github.com/kubernetes/kubernetes/issues/79610
|
||||
_, _ = cs.informer.AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// Endpoints returns endpoint objects.
|
||||
// Endpoints returns endpoint objects for all DNSEndpoint resources visible to
|
||||
// this source. Namespace, label, and annotation filtering are handled at the
|
||||
// cache level via buildCacheOptions; target-format validation is applied here.
|
||||
func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
endpoints := []*endpoint.Endpoint{}
|
||||
|
||||
var (
|
||||
result *apiv1alpha1.DNSEndpointList
|
||||
err error
|
||||
)
|
||||
|
||||
result, err = cs.List(ctx, &metav1.ListOptions{LabelSelector: cs.labelSelector.String()})
|
||||
if err != nil {
|
||||
list := &apiv1alpha1.DNSEndpointList{}
|
||||
if err := cs.crReader.List(ctx, list, cs.listOpts...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
itemPtrs := make([]*apiv1alpha1.DNSEndpoint, len(result.Items))
|
||||
for i := range result.Items {
|
||||
itemPtrs[i] = &result.Items[i]
|
||||
}
|
||||
|
||||
filtered, err := annotations.Filter(itemPtrs, cs.annotationFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, dnsEndpoint := range filtered {
|
||||
endpoints := make([]*endpoint.Endpoint, 0, len(list.Items))
|
||||
for i := range list.Items {
|
||||
dnsEndpoint := &list.Items[i]
|
||||
var crdEndpoints []*endpoint.Endpoint
|
||||
for _, ep := range dnsEndpoint.Spec.Endpoints {
|
||||
if ep == nil {
|
||||
log.Debugf(
|
||||
"Skipping nil endpoint in DNSEndpoint %s/%s at spec.endpoints",
|
||||
dnsEndpoint.Namespace,
|
||||
dnsEndpoint.Name,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
if (ep.RecordType == endpoint.RecordTypeCNAME || ep.RecordType == endpoint.RecordTypeA || ep.RecordType == endpoint.RecordTypeAAAA) && len(ep.Targets) < 1 {
|
||||
log.Debugf("Endpoint %s with DNSName %s has an empty list of targets, allowing it to pass through for default-targets processing", dnsEndpoint.Name, ep.DNSName)
|
||||
}
|
||||
@ -206,7 +118,7 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
|
||||
for _, target := range ep.Targets {
|
||||
switch ep.RecordType {
|
||||
case endpoint.RecordTypeTXT, endpoint.RecordTypeMX:
|
||||
continue // TXT records allow arbitrary text, skip validation; MX records can have trailing dot but it's not required, skip validation
|
||||
continue // no format constraint on targets
|
||||
case endpoint.RecordTypeCNAME:
|
||||
continue // RFC 1035 §5.1: trailing dot denotes an absolute FQDN in zone file notation; both forms are valid
|
||||
}
|
||||
@ -215,9 +127,9 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
|
||||
|
||||
switch ep.RecordType {
|
||||
case endpoint.RecordTypeNAPTR:
|
||||
illegalTarget = !hasDot // Must have trailing dot
|
||||
illegalTarget = !hasDot
|
||||
default:
|
||||
illegalTarget = hasDot // Must NOT have trailing dot
|
||||
illegalTarget = hasDot
|
||||
}
|
||||
|
||||
if illegalTarget {
|
||||
@ -235,7 +147,6 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
|
||||
}
|
||||
|
||||
ep.WithLabel(endpoint.ResourceLabelKey, fmt.Sprintf("crd/%s/%s", dnsEndpoint.Namespace, dnsEndpoint.Name))
|
||||
|
||||
crdEndpoints = append(crdEndpoints, ep)
|
||||
}
|
||||
|
||||
@ -247,9 +158,7 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
|
||||
}
|
||||
|
||||
dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation
|
||||
// Update the ObservedGeneration
|
||||
_, err = cs.UpdateStatus(ctx, dnsEndpoint)
|
||||
if err != nil {
|
||||
if err := cs.crWriter.Status().Update(ctx, dnsEndpoint); err != nil {
|
||||
log.Warnf("Could not update ObservedGeneration of the CRD: %v", err)
|
||||
}
|
||||
}
|
||||
@ -257,33 +166,85 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
|
||||
return MergeEndpoints(endpoints), nil
|
||||
}
|
||||
|
||||
func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
|
||||
opts.Watch = true
|
||||
return cs.crdClient.Get().
|
||||
Namespace(cs.namespace).
|
||||
Resource(cs.crdResource).
|
||||
VersionedParams(opts, cs.codec).
|
||||
Watch(ctx)
|
||||
// newCrdSource wires a cache and writer into a running crdSource.
|
||||
func newCrdSource(
|
||||
ctx context.Context,
|
||||
c crcache.Cache,
|
||||
crWriter client.Client,
|
||||
namespace string,
|
||||
labelSelector labels.Selector) (*crdSource, error) {
|
||||
inf, err := c.GetInformer(ctx, &apiv1alpha1.DNSEndpoint{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, _ = inf.AddEventHandler(informers.DefaultEventHandler())
|
||||
|
||||
listOpts := []client.ListOption{client.InNamespace(namespace)}
|
||||
if labelSelector != nil && !labelSelector.Empty() {
|
||||
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
|
||||
}
|
||||
|
||||
cs := &crdSource{
|
||||
crReader: c,
|
||||
crWriter: crWriter,
|
||||
informer: inf,
|
||||
listOpts: listOpts,
|
||||
}
|
||||
|
||||
if err := startAndSync(ctx, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (*apiv1alpha1.DNSEndpointList, error) {
|
||||
result := &apiv1alpha1.DNSEndpointList{}
|
||||
return result, cs.crdClient.Get().
|
||||
Namespace(cs.namespace).
|
||||
Resource(cs.crdResource).
|
||||
VersionedParams(opts, cs.codec).
|
||||
Do(ctx).
|
||||
Into(result)
|
||||
// startAndSync starts the cache in a goroutine and waits for it to sync.
|
||||
// Returns an error if the cache fails to start or sync.
|
||||
func startAndSync(ctx context.Context, c crcache.Cache) error {
|
||||
errCh := make(chan error, 1)
|
||||
go func() { errCh <- c.Start(ctx) }()
|
||||
if !c.WaitForCacheSync(ctx) {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
return fmt.Errorf("cache failed to sync: %w", err)
|
||||
}
|
||||
return fmt.Errorf("cache failed to sync")
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("cache failed to sync: %w", ctx.Err())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *apiv1alpha1.DNSEndpoint) (*apiv1alpha1.DNSEndpoint, error) {
|
||||
result := &apiv1alpha1.DNSEndpoint{}
|
||||
return result, cs.crdClient.Put().
|
||||
Namespace(dnsEndpoint.Namespace).
|
||||
Resource(cs.crdResource).
|
||||
Name(dnsEndpoint.Name).
|
||||
SubResource("status").
|
||||
Body(dnsEndpoint).
|
||||
Do(ctx).
|
||||
Into(result)
|
||||
// buildCacheOptions constructs the controller-runtime cache options for the
|
||||
// given namespace and label selector. Extracted so the namespace/label scoping
|
||||
// logic can be unit-tested without a running API server.
|
||||
func buildCacheOptions(namespace string, labelFilter, annotationSelector labels.Selector) (crcache.Options, error) {
|
||||
scheme := runtime.NewScheme()
|
||||
if err := apiv1alpha1.AddToScheme(scheme); err != nil {
|
||||
return crcache.Options{}, err
|
||||
}
|
||||
|
||||
nsMap := map[string]crcache.Config{
|
||||
namespace: {}, // "" == NamespaceAll
|
||||
}
|
||||
byObj := crcache.ByObject{
|
||||
Namespaces: nsMap,
|
||||
Transform: informers.TransformerWithOptions[*apiv1alpha1.DNSEndpoint](
|
||||
informers.TransformRemoveManagedFields(),
|
||||
informers.TransformRemoveLastAppliedConfig(),
|
||||
informers.TransformRequireAnnotation(annotationSelector),
|
||||
),
|
||||
}
|
||||
if labelFilter != nil && !labelFilter.Empty() {
|
||||
byObj.Label = labelFilter
|
||||
}
|
||||
return crcache.Options{
|
||||
Scheme: scheme,
|
||||
ByObject: map[client.Object]crcache.ByObject{
|
||||
&apiv1alpha1.DNSEndpoint{}: byObj,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
1149
source/crd_test.go
1149
source/crd_test.go
File diff suppressed because it is too large
Load Diff
@ -22,6 +22,7 @@ import (
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
@ -32,10 +33,11 @@ import (
|
||||
// All options operate on the metav1.Object interface (or via reflection for Status
|
||||
// fields) and are therefore applicable to any Kubernetes resource type.
|
||||
type TransformOptions struct {
|
||||
removeManagedFields bool
|
||||
removeLastAppliedConfig bool
|
||||
removeStatusConditions bool
|
||||
keepAnnotationPrefixes []string
|
||||
removeManagedFields bool
|
||||
removeLastAppliedConfig bool
|
||||
removeStatusConditions bool
|
||||
keepAnnotationPrefixes []string
|
||||
requireAnnotationSelector labels.Selector
|
||||
}
|
||||
|
||||
// TransformRemoveManagedFields strips managedFields from the object's metadata.
|
||||
@ -72,6 +74,16 @@ func TransformKeepAnnotationPrefix(prefixes ...string) func(*TransformOptions) {
|
||||
}
|
||||
}
|
||||
|
||||
// TransformRequireAnnotation is a local guard against annotation mutation:
|
||||
// the Kubernetes API does not support annotation selectors in List/Watch.
|
||||
// Do not use when an indexer handles annotation filtering.
|
||||
// A nil or empty selector is a no-op.
|
||||
func TransformRequireAnnotation(selector labels.Selector) func(*TransformOptions) {
|
||||
return func(o *TransformOptions) {
|
||||
o.requireAnnotationSelector = selector
|
||||
}
|
||||
}
|
||||
|
||||
// TransformerWithOptions returns a cache.TransformFunc that modifies objects of type T
|
||||
// in place to reduce the memory footprint of the informer cache. All options operate
|
||||
// on the metav1.Object interface or via reflection, making the transformer applicable
|
||||
@ -104,6 +116,11 @@ func TransformerWithOptions[T interface {
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
if sel := options.requireAnnotationSelector; sel != nil && !sel.Empty() {
|
||||
if !sel.Matches(labels.Set(entity.GetAnnotations())) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
populateGVK(entity)
|
||||
if options.removeManagedFields {
|
||||
entity.SetManagedFields(nil)
|
||||
@ -138,10 +155,7 @@ func MustSetTransform(informer cache.SharedInformer, fn cache.TransformFunc) {
|
||||
}
|
||||
}
|
||||
|
||||
// populateGVK sets TypeMeta (Kind/APIVersion) on obj if it is missing.
|
||||
// Kubernetes informers strip TypeMeta from cached objects because the client already
|
||||
// knows what type it requested. Populating it here makes cached objects self-describing
|
||||
// for templates and logging without any per-reconciliation overhead.
|
||||
// populateGVK restores TypeMeta (Kind/APIVersion) stripped by the Kubernetes informer.
|
||||
func populateGVK(obj runtime.Object) {
|
||||
gvk := obj.GetObjectKind().GroupVersionKind()
|
||||
if gvk.Kind != "" {
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@ -184,6 +185,65 @@ func TestTransformKeepAnnotationPrefix(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestTransformRequireAnnotation(t *testing.T) {
|
||||
t.Run("matching selector keeps object", func(t *testing.T) {
|
||||
svc := fakeService() // annotations include external-dns.alpha.kubernetes.io/hostname=example.com
|
||||
sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=example.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel))
|
||||
got, err := transform(svc)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, got)
|
||||
assert.Equal(t, svc.Name, got.(*corev1.Service).Name)
|
||||
})
|
||||
|
||||
t.Run("non-matching selector drops object", func(t *testing.T) {
|
||||
svc := fakeService()
|
||||
sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=other.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel))
|
||||
got, err := transform(svc)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, got)
|
||||
})
|
||||
|
||||
t.Run("nil selector is a no-op", func(t *testing.T) {
|
||||
svc := fakeService()
|
||||
transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(nil))
|
||||
got, err := transform(svc)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, got)
|
||||
})
|
||||
|
||||
t.Run("empty selector is a no-op", func(t *testing.T) {
|
||||
svc := fakeService()
|
||||
transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(labels.Everything()))
|
||||
got, err := transform(svc)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, got)
|
||||
})
|
||||
|
||||
t.Run("drops object after annotation mutation (simulates MODIFIED event)", func(t *testing.T) {
|
||||
sel, err := labels.Parse("external-dns.alpha.kubernetes.io/hostname=example.com")
|
||||
require.NoError(t, err)
|
||||
transform := TransformerWithOptions[*corev1.Service](TransformRequireAnnotation(sel))
|
||||
|
||||
// First call: annotation matches, object is admitted.
|
||||
svc := fakeService() // annotations include external-dns.alpha.kubernetes.io/hostname=example.com
|
||||
got, err := transform(svc)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, got)
|
||||
|
||||
// Annotation mutates — simulate MODIFIED event with new value.
|
||||
svc.Annotations["external-dns.alpha.kubernetes.io/hostname"] = "other.com"
|
||||
got, err = transform(svc)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, got, "mutated object must be dropped as a local guard")
|
||||
})
|
||||
}
|
||||
|
||||
func TestTransformerWithOptions_Combined(t *testing.T) {
|
||||
svc := fakeService()
|
||||
|
||||
|
||||
@ -615,19 +615,13 @@ func buildOpenShiftRouteSource(ctx context.Context, p ClientGenerator, cfg *Conf
|
||||
return NewOcpRouteSource(ctx, ocpClient, cfg)
|
||||
}
|
||||
|
||||
// buildCRDSource creates a CRD source for exposing custom resources as DNS records.
|
||||
// Uses a specialized CRD client created via NewCRDClientForAPIVersionKind.
|
||||
// Parameter order: crdClient, namespace, kind, annotationFilter, labelFilter, scheme, updateEvents
|
||||
func buildCRDSource(_ context.Context, p ClientGenerator, cfg *Config) (Source, error) {
|
||||
client, err := p.KubeClient()
|
||||
// buildCRDSource creates a CRD source for exposing DNSEndpoint custom resources as DNS records.
|
||||
func buildCRDSource(ctx context.Context, p ClientGenerator, cfg *Config) (Source, error) {
|
||||
restConfig, err := p.RESTConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
crdClient, scheme, err := NewCRDClientForAPIVersionKind(client, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewCRDSource(crdClient, cfg, scheme)
|
||||
return NewCRDSource(ctx, restConfig, cfg)
|
||||
}
|
||||
|
||||
// buildSkipperRouteGroupSource creates a Skipper RouteGroup source for exposing route groups as DNS records.
|
||||
|
||||
@ -140,6 +140,7 @@ func (suite *ByNamesTestSuite) TestSourceNotFound() {
|
||||
func (suite *ByNamesTestSuite) TestKubeClientFails() {
|
||||
mockClientGenerator := new(testutils.MockClientGenerator)
|
||||
mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo"))
|
||||
mockClientGenerator.On("RESTConfig").Return(nil, errors.New("foo"))
|
||||
|
||||
sourceUnderTest := []string{
|
||||
types.Node, types.Service, types.Ingress, types.Pod, types.IstioGateway, types.IstioVirtualService,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user