mirror of
				https://github.com/kubernetes-sigs/external-dns.git
				synced 2025-10-25 16:51:01 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			297 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			297 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2018 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package source
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"strings"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/apimachinery/pkg/watch"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/runtime/schema"
 | |
| 	"k8s.io/apimachinery/pkg/runtime/serializer"
 | |
| 	"k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/rest"
 | |
| 	"k8s.io/client-go/tools/clientcmd"
 | |
| 
 | |
| 	"sigs.k8s.io/external-dns/endpoint"
 | |
| )
 | |
| 
 | |
| // crdSource is an implementation of Source that provides endpoints by listing
 | |
| // specified CRD and fetching Endpoints embedded in Spec.
 | |
| type crdSource struct {
 | |
| 	crdClient        rest.Interface
 | |
| 	namespace        string
 | |
| 	crdResource      string
 | |
| 	codec            runtime.ParameterCodec
 | |
| 	annotationFilter string
 | |
| 	labelSelector    labels.Selector
 | |
| 	informer         *cache.SharedInformer
 | |
| }
 | |
| 
 | |
| func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
 | |
| 	scheme.AddKnownTypes(groupVersion,
 | |
| 		&endpoint.DNSEndpoint{},
 | |
| 		&endpoint.DNSEndpointList{},
 | |
| 	)
 | |
| 	metav1.AddToGroupVersion(scheme, groupVersion)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewCRDClientForAPIVersionKind return rest client for the given apiVersion and kind of the CRD
 | |
| func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiServerURL, apiVersion, kind string) (*rest.RESTClient, *runtime.Scheme, error) {
 | |
| 	if kubeConfig == "" {
 | |
| 		if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
 | |
| 			kubeConfig = clientcmd.RecommendedHomeFile
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	config, err := clientcmd.BuildConfigFromFlags(apiServerURL, kubeConfig)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	groupVersion, err := schema.ParseGroupVersion(apiVersion)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 	apiResourceList, err := client.Discovery().ServerResourcesForGroupVersion(groupVersion.String())
 | |
| 	if err != nil {
 | |
| 		return nil, nil, fmt.Errorf("error listing resources in GroupVersion %q: %w", groupVersion.String(), err)
 | |
| 	}
 | |
| 
 | |
| 	var crdAPIResource *metav1.APIResource
 | |
| 	for _, apiResource := range apiResourceList.APIResources {
 | |
| 		if apiResource.Kind == kind {
 | |
| 			crdAPIResource = &apiResource
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	if crdAPIResource == nil {
 | |
| 		return nil, nil, fmt.Errorf("unable to find Resource Kind %q in GroupVersion %q", kind, apiVersion)
 | |
| 	}
 | |
| 
 | |
| 	scheme := runtime.NewScheme()
 | |
| 	addKnownTypes(scheme, groupVersion)
 | |
| 
 | |
| 	config.ContentConfig.GroupVersion = &groupVersion
 | |
| 	config.APIPath = "/apis"
 | |
| 	config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
 | |
| 
 | |
| 	crdClient, err := rest.UnversionedRESTClientFor(config)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 	return crdClient, scheme, nil
 | |
| }
 | |
| 
 | |
| // NewCRDSource creates a new crdSource with the given config.
 | |
| func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) {
 | |
| 	sourceCrd := crdSource{
 | |
| 		crdResource:      strings.ToLower(kind) + "s",
 | |
| 		namespace:        namespace,
 | |
| 		annotationFilter: annotationFilter,
 | |
| 		labelSelector:    labelSelector,
 | |
| 		crdClient:        crdClient,
 | |
| 		codec:            runtime.NewParameterCodec(scheme),
 | |
| 	}
 | |
| 	if startInformer {
 | |
| 		// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
 | |
| 		// missed or dropped events are handled.  specify a resync period 0 to avoid unnecessary sync handler invocations.
 | |
| 		informer := cache.NewSharedInformer(
 | |
| 			&cache.ListWatch{
 | |
| 				ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
 | |
| 					return sourceCrd.List(context.TODO(), &lo)
 | |
| 				},
 | |
| 				WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
 | |
| 					return sourceCrd.watch(context.TODO(), &lo)
 | |
| 				},
 | |
| 			},
 | |
| 			&endpoint.DNSEndpoint{},
 | |
| 			0)
 | |
| 		sourceCrd.informer = &informer
 | |
| 		go informer.Run(wait.NeverStop)
 | |
| 	}
 | |
| 	return &sourceCrd, nil
 | |
| }
 | |
| 
 | |
| func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
 | |
| 	if cs.informer != nil {
 | |
| 		log.Debug("Adding event handler for CRD")
 | |
| 		// Right now there is no way to remove event handler from informer, see:
 | |
| 		// https://github.com/kubernetes/kubernetes/issues/79610
 | |
| 		informer := *cs.informer
 | |
| 		informer.AddEventHandler(
 | |
| 			cache.ResourceEventHandlerFuncs{
 | |
| 				AddFunc: func(obj interface{}) {
 | |
| 					handler()
 | |
| 				},
 | |
| 				UpdateFunc: func(old interface{}, new interface{}) {
 | |
| 					handler()
 | |
| 				},
 | |
| 				DeleteFunc: func(obj interface{}) {
 | |
| 					handler()
 | |
| 				},
 | |
| 			},
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Endpoints returns endpoint objects.
 | |
| func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
 | |
| 	endpoints := []*endpoint.Endpoint{}
 | |
| 
 | |
| 	var (
 | |
| 		result *endpoint.DNSEndpointList
 | |
| 		err    error
 | |
| 	)
 | |
| 
 | |
| 	result, err = cs.List(ctx, &metav1.ListOptions{LabelSelector: cs.labelSelector.String()})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	result, err = cs.filterByAnnotations(result)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	for _, dnsEndpoint := range result.Items {
 | |
| 		// Make sure that all endpoints have targets for A or CNAME type
 | |
| 		crdEndpoints := []*endpoint.Endpoint{}
 | |
| 		for _, ep := range dnsEndpoint.Spec.Endpoints {
 | |
| 			if (ep.RecordType == "CNAME" || ep.RecordType == "A" || ep.RecordType == "AAAA") && len(ep.Targets) < 1 {
 | |
| 				log.Warnf("Endpoint %s with DNSName %s has an empty list of targets", dnsEndpoint.ObjectMeta.Name, ep.DNSName)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			illegalTarget := false
 | |
| 			for _, target := range ep.Targets {
 | |
| 				if strings.HasSuffix(target, ".") {
 | |
| 					illegalTarget = true
 | |
| 					break
 | |
| 				}
 | |
| 			}
 | |
| 			if illegalTarget {
 | |
| 				log.Warnf("Endpoint %s with DNSName %s has an illegal target. The subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com')", dnsEndpoint.ObjectMeta.Name, ep.DNSName)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if ep.Labels == nil {
 | |
| 				ep.Labels = endpoint.NewLabels()
 | |
| 			}
 | |
| 
 | |
| 			crdEndpoints = append(crdEndpoints, ep)
 | |
| 		}
 | |
| 
 | |
| 		cs.setResourceLabel(&dnsEndpoint, crdEndpoints)
 | |
| 		endpoints = append(endpoints, crdEndpoints...)
 | |
| 
 | |
| 		if dnsEndpoint.Status.ObservedGeneration == dnsEndpoint.Generation {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation
 | |
| 		// Update the ObservedGeneration
 | |
| 		_, err = cs.UpdateStatus(ctx, &dnsEndpoint)
 | |
| 		if err != nil {
 | |
| 			log.Warnf("Could not update ObservedGeneration of the CRD: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return endpoints, nil
 | |
| }
 | |
| 
 | |
| func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*endpoint.Endpoint) {
 | |
| 	for _, ep := range endpoints {
 | |
| 		ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("crd/%s/%s", crd.ObjectMeta.Namespace, crd.ObjectMeta.Name)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
 | |
| 	opts.Watch = true
 | |
| 	return cs.crdClient.Get().
 | |
| 		Namespace(cs.namespace).
 | |
| 		Resource(cs.crdResource).
 | |
| 		VersionedParams(opts, cs.codec).
 | |
| 		Watch(ctx)
 | |
| }
 | |
| 
 | |
| func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) {
 | |
| 	result = &endpoint.DNSEndpointList{}
 | |
| 	err = cs.crdClient.Get().
 | |
| 		Namespace(cs.namespace).
 | |
| 		Resource(cs.crdResource).
 | |
| 		VersionedParams(opts, cs.codec).
 | |
| 		Do(ctx).
 | |
| 		Into(result)
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *endpoint.DNSEndpoint) (result *endpoint.DNSEndpoint, err error) {
 | |
| 	result = &endpoint.DNSEndpoint{}
 | |
| 	err = cs.crdClient.Put().
 | |
| 		Namespace(dnsEndpoint.Namespace).
 | |
| 		Resource(cs.crdResource).
 | |
| 		Name(dnsEndpoint.Name).
 | |
| 		SubResource("status").
 | |
| 		Body(dnsEndpoint).
 | |
| 		Do(ctx).
 | |
| 		Into(result)
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // filterByAnnotations filters a list of dnsendpoints by a given annotation selector.
 | |
| func (cs *crdSource) filterByAnnotations(dnsendpoints *endpoint.DNSEndpointList) (*endpoint.DNSEndpointList, error) {
 | |
| 	labelSelector, err := metav1.ParseToLabelSelector(cs.annotationFilter)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// empty filter returns original list
 | |
| 	if selector.Empty() {
 | |
| 		return dnsendpoints, nil
 | |
| 	}
 | |
| 
 | |
| 	filteredList := endpoint.DNSEndpointList{}
 | |
| 
 | |
| 	for _, dnsendpoint := range dnsendpoints.Items {
 | |
| 		// convert the dnsendpoint' annotations to an equivalent label selector
 | |
| 		annotations := labels.Set(dnsendpoint.Annotations)
 | |
| 
 | |
| 		// include dnsendpoint if its annotations match the selector
 | |
| 		if selector.Matches(annotations) {
 | |
| 			filteredList.Items = append(filteredList.Items, dnsendpoint)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return &filteredList, nil
 | |
| }
 |