/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package source

import (
	"context"
	"fmt"
	"sort"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/cache"

	"sigs.k8s.io/external-dns/endpoint"
)

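// kongGroupVersionResource identifies Kong's TCPIngress custom resource, i.e.
// manifests with apiVersion configuration.konghq.com/v1beta1 and kind TCPIngress.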
var kongGroupVersionResource = schema.GroupVersionResource{
	Group:    "configuration.konghq.com",
	Version:  "v1beta1",
	Resource: "tcpingresses",
}

// kongTCPIngressSource is an implementation of Source for Kong TCPIngress objects.
type kongTCPIngressSource struct {
	annotationFilter       string
	dynamicKubeClient      dynamic.Interface
	kongTCPIngressInformer informers.GenericInformer
	kubeClient             kubernetes.Interface
	namespace              string
	unstructuredConverter  *unstructuredConverter
}

// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
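//
// Example usage (a minimal sketch, assuming an in-cluster kubeconfig via the
// k8s.io/client-go/rest package and an installed TCPIngress CRD; error
// handling elided):
//
//	cfg, _ := rest.InClusterConfig()
//	dynClient, _ := dynamic.NewForConfig(cfg)
//	kubeClient, _ := kubernetes.NewForConfig(cfg)
//	src, _ := NewKongTCPIngressSource(dynClient, kubeClient, "default", "")
//	eps, _ := src.Endpoints(context.Background())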
func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
	var err error

	// Use a shared informer to listen for add/update/delete of TCPIngresses in the specified namespace.
	// Set the resync period to 0 to prevent processing when nothing has changed.
	informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
	kongTCPIngressInformer := informerFactory.ForResource(kongGroupVersionResource)

	// Add default resource event handlers to properly initialize the informer.
	kongTCPIngressInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
			},
		},
	)

	// TODO: the informer is not explicitly stopped since the controller is not passing in its stop channel.
	informerFactory.Start(wait.NeverStop)

	// Wait for the local cache to be populated.
	if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
		return nil, err
	}

	uc, err := newKongUnstructuredConverter()
	if err != nil {
		return nil, errors.Wrapf(err, "failed to setup Unstructured Converter")
	}

	return &kongTCPIngressSource{
		annotationFilter:       annotationFilter,
		dynamicKubeClient:      dynamicKubeClient,
		kongTCPIngressInformer: kongTCPIngressInformer,
		kubeClient:             kubeClient,
		namespace:              namespace,
		unstructuredConverter:  uc,
	}, nil
}

// Endpoints returns endpoint objects for each host-target combination that should be processed.
// Retrieves all TCPIngresses in the source's namespace(s).
func (sc *kongTCPIngressSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
	tis, err := sc.kongTCPIngressInformer.Lister().ByNamespace(sc.namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	var tcpIngresses []*TCPIngress
	for _, tcpIngressObj := range tis {
		unstructuredHost, ok := tcpIngressObj.(*unstructured.Unstructured)
		if !ok {
			return nil, errors.New("could not convert TCPIngress object to unstructured")
		}

		tcpIngress := &TCPIngress{}
		err := sc.unstructuredConverter.scheme.Convert(unstructuredHost, tcpIngress, nil)
		if err != nil {
			return nil, err
		}
		tcpIngresses = append(tcpIngresses, tcpIngress)
	}

	tcpIngresses, err = sc.filterByAnnotations(tcpIngresses)
	if err != nil {
		return nil, errors.Wrap(err, "failed to filter TCPIngresses")
	}

	var endpoints []*endpoint.Endpoint
	for _, tcpIngress := range tcpIngresses {
		var targets endpoint.Targets
		for _, lb := range tcpIngress.Status.LoadBalancer.Ingress {
			if lb.IP != "" {
				targets = append(targets, lb.IP)
			}
			if lb.Hostname != "" {
				targets = append(targets, lb.Hostname)
			}
		}

		fullname := fmt.Sprintf("%s/%s", tcpIngress.Namespace, tcpIngress.Name)

		ingressEndpoints, err := sc.endpointsFromTCPIngress(tcpIngress, targets)
		if err != nil {
			return nil, err
		}
		if len(ingressEndpoints) == 0 {
			log.Debugf("No endpoints could be generated from TCPIngress %s", fullname)
			continue
		}

		log.Debugf("Endpoints generated from TCPIngress: %s: %v", fullname, ingressEndpoints)
		sc.setResourceLabel(tcpIngress, ingressEndpoints)
		sc.setDualstackLabel(tcpIngress, ingressEndpoints)
		endpoints = append(endpoints, ingressEndpoints...)
	}

	for _, ep := range endpoints {
		sort.Sort(ep.Targets)
	}

	return endpoints, nil
}

// filterByAnnotations filters a list of TCPIngresses by a given annotation selector.
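// For example, an annotationFilter of "kubernetes.io/ingress.class=kong" keeps
// only TCPIngresses carrying that exact annotation; the filter accepts standard
// Kubernetes label-selector syntax (e.g. "key=value", "key in (a,b)").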
func (sc *kongTCPIngressSource) filterByAnnotations(tcpIngresses []*TCPIngress) ([]*TCPIngress, error) {
	labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
	if err != nil {
		return nil, err
	}
	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
	if err != nil {
		return nil, err
	}

	// An empty filter returns the original list.
	if selector.Empty() {
		return tcpIngresses, nil
	}

	filteredList := []*TCPIngress{}

	for _, tcpIngress := range tcpIngresses {
		// Convert the TCPIngress's annotations to an equivalent label selector.
		annotations := labels.Set(tcpIngress.Annotations)

		// Include the TCPIngress if its annotations match the selector.
		if selector.Matches(annotations) {
			filteredList = append(filteredList, tcpIngress)
		}
	}

	return filteredList, nil
}

// setResourceLabel tags each generated endpoint with the TCPIngress it came from.
func (sc *kongTCPIngressSource) setResourceLabel(tcpIngress *TCPIngress, endpoints []*endpoint.Endpoint) {
	for _, ep := range endpoints {
		ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("tcpingress/%s/%s", tcpIngress.Namespace, tcpIngress.Name)
	}
}

// setDualstackLabel marks every generated endpoint as dualstack when the
// TCPIngress carries the ALB dualstack annotation.
func (sc *kongTCPIngressSource) setDualstackLabel(tcpIngress *TCPIngress, endpoints []*endpoint.Endpoint) {
	val, ok := tcpIngress.Annotations[ALBDualstackAnnotationKey]
	if ok && val == ALBDualstackAnnotationValue {
		log.Debugf("Adding dualstack label to TCPIngress %s/%s.", tcpIngress.Namespace, tcpIngress.Name)
		for _, ep := range endpoints {
			ep.Labels[endpoint.DualstackLabelKey] = "true"
		}
	}
}

// endpointsFromTCPIngress extracts the endpoints from a TCPIngress object.
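//
// For illustration, a hypothetical manifest (names and field values are assumptions):
//
//	apiVersion: configuration.konghq.com/v1beta1
//	kind: TCPIngress
//	metadata:
//	  name: example
//	  annotations:
//	    external-dns.alpha.kubernetes.io/hostname: tcp.example.org
//	spec:
//	  rules:
//	  - host: db.example.org
//	    port: 5432
//	    backend:
//	      serviceName: postgres
//	      servicePort: 5432
//
// With a load balancer IP of 192.0.2.1 in status, this would yield endpoints for
// both tcp.example.org (from the hostname annotation) and db.example.org (from
// the rule), each targeting 192.0.2.1.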
func (sc *kongTCPIngressSource) endpointsFromTCPIngress(tcpIngress *TCPIngress, targets endpoint.Targets) ([]*endpoint.Endpoint, error) {
	var endpoints []*endpoint.Endpoint

	providerSpecific, setIdentifier := getProviderSpecificAnnotations(tcpIngress.Annotations)

	ttl, err := getTTLFromAnnotations(tcpIngress.Annotations)
	if err != nil {
		return nil, err
	}

	hostnameList := getHostnamesFromAnnotations(tcpIngress.Annotations)
	for _, hostname := range hostnameList {
		endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier)...)
	}

	if tcpIngress.Spec.Rules != nil {
		for _, rule := range tcpIngress.Spec.Rules {
			if rule.Host != "" {
				endpoints = append(endpoints, endpointsForHostname(rule.Host, targets, ttl, providerSpecific, setIdentifier)...)
			}
		}
	}

	return endpoints, nil
}

func (sc *kongTCPIngressSource) AddEventHandler(ctx context.Context, handler func()) {
	log.Debug("Adding event handler for TCPIngress")

	// Right now there is no way to remove an event handler from an informer, see:
	// https://github.com/kubernetes/kubernetes/issues/79610
	sc.kongTCPIngressInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}

// newKongUnstructuredConverter returns a new unstructuredConverter initialized
// with the Kong TCPIngress types.
func newKongUnstructuredConverter() (*unstructuredConverter, error) {
	uc := &unstructuredConverter{
		scheme: runtime.NewScheme(),
	}

	// Add the core types we need.
	uc.scheme.AddKnownTypes(kongGroupVersionResource.GroupVersion(), &TCPIngress{}, &TCPIngressList{})
	if err := scheme.AddToScheme(uc.scheme); err != nil {
		return nil, err
	}

	return uc, nil
}

// Kong types based on https://github.com/Kong/kubernetes-ingress-controller/blob/v1.2.0/pkg/apis/configuration/v1beta1/types.go
// to facilitate testing. Importing them from the Kong repo as a dependency required upgrading
// k8s.io/client-go and k8s.io/apimachinery, which caused several changes in how the mock clients
// worked and resulted in a number of failures in other tests. If that is dealt with at some point,
// the types below can be removed and replaced with an actual import.

type TCPIngress struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   tcpIngressSpec   `json:"spec,omitempty"`
	Status tcpIngressStatus `json:"status,omitempty"`
}

type TCPIngressList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []TCPIngress `json:"items"`
}

type tcpIngressSpec struct {
	Rules []tcpIngressRule `json:"rules,omitempty"`
	TLS   []tcpIngressTLS  `json:"tls,omitempty"`
}

type tcpIngressTLS struct {
	Hosts      []string `json:"hosts,omitempty"`
	SecretName string   `json:"secretName,omitempty"`
}

type tcpIngressStatus struct {
	LoadBalancer corev1.LoadBalancerStatus `json:"loadBalancer,omitempty"`
}

type tcpIngressRule struct {
	Host    string            `json:"host,omitempty"`
	Port    int               `json:"port,omitempty"`
	Backend tcpIngressBackend `json:"backend"`
}

type tcpIngressBackend struct {
	ServiceName string `json:"serviceName"`
	ServicePort int    `json:"servicePort"`
}

func (in *tcpIngressBackend) DeepCopyInto(out *tcpIngressBackend) {
	*out = *in
}

func (in *tcpIngressBackend) DeepCopy() *tcpIngressBackend {
	if in == nil {
		return nil
	}
	out := new(tcpIngressBackend)
	in.DeepCopyInto(out)
	return out
}

func (in *tcpIngressRule) DeepCopyInto(out *tcpIngressRule) {
	*out = *in
	out.Backend = in.Backend
}

func (in *tcpIngressRule) DeepCopy() *tcpIngressRule {
	if in == nil {
		return nil
	}
	out := new(tcpIngressRule)
	in.DeepCopyInto(out)
	return out
}

func (in *tcpIngressSpec) DeepCopyInto(out *tcpIngressSpec) {
	*out = *in
	if in.Rules != nil {
		in, out := &in.Rules, &out.Rules
		*out = make([]tcpIngressRule, len(*in))
		copy(*out, *in)
	}
	if in.TLS != nil {
		in, out := &in.TLS, &out.TLS
		*out = make([]tcpIngressTLS, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

func (in *tcpIngressSpec) DeepCopy() *tcpIngressSpec {
	if in == nil {
		return nil
	}
	out := new(tcpIngressSpec)
	in.DeepCopyInto(out)
	return out
}

func (in *tcpIngressStatus) DeepCopyInto(out *tcpIngressStatus) {
	*out = *in
	in.LoadBalancer.DeepCopyInto(&out.LoadBalancer)
}

func (in *tcpIngressStatus) DeepCopy() *tcpIngressStatus {
	if in == nil {
		return nil
	}
	out := new(tcpIngressStatus)
	in.DeepCopyInto(out)
	return out
}

func (in *tcpIngressTLS) DeepCopyInto(out *tcpIngressTLS) {
	*out = *in
	if in.Hosts != nil {
		in, out := &in.Hosts, &out.Hosts
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
}

func (in *tcpIngressTLS) DeepCopy() *tcpIngressTLS {
	if in == nil {
		return nil
	}
	out := new(tcpIngressTLS)
	in.DeepCopyInto(out)
	return out
}

func (in *TCPIngress) DeepCopyInto(out *TCPIngress) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
}

func (in *TCPIngress) DeepCopy() *TCPIngress {
	if in == nil {
		return nil
	}
	out := new(TCPIngress)
	in.DeepCopyInto(out)
	return out
}

func (in *TCPIngress) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

func (in *TCPIngressList) DeepCopyInto(out *TCPIngressList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]TCPIngress, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

func (in *TCPIngressList) DeepCopy() *TCPIngressList {
	if in == nil {
		return nil
	}
	out := new(TCPIngressList)
	in.DeepCopyInto(out)
	return out
}

func (in *TCPIngressList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}