mirror of
				https://github.com/kubernetes-sigs/external-dns.git
				synced 2025-11-04 04:31:00 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			297 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			297 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2018 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package source
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"os"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/apimachinery/pkg/watch"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/labels"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime/serializer"
 | 
						|
	"k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/rest"
 | 
						|
	"k8s.io/client-go/tools/clientcmd"
 | 
						|
 | 
						|
	"sigs.k8s.io/external-dns/endpoint"
 | 
						|
)
 | 
						|
 | 
						|
// crdSource is an implementation of Source that provides endpoints by listing
 | 
						|
// specified CRD and fetching Endpoints embedded in Spec.
 | 
						|
type crdSource struct {
 | 
						|
	crdClient        rest.Interface
 | 
						|
	namespace        string
 | 
						|
	crdResource      string
 | 
						|
	codec            runtime.ParameterCodec
 | 
						|
	annotationFilter string
 | 
						|
	labelSelector    labels.Selector
 | 
						|
	informer         *cache.SharedInformer
 | 
						|
}
 | 
						|
 | 
						|
func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
 | 
						|
	scheme.AddKnownTypes(groupVersion,
 | 
						|
		&endpoint.DNSEndpoint{},
 | 
						|
		&endpoint.DNSEndpointList{},
 | 
						|
	)
 | 
						|
	metav1.AddToGroupVersion(scheme, groupVersion)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NewCRDClientForAPIVersionKind return rest client for the given apiVersion and kind of the CRD
 | 
						|
func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiServerURL, apiVersion, kind string) (*rest.RESTClient, *runtime.Scheme, error) {
 | 
						|
	if kubeConfig == "" {
 | 
						|
		if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
 | 
						|
			kubeConfig = clientcmd.RecommendedHomeFile
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	config, err := clientcmd.BuildConfigFromFlags(apiServerURL, kubeConfig)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	groupVersion, err := schema.ParseGroupVersion(apiVersion)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
	apiResourceList, err := client.Discovery().ServerResourcesForGroupVersion(groupVersion.String())
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, fmt.Errorf("error listing resources in GroupVersion %q: %w", groupVersion.String(), err)
 | 
						|
	}
 | 
						|
 | 
						|
	var crdAPIResource *metav1.APIResource
 | 
						|
	for _, apiResource := range apiResourceList.APIResources {
 | 
						|
		if apiResource.Kind == kind {
 | 
						|
			crdAPIResource = &apiResource
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if crdAPIResource == nil {
 | 
						|
		return nil, nil, fmt.Errorf("unable to find Resource Kind %q in GroupVersion %q", kind, apiVersion)
 | 
						|
	}
 | 
						|
 | 
						|
	scheme := runtime.NewScheme()
 | 
						|
	addKnownTypes(scheme, groupVersion)
 | 
						|
 | 
						|
	config.ContentConfig.GroupVersion = &groupVersion
 | 
						|
	config.APIPath = "/apis"
 | 
						|
	config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
 | 
						|
 | 
						|
	crdClient, err := rest.UnversionedRESTClientFor(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
	return crdClient, scheme, nil
 | 
						|
}
 | 
						|
 | 
						|
// NewCRDSource creates a new crdSource with the given config.
 | 
						|
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) {
 | 
						|
	sourceCrd := crdSource{
 | 
						|
		crdResource:      strings.ToLower(kind) + "s",
 | 
						|
		namespace:        namespace,
 | 
						|
		annotationFilter: annotationFilter,
 | 
						|
		labelSelector:    labelSelector,
 | 
						|
		crdClient:        crdClient,
 | 
						|
		codec:            runtime.NewParameterCodec(scheme),
 | 
						|
	}
 | 
						|
	if startInformer {
 | 
						|
		// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
 | 
						|
		// missed or dropped events are handled.  specify a resync period 0 to avoid unnecessary sync handler invocations.
 | 
						|
		informer := cache.NewSharedInformer(
 | 
						|
			&cache.ListWatch{
 | 
						|
				ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
 | 
						|
					return sourceCrd.List(context.TODO(), &lo)
 | 
						|
				},
 | 
						|
				WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
 | 
						|
					return sourceCrd.watch(context.TODO(), &lo)
 | 
						|
				},
 | 
						|
			},
 | 
						|
			&endpoint.DNSEndpoint{},
 | 
						|
			0)
 | 
						|
		sourceCrd.informer = &informer
 | 
						|
		go informer.Run(wait.NeverStop)
 | 
						|
	}
 | 
						|
	return &sourceCrd, nil
 | 
						|
}
 | 
						|
 | 
						|
func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
 | 
						|
	if cs.informer != nil {
 | 
						|
		log.Debug("Adding event handler for CRD")
 | 
						|
		// Right now there is no way to remove event handler from informer, see:
 | 
						|
		// https://github.com/kubernetes/kubernetes/issues/79610
 | 
						|
		informer := *cs.informer
 | 
						|
		informer.AddEventHandler(
 | 
						|
			cache.ResourceEventHandlerFuncs{
 | 
						|
				AddFunc: func(obj interface{}) {
 | 
						|
					handler()
 | 
						|
				},
 | 
						|
				UpdateFunc: func(old interface{}, new interface{}) {
 | 
						|
					handler()
 | 
						|
				},
 | 
						|
				DeleteFunc: func(obj interface{}) {
 | 
						|
					handler()
 | 
						|
				},
 | 
						|
			},
 | 
						|
		)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Endpoints returns endpoint objects.
 | 
						|
func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
 | 
						|
	endpoints := []*endpoint.Endpoint{}
 | 
						|
 | 
						|
	var (
 | 
						|
		result *endpoint.DNSEndpointList
 | 
						|
		err    error
 | 
						|
	)
 | 
						|
 | 
						|
	result, err = cs.List(ctx, &metav1.ListOptions{LabelSelector: cs.labelSelector.String()})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	result, err = cs.filterByAnnotations(result)
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, dnsEndpoint := range result.Items {
 | 
						|
		// Make sure that all endpoints have targets for A or CNAME type
 | 
						|
		crdEndpoints := []*endpoint.Endpoint{}
 | 
						|
		for _, ep := range dnsEndpoint.Spec.Endpoints {
 | 
						|
			if (ep.RecordType == "CNAME" || ep.RecordType == "A" || ep.RecordType == "AAAA") && len(ep.Targets) < 1 {
 | 
						|
				log.Warnf("Endpoint %s with DNSName %s has an empty list of targets", dnsEndpoint.ObjectMeta.Name, ep.DNSName)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			illegalTarget := false
 | 
						|
			for _, target := range ep.Targets {
 | 
						|
				if strings.HasSuffix(target, ".") {
 | 
						|
					illegalTarget = true
 | 
						|
					break
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if illegalTarget {
 | 
						|
				log.Warnf("Endpoint %s with DNSName %s has an illegal target. The subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com')", dnsEndpoint.ObjectMeta.Name, ep.DNSName)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			if ep.Labels == nil {
 | 
						|
				ep.Labels = endpoint.NewLabels()
 | 
						|
			}
 | 
						|
 | 
						|
			crdEndpoints = append(crdEndpoints, ep)
 | 
						|
		}
 | 
						|
 | 
						|
		cs.setResourceLabel(&dnsEndpoint, crdEndpoints)
 | 
						|
		endpoints = append(endpoints, crdEndpoints...)
 | 
						|
 | 
						|
		if dnsEndpoint.Status.ObservedGeneration == dnsEndpoint.Generation {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation
 | 
						|
		// Update the ObservedGeneration
 | 
						|
		_, err = cs.UpdateStatus(ctx, &dnsEndpoint)
 | 
						|
		if err != nil {
 | 
						|
			log.Warnf("Could not update ObservedGeneration of the CRD: %v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return endpoints, nil
 | 
						|
}
 | 
						|
 | 
						|
func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*endpoint.Endpoint) {
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("crd/%s/%s", crd.ObjectMeta.Namespace, crd.ObjectMeta.Name)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
 | 
						|
	opts.Watch = true
 | 
						|
	return cs.crdClient.Get().
 | 
						|
		Namespace(cs.namespace).
 | 
						|
		Resource(cs.crdResource).
 | 
						|
		VersionedParams(opts, cs.codec).
 | 
						|
		Watch(ctx)
 | 
						|
}
 | 
						|
 | 
						|
func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) {
 | 
						|
	result = &endpoint.DNSEndpointList{}
 | 
						|
	err = cs.crdClient.Get().
 | 
						|
		Namespace(cs.namespace).
 | 
						|
		Resource(cs.crdResource).
 | 
						|
		VersionedParams(opts, cs.codec).
 | 
						|
		Do(ctx).
 | 
						|
		Into(result)
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *endpoint.DNSEndpoint) (result *endpoint.DNSEndpoint, err error) {
 | 
						|
	result = &endpoint.DNSEndpoint{}
 | 
						|
	err = cs.crdClient.Put().
 | 
						|
		Namespace(dnsEndpoint.Namespace).
 | 
						|
		Resource(cs.crdResource).
 | 
						|
		Name(dnsEndpoint.Name).
 | 
						|
		SubResource("status").
 | 
						|
		Body(dnsEndpoint).
 | 
						|
		Do(ctx).
 | 
						|
		Into(result)
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// filterByAnnotations filters a list of dnsendpoints by a given annotation selector.
 | 
						|
func (cs *crdSource) filterByAnnotations(dnsendpoints *endpoint.DNSEndpointList) (*endpoint.DNSEndpointList, error) {
 | 
						|
	labelSelector, err := metav1.ParseToLabelSelector(cs.annotationFilter)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// empty filter returns original list
 | 
						|
	if selector.Empty() {
 | 
						|
		return dnsendpoints, nil
 | 
						|
	}
 | 
						|
 | 
						|
	filteredList := endpoint.DNSEndpointList{}
 | 
						|
 | 
						|
	for _, dnsendpoint := range dnsendpoints.Items {
 | 
						|
		// convert the dnsendpoint' annotations to an equivalent label selector
 | 
						|
		annotations := labels.Set(dnsendpoint.Annotations)
 | 
						|
 | 
						|
		// include dnsendpoint if its annotations match the selector
 | 
						|
		if selector.Matches(annotations) {
 | 
						|
			filteredList.Items = append(filteredList.Items, dnsendpoint)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return &filteredList, nil
 | 
						|
}
 |