mirror of
				https://github.com/kubernetes-sigs/external-dns.git
				synced 2025-11-04 04:31:00 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			438 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			438 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2021 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package source
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"sort"
 | 
						|
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
	corev1 "k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
						|
	"k8s.io/apimachinery/pkg/labels"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						|
	"k8s.io/client-go/dynamic"
 | 
						|
	"k8s.io/client-go/dynamic/dynamicinformer"
 | 
						|
	"k8s.io/client-go/informers"
 | 
						|
	"k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/kubernetes/scheme"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
 | 
						|
	"sigs.k8s.io/external-dns/endpoint"
 | 
						|
)
 | 
						|
 | 
						|
var kongGroupdVersionResource = schema.GroupVersionResource{
 | 
						|
	Group:    "configuration.konghq.com",
 | 
						|
	Version:  "v1beta1",
 | 
						|
	Resource: "tcpingresses",
 | 
						|
}
 | 
						|
 | 
						|
// kongTCPIngressSource is an implementation of Source for Kong TCPIngress objects.
 | 
						|
type kongTCPIngressSource struct {
 | 
						|
	annotationFilter       string
 | 
						|
	dynamicKubeClient      dynamic.Interface
 | 
						|
	kongTCPIngressInformer informers.GenericInformer
 | 
						|
	kubeClient             kubernetes.Interface
 | 
						|
	namespace              string
 | 
						|
	unstructuredConverter  *unstructuredConverter
 | 
						|
}
 | 
						|
 | 
						|
// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
 | 
						|
func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
 | 
						|
	var err error
 | 
						|
 | 
						|
	// Use shared informer to listen for add/update/delete of Host in the specified namespace.
 | 
						|
	// Set resync period to 0, to prevent processing when nothing has changed.
 | 
						|
	informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
 | 
						|
	kongTCPIngressInformer := informerFactory.ForResource(kongGroupdVersionResource)
 | 
						|
 | 
						|
	// Add default resource event handlers to properly initialize informer.
 | 
						|
	kongTCPIngressInformer.Informer().AddEventHandler(
 | 
						|
		cache.ResourceEventHandlerFuncs{
 | 
						|
			AddFunc: func(obj interface{}) {
 | 
						|
			},
 | 
						|
		},
 | 
						|
	)
 | 
						|
 | 
						|
	informerFactory.Start(ctx.Done())
 | 
						|
 | 
						|
	// wait for the local cache to be populated.
 | 
						|
	if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	uc, err := newKongUnstructuredConverter()
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrapf(err, "failed to setup Unstructured Converter")
 | 
						|
	}
 | 
						|
 | 
						|
	return &kongTCPIngressSource{
 | 
						|
		annotationFilter:       annotationFilter,
 | 
						|
		dynamicKubeClient:      dynamicKubeClient,
 | 
						|
		kongTCPIngressInformer: kongTCPIngressInformer,
 | 
						|
		kubeClient:             kubeClient,
 | 
						|
		namespace:              namespace,
 | 
						|
		unstructuredConverter:  uc,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Endpoints returns endpoint objects for each host-target combination that should be processed.
 | 
						|
// Retrieves all TCPIngresses in the source's namespace(s).
 | 
						|
func (sc *kongTCPIngressSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
 | 
						|
	tis, err := sc.kongTCPIngressInformer.Lister().ByNamespace(sc.namespace).List(labels.Everything())
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var tcpIngresses []*TCPIngress
 | 
						|
	for _, tcpIngressObj := range tis {
 | 
						|
		unstructuredHost, ok := tcpIngressObj.(*unstructured.Unstructured)
 | 
						|
		if !ok {
 | 
						|
			return nil, errors.New("could not convert")
 | 
						|
		}
 | 
						|
 | 
						|
		tcpIngress := &TCPIngress{}
 | 
						|
		err := sc.unstructuredConverter.scheme.Convert(unstructuredHost, tcpIngress, nil)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		tcpIngresses = append(tcpIngresses, tcpIngress)
 | 
						|
	}
 | 
						|
 | 
						|
	tcpIngresses, err = sc.filterByAnnotations(tcpIngresses)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrap(err, "failed to filter TCPIngresses")
 | 
						|
	}
 | 
						|
 | 
						|
	var endpoints []*endpoint.Endpoint
 | 
						|
	for _, tcpIngress := range tcpIngresses {
 | 
						|
		var targets endpoint.Targets
 | 
						|
		for _, lb := range tcpIngress.Status.LoadBalancer.Ingress {
 | 
						|
			if lb.IP != "" {
 | 
						|
				targets = append(targets, lb.IP)
 | 
						|
			}
 | 
						|
			if lb.Hostname != "" {
 | 
						|
				targets = append(targets, lb.Hostname)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		fullname := fmt.Sprintf("%s/%s", tcpIngress.Namespace, tcpIngress.Name)
 | 
						|
 | 
						|
		ingressEndpoints, err := sc.endpointsFromTCPIngress(tcpIngress, targets)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		if len(ingressEndpoints) == 0 {
 | 
						|
			log.Debugf("No endpoints could be generated from Host %s", fullname)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		log.Debugf("Endpoints generated from TCPIngress: %s: %v", fullname, ingressEndpoints)
 | 
						|
		sc.setResourceLabel(tcpIngress, ingressEndpoints)
 | 
						|
		sc.setDualstackLabel(tcpIngress, ingressEndpoints)
 | 
						|
		endpoints = append(endpoints, ingressEndpoints...)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		sort.Sort(ep.Targets)
 | 
						|
	}
 | 
						|
 | 
						|
	return endpoints, nil
 | 
						|
}
 | 
						|
 | 
						|
// filterByAnnotations filters a list of TCPIngresses by a given annotation selector.
 | 
						|
func (sc *kongTCPIngressSource) filterByAnnotations(tcpIngresses []*TCPIngress) ([]*TCPIngress, error) {
 | 
						|
	labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// empty filter returns original list
 | 
						|
	if selector.Empty() {
 | 
						|
		return tcpIngresses, nil
 | 
						|
	}
 | 
						|
 | 
						|
	filteredList := []*TCPIngress{}
 | 
						|
 | 
						|
	for _, tcpIngress := range tcpIngresses {
 | 
						|
		// convert the TCPIngress's annotations to an equivalent label selector
 | 
						|
		annotations := labels.Set(tcpIngress.Annotations)
 | 
						|
 | 
						|
		// include TCPIngress if its annotations match the selector
 | 
						|
		if selector.Matches(annotations) {
 | 
						|
			filteredList = append(filteredList, tcpIngress)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return filteredList, nil
 | 
						|
}
 | 
						|
 | 
						|
func (sc *kongTCPIngressSource) setResourceLabel(tcpIngress *TCPIngress, endpoints []*endpoint.Endpoint) {
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("tcpingress/%s/%s", tcpIngress.Namespace, tcpIngress.Name)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (sc *kongTCPIngressSource) setDualstackLabel(tcpIngress *TCPIngress, endpoints []*endpoint.Endpoint) {
 | 
						|
	val, ok := tcpIngress.Annotations[ALBDualstackAnnotationKey]
 | 
						|
	if ok && val == ALBDualstackAnnotationValue {
 | 
						|
		log.Debugf("Adding dualstack label to TCPIngress %s/%s.", tcpIngress.Namespace, tcpIngress.Name)
 | 
						|
		for _, ep := range endpoints {
 | 
						|
			ep.Labels[endpoint.DualstackLabelKey] = "true"
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// endpointsFromTCPIngress extracts the endpoints from a TCPIngress object
 | 
						|
func (sc *kongTCPIngressSource) endpointsFromTCPIngress(tcpIngress *TCPIngress, targets endpoint.Targets) ([]*endpoint.Endpoint, error) {
 | 
						|
	var endpoints []*endpoint.Endpoint
 | 
						|
 | 
						|
	providerSpecific, setIdentifier := getProviderSpecificAnnotations(tcpIngress.Annotations)
 | 
						|
 | 
						|
	ttl, err := getTTLFromAnnotations(tcpIngress.Annotations)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	hostnameList := getHostnamesFromAnnotations(tcpIngress.Annotations)
 | 
						|
	for _, hostname := range hostnameList {
 | 
						|
		endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier)...)
 | 
						|
	}
 | 
						|
 | 
						|
	if tcpIngress.Spec.Rules != nil {
 | 
						|
		for _, rule := range tcpIngress.Spec.Rules {
 | 
						|
			if rule.Host != "" {
 | 
						|
				endpoints = append(endpoints, endpointsForHostname(rule.Host, targets, ttl, providerSpecific, setIdentifier)...)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return endpoints, nil
 | 
						|
}
 | 
						|
 | 
						|
func (sc *kongTCPIngressSource) AddEventHandler(ctx context.Context, handler func()) {
 | 
						|
	log.Debug("Adding event handler for TCPIngress")
 | 
						|
 | 
						|
	// Right now there is no way to remove event handler from informer, see:
 | 
						|
	// https://github.com/kubernetes/kubernetes/issues/79610
 | 
						|
	sc.kongTCPIngressInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
 | 
						|
}
 | 
						|
 | 
						|
// newUnstructuredConverter returns a new unstructuredConverter initialized
 | 
						|
func newKongUnstructuredConverter() (*unstructuredConverter, error) {
 | 
						|
	uc := &unstructuredConverter{
 | 
						|
		scheme: runtime.NewScheme(),
 | 
						|
	}
 | 
						|
 | 
						|
	// Add the core types we need
 | 
						|
	uc.scheme.AddKnownTypes(kongGroupdVersionResource.GroupVersion(), &TCPIngress{}, &TCPIngressList{})
 | 
						|
	if err := scheme.AddToScheme(uc.scheme); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return uc, nil
 | 
						|
}
 | 
						|
 | 
						|
//Kong types based on https://github.com/Kong/kubernetes-ingress-controller/blob/v1.2.0/pkg/apis/configuration/v1beta1/types.go to facilitate testing
 | 
						|
//When trying to import them from the Kong repo as a dependency it required upgrading the k8s.io/client-go and k8s.io/apimachinery which seemed
 | 
						|
//cause several changes in how the mock clients were working that resulted in a bunch of failures in other tests
 | 
						|
//If that is dealt with at some point the below can be removed and replaced with an actual import
 | 
						|
 | 
						|
type TCPIngress struct {
 | 
						|
	metav1.TypeMeta   `json:",inline"`
 | 
						|
	metav1.ObjectMeta `json:"metadata,omitempty"`
 | 
						|
 | 
						|
	Spec   tcpIngressSpec   `json:"spec,omitempty"`
 | 
						|
	Status tcpIngressStatus `json:"status,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type TCPIngressList struct {
 | 
						|
	metav1.TypeMeta `json:",inline"`
 | 
						|
	metav1.ListMeta `json:"metadata,omitempty"`
 | 
						|
	Items           []TCPIngress `json:"items"`
 | 
						|
}
 | 
						|
 | 
						|
type tcpIngressSpec struct {
 | 
						|
	Rules []tcpIngressRule `json:"rules,omitempty"`
 | 
						|
	TLS   []tcpIngressTLS  `json:"tls,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type tcpIngressTLS struct {
 | 
						|
	Hosts      []string `json:"hosts,omitempty"`
 | 
						|
	SecretName string   `json:"secretName,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type tcpIngressStatus struct {
 | 
						|
	LoadBalancer corev1.LoadBalancerStatus `json:"loadBalancer,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type tcpIngressRule struct {
 | 
						|
	Host    string            `json:"host,omitempty"`
 | 
						|
	Port    int               `json:"port,omitempty"`
 | 
						|
	Backend tcpIngressBackend `json:"backend"`
 | 
						|
}
 | 
						|
 | 
						|
type tcpIngressBackend struct {
 | 
						|
	ServiceName string `json:"serviceName"`
 | 
						|
	ServicePort int    `json:"servicePort"`
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressBackend) DeepCopyInto(out *tcpIngressBackend) {
 | 
						|
	*out = *in
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressBackend) DeepCopy() *tcpIngressBackend {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(tcpIngressBackend)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressRule) DeepCopyInto(out *tcpIngressRule) {
 | 
						|
	*out = *in
 | 
						|
	out.Backend = in.Backend
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressRule) DeepCopy() *tcpIngressRule {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(tcpIngressRule)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressSpec) DeepCopyInto(out *tcpIngressSpec) {
 | 
						|
	*out = *in
 | 
						|
	if in.Rules != nil {
 | 
						|
		in, out := &in.Rules, &out.Rules
 | 
						|
		*out = make([]tcpIngressRule, len(*in))
 | 
						|
		copy(*out, *in)
 | 
						|
	}
 | 
						|
	if in.TLS != nil {
 | 
						|
		in, out := &in.TLS, &out.TLS
 | 
						|
		*out = make([]tcpIngressTLS, len(*in))
 | 
						|
		for i := range *in {
 | 
						|
			(*in)[i].DeepCopyInto(&(*out)[i])
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressSpec) DeepCopy() *tcpIngressSpec {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(tcpIngressSpec)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressStatus) DeepCopyInto(out *tcpIngressStatus) {
 | 
						|
	*out = *in
 | 
						|
	in.LoadBalancer.DeepCopyInto(&out.LoadBalancer)
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressStatus) DeepCopy() *tcpIngressStatus {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(tcpIngressStatus)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressTLS) DeepCopyInto(out *tcpIngressTLS) {
 | 
						|
	*out = *in
 | 
						|
	if in.Hosts != nil {
 | 
						|
		in, out := &in.Hosts, &out.Hosts
 | 
						|
		*out = make([]string, len(*in))
 | 
						|
		copy(*out, *in)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (in *tcpIngressTLS) DeepCopy() *tcpIngressTLS {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(tcpIngressTLS)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngress) DeepCopyInto(out *TCPIngress) {
 | 
						|
	*out = *in
 | 
						|
	out.TypeMeta = in.TypeMeta
 | 
						|
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
 | 
						|
	in.Spec.DeepCopyInto(&out.Spec)
 | 
						|
	in.Status.DeepCopyInto(&out.Status)
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngress) DeepCopy() *TCPIngress {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(TCPIngress)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngress) DeepCopyObject() runtime.Object {
 | 
						|
	if c := in.DeepCopy(); c != nil {
 | 
						|
		return c
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngressList) DeepCopyInto(out *TCPIngressList) {
 | 
						|
	*out = *in
 | 
						|
	out.TypeMeta = in.TypeMeta
 | 
						|
	in.ListMeta.DeepCopyInto(&out.ListMeta)
 | 
						|
	if in.Items != nil {
 | 
						|
		in, out := &in.Items, &out.Items
 | 
						|
		*out = make([]TCPIngress, len(*in))
 | 
						|
		for i := range *in {
 | 
						|
			(*in)[i].DeepCopyInto(&(*out)[i])
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngressList) DeepCopy() *TCPIngressList {
 | 
						|
	if in == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	out := new(TCPIngressList)
 | 
						|
	in.DeepCopyInto(out)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (in *TCPIngressList) DeepCopyObject() runtime.Object {
 | 
						|
	if c := in.DeepCopy(); c != nil {
 | 
						|
		return c
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |