external-dns/source/kong_tcpingress.go
Rich 7c23e01e31
feat(aws): always create AAAA alias records in route53 (#5111)
* First pass based on existing PR, what is currently on master and some
extra tests.

* Try to resolve AWS service documentation

* Add documentation on how to opt-out of AAAA record creation

* Address documentation concerns

* Add some IPv6 tests to sources

* Make recommended changes to documentation
2025-03-03 00:21:00 -08:00

424 lines
12 KiB
Go

/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"context"
"fmt"
"sort"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
)
// kongGroupdVersionResource identifies the Kong TCPIngress custom resource
// (group "configuration.konghq.com", version "v1beta1", resource "tcpingresses").
// It is used to obtain the dynamic informer for TCPIngress objects and to
// register the local TCPIngress types with the converter scheme.
var kongGroupdVersionResource = schema.GroupVersionResource{
Group: "configuration.konghq.com",
Version: "v1beta1",
Resource: "tcpingresses",
}
// kongTCPIngressSource is an implementation of Source for Kong TCPIngress objects.
// It lists TCPIngresses from a dynamic informer and converts them into endpoints.
type kongTCPIngressSource struct {
// annotationFilter is parsed as a label selector and matched against each
// TCPIngress's annotations; an empty selector keeps every object.
annotationFilter string
// ignoreHostnameAnnotation, when true, skips hostnames taken from
// annotations so only hosts from the spec rules produce endpoints.
ignoreHostnameAnnotation bool
// dynamicKubeClient is the dynamic client the informer factory is built from.
dynamicKubeClient dynamic.Interface
// kongTCPIngressInformer is the informer whose lister supplies the TCPIngress objects.
kongTCPIngressInformer informers.GenericInformer
// kubeClient is stored by the constructor; the methods defined alongside
// this type do not read it.
kubeClient kubernetes.Interface
// namespace is the namespace the informer factory and the lister are restricted to.
namespace string
// unstructuredConverter holds the scheme used to convert unstructured
// objects into TCPIngress values.
unstructuredConverter *unstructuredConverter
}
// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
//
// It builds a dynamic shared informer for TCPIngress objects restricted to
// namespace, starts it, and waits for the informer cache to sync before
// returning. ctx controls the informer's lifetime (via ctx.Done()) and is also
// handed to the cache-sync wait, so a caller that cancels ctx during start-up
// is not left waiting on a background context that can never be cancelled.
//
// An error is returned when the cache sync fails or when the unstructured
// converter cannot be set up.
func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string, ignoreHostnameAnnotation bool) (Source, error) {
	// Use shared informer to listen for add/update/delete of TCPIngress in the specified namespace.
	// Set resync period to 0, to prevent processing when nothing has changed.
	informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
	kongTCPIngressInformer := informerFactory.ForResource(kongGroupdVersionResource)

	// Add default resource event handlers to properly initialize informer.
	kongTCPIngressInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
			},
		},
	)

	informerFactory.Start(ctx.Done())

	// Wait for the local cache to be populated. The informer above stops when
	// ctx is done, so the wait is tied to the same ctx rather than to
	// context.Background().
	if err := waitForDynamicCacheSync(ctx, informerFactory); err != nil {
		return nil, err
	}

	uc, err := newKongUnstructuredConverter()
	if err != nil {
		return nil, errors.Wrapf(err, "failed to setup Unstructured Converter")
	}

	return &kongTCPIngressSource{
		annotationFilter:         annotationFilter,
		ignoreHostnameAnnotation: ignoreHostnameAnnotation,
		dynamicKubeClient:        dynamicKubeClient,
		kongTCPIngressInformer:   kongTCPIngressInformer,
		kubeClient:               kubeClient,
		namespace:                namespace,
		unstructuredConverter:    uc,
	}, nil
}
// Endpoints returns endpoint objects for each host-target combination that should be processed.
// Retrieves all TCPIngresses in the source's namespace(s).
//
// Each object from the informer cache is converted to a TCPIngress and the
// list is then narrowed by the source's annotation filter. Targets are taken
// from the target annotation; when that yields none, the IPs and hostnames in
// the TCPIngress load balancer status are used instead. The targets of every
// returned endpoint are sorted.
//
// ctx is accepted to satisfy the method signature and is not read here.
// An error is returned when listing, conversion, filtering or endpoint
// generation fails.
func (sc *kongTCPIngressSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
	tis, err := sc.kongTCPIngressInformer.Lister().ByNamespace(sc.namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	var tcpIngresses []*TCPIngress
	for _, tcpIngressObj := range tis {
		unstructuredHost, ok := tcpIngressObj.(*unstructured.Unstructured)
		if !ok {
			return nil, errors.New("could not convert")
		}

		tcpIngress := &TCPIngress{}
		err := sc.unstructuredConverter.scheme.Convert(unstructuredHost, tcpIngress, nil)
		if err != nil {
			return nil, err
		}
		tcpIngresses = append(tcpIngresses, tcpIngress)
	}

	tcpIngresses, err = sc.filterByAnnotations(tcpIngresses)
	if err != nil {
		return nil, errors.Wrap(err, "failed to filter TCPIngresses")
	}

	var endpoints []*endpoint.Endpoint
	for _, tcpIngress := range tcpIngresses {
		targets := getTargetsFromTargetAnnotation(tcpIngress.Annotations)
		if len(targets) == 0 {
			// No explicit target annotation: fall back to the load balancer status.
			for _, lb := range tcpIngress.Status.LoadBalancer.Ingress {
				if lb.IP != "" {
					targets = append(targets, lb.IP)
				}
				if lb.Hostname != "" {
					targets = append(targets, lb.Hostname)
				}
			}
		}

		fullname := fmt.Sprintf("%s/%s", tcpIngress.Namespace, tcpIngress.Name)

		ingressEndpoints, err := sc.endpointsFromTCPIngress(tcpIngress, targets)
		if err != nil {
			return nil, err
		}
		if len(ingressEndpoints) == 0 {
			// Name the resource kind correctly so the log matches the line below.
			log.Debugf("No endpoints could be generated from TCPIngress %s", fullname)
			continue
		}

		log.Debugf("Endpoints generated from TCPIngress: %s: %v", fullname, ingressEndpoints)
		endpoints = append(endpoints, ingressEndpoints...)
	}

	for _, ep := range endpoints {
		sort.Sort(ep.Targets)
	}

	return endpoints, nil
}
// filterByAnnotations narrows tcpIngresses to those whose annotations satisfy
// the source's annotation filter. The filter is parsed as a label selector;
// a parse or conversion failure is returned as an error. When the selector is
// empty the input slice is returned unchanged.
func (sc *kongTCPIngressSource) filterByAnnotations(tcpIngresses []*TCPIngress) ([]*TCPIngress, error) {
	parsed, err := metav1.ParseToLabelSelector(sc.annotationFilter)
	if err != nil {
		return nil, err
	}
	matcher, err := metav1.LabelSelectorAsSelector(parsed)
	if err != nil {
		return nil, err
	}

	// Nothing to filter on: hand the original list straight back.
	if matcher.Empty() {
		return tcpIngresses, nil
	}

	kept := []*TCPIngress{}
	for _, candidate := range tcpIngresses {
		// Treat the annotations as a label set so the selector can be evaluated against them.
		if !matcher.Matches(labels.Set(candidate.Annotations)) {
			continue
		}
		kept = append(kept, candidate)
	}

	return kept, nil
}
// endpointsFromTCPIngress builds the endpoints for a single TCPIngress.
// Hostnames come first from the hostname annotation (unless the source is
// configured to ignore it) and then from every spec rule with a non-empty
// host; each hostname is paired with the supplied targets. The returned error
// is always nil.
func (sc *kongTCPIngressSource) endpointsFromTCPIngress(tcpIngress *TCPIngress, targets endpoint.Targets) ([]*endpoint.Endpoint, error) {
	resource := fmt.Sprintf("tcpingress/%s/%s", tcpIngress.Namespace, tcpIngress.Name)
	ttl := getTTLFromAnnotations(tcpIngress.Annotations, resource)
	providerSpecific, setIdentifier := getProviderSpecificAnnotations(tcpIngress.Annotations)

	var result []*endpoint.Endpoint
	// collect appends the endpoints generated for one hostname.
	collect := func(host string) {
		result = append(result, endpointsForHostname(host, targets, ttl, providerSpecific, setIdentifier, resource)...)
	}

	if !sc.ignoreHostnameAnnotation {
		for _, annotated := range getHostnamesFromAnnotations(tcpIngress.Annotations) {
			collect(annotated)
		}
	}

	// Ranging over a nil Rules slice is a no-op, so no explicit nil check is needed.
	for _, rule := range tcpIngress.Spec.Rules {
		if rule.Host == "" {
			continue
		}
		collect(rule.Host)
	}

	return result, nil
}
// AddEventHandler registers handler, wrapped by eventHandlerFunc, on the
// TCPIngress informer. ctx is not read by this method.
func (sc *kongTCPIngressSource) AddEventHandler(ctx context.Context, handler func()) {
	log.Debug("Adding event handler for TCPIngress")

	// Right now there is no way to remove event handler from informer, see:
	// https://github.com/kubernetes/kubernetes/issues/79610
	informer := sc.kongTCPIngressInformer.Informer()
	informer.AddEventHandler(eventHandlerFunc(handler))
}
// newKongUnstructuredConverter returns an unstructuredConverter whose scheme
// knows the TCPIngress and TCPIngressList types (registered under the Kong
// group/version) as well as the types added by the client-go scheme package.
func newKongUnstructuredConverter() (*unstructuredConverter, error) {
	kongScheme := runtime.NewScheme()

	// Register the Kong types first, then the client-go scheme types.
	kongScheme.AddKnownTypes(kongGroupdVersionResource.GroupVersion(), &TCPIngress{}, &TCPIngressList{})
	if err := scheme.AddToScheme(kongScheme); err != nil {
		return nil, err
	}

	return &unstructuredConverter{scheme: kongScheme}, nil
}
// TCPIngress is a local copy of the Kong TCPIngress custom resource type.
//
// The Kong types below are based on https://github.com/Kong/kubernetes-ingress-controller/blob/v1.2.0/pkg/apis/configuration/v1beta1/types.go
// and are kept here to facilitate testing. Importing them from the Kong repo as a dependency required upgrading
// k8s.io/client-go and k8s.io/apimachinery, which seemed to cause several changes in how the mock clients work
// and resulted in a number of failures in other tests.
// If that is dealt with at some point, the types below can be removed and replaced with an actual import.
type TCPIngress struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
// Spec holds the rules and TLS entries of the TCPIngress.
Spec tcpIngressSpec `json:"spec,omitempty"`
// Status carries the load balancer status of the TCPIngress.
Status tcpIngressStatus `json:"status,omitempty"`
}
// TCPIngressList is a list of TCPIngress objects; it is registered with the
// converter scheme alongside TCPIngress.
type TCPIngressList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items holds the TCPIngress objects in the list.
Items []TCPIngress `json:"items"`
}
// tcpIngressSpec is the spec section of a TCPIngress.
type tcpIngressSpec struct {
// Rules lists the ingress rules; a rule's non-empty Host is used as a hostname for endpoints.
Rules []tcpIngressRule `json:"rules,omitempty"`
// TLS lists the TLS entries; it is deep-copied but not otherwise read in this file.
TLS []tcpIngressTLS `json:"tls,omitempty"`
}
// tcpIngressTLS is one TLS entry of a TCPIngress spec.
type tcpIngressTLS struct {
// Hosts lists the host names of this TLS entry.
Hosts []string `json:"hosts,omitempty"`
// SecretName presumably names the Secret holding the TLS material — confirm against the Kong type definition.
SecretName string `json:"secretName,omitempty"`
}
// tcpIngressStatus is the status section of a TCPIngress.
type tcpIngressStatus struct {
// LoadBalancer holds the ingress points whose IPs and hostnames are used
// as targets when no target annotation is set.
LoadBalancer corev1.LoadBalancerStatus `json:"loadBalancer,omitempty"`
}
// tcpIngressRule is a single rule of a TCPIngress spec. It contains only
// value fields, so copying it field by field yields an independent copy.
type tcpIngressRule struct {
	Host    string            `json:"host,omitempty"`
	Port    int               `json:"port,omitempty"`
	Backend tcpIngressBackend `json:"backend"`
}

// tcpIngressBackend names the service and service port a rule refers to.
type tcpIngressBackend struct {
	ServiceName string `json:"serviceName"`
	ServicePort int    `json:"servicePort"`
}

// DeepCopyInto writes a copy of the receiver into out. in must be non-nil.
func (in *tcpIngressBackend) DeepCopyInto(out *tcpIngressBackend) {
	out.ServiceName, out.ServicePort = in.ServiceName, in.ServicePort
}

// DeepCopy returns a newly allocated copy of the receiver, or nil when the receiver is nil.
func (in *tcpIngressBackend) DeepCopy() *tcpIngressBackend {
	if in == nil {
		return nil
	}
	var clone tcpIngressBackend
	in.DeepCopyInto(&clone)
	return &clone
}

// DeepCopyInto writes a copy of the receiver into out. in must be non-nil.
func (in *tcpIngressRule) DeepCopyInto(out *tcpIngressRule) {
	out.Host = in.Host
	out.Port = in.Port
	in.Backend.DeepCopyInto(&out.Backend)
}

// DeepCopy returns a newly allocated copy of the receiver, or nil when the receiver is nil.
func (in *tcpIngressRule) DeepCopy() *tcpIngressRule {
	if in == nil {
		return nil
	}
	var clone tcpIngressRule
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto writes a deep copy of the receiver into out. in must be non-nil.
// Non-nil Rules and TLS slices are re-allocated so out does not share backing
// arrays with in; nil slices stay nil.
func (in *tcpIngressSpec) DeepCopyInto(out *tcpIngressSpec) {
	*out = *in
	if in.Rules != nil {
		out.Rules = make([]tcpIngressRule, len(in.Rules))
		copy(out.Rules, in.Rules)
	}
	if in.TLS != nil {
		out.TLS = make([]tcpIngressTLS, len(in.TLS))
		for idx := range in.TLS {
			in.TLS[idx].DeepCopyInto(&out.TLS[idx])
		}
	}
}

// DeepCopy returns a newly allocated deep copy of the receiver, or nil when the receiver is nil.
func (in *tcpIngressSpec) DeepCopy() *tcpIngressSpec {
	if in == nil {
		return nil
	}
	var clone tcpIngressSpec
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto writes a deep copy of the receiver into out. in must be non-nil.
// The load balancer status is copied through its own DeepCopyInto.
func (in *tcpIngressStatus) DeepCopyInto(out *tcpIngressStatus) {
	*out = *in
	lbCopy := &out.LoadBalancer
	in.LoadBalancer.DeepCopyInto(lbCopy)
}

// DeepCopy returns a newly allocated deep copy of the receiver, or nil when the receiver is nil.
func (in *tcpIngressStatus) DeepCopy() *tcpIngressStatus {
	if in == nil {
		return nil
	}
	var clone tcpIngressStatus
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto writes a deep copy of the receiver into out. in must be non-nil.
// A non-nil Hosts slice is re-allocated so out does not share its backing
// array with in; a nil slice stays nil.
func (in *tcpIngressTLS) DeepCopyInto(out *tcpIngressTLS) {
	*out = *in
	if in.Hosts != nil {
		out.Hosts = make([]string, len(in.Hosts))
		copy(out.Hosts, in.Hosts)
	}
}

// DeepCopy returns a newly allocated deep copy of the receiver, or nil when the receiver is nil.
func (in *tcpIngressTLS) DeepCopy() *tcpIngressTLS {
	if in == nil {
		return nil
	}
	var clone tcpIngressTLS
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto writes a deep copy of the receiver into out. in must be non-nil.
// TypeMeta is carried over by the initial struct assignment; ObjectMeta, Spec
// and Status are copied through their own DeepCopyInto methods.
func (in *TCPIngress) DeepCopyInto(out *TCPIngress) {
	*out = *in
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
}

// DeepCopy returns a newly allocated deep copy of the receiver, or nil when the receiver is nil.
func (in *TCPIngress) DeepCopy() *TCPIngress {
	if in == nil {
		return nil
	}
	var clone TCPIngress
	in.DeepCopyInto(&clone)
	return &clone
}

// DeepCopyObject returns a deep copy of the receiver as a runtime.Object.
// A nil receiver yields an untyped nil rather than an interface wrapping a
// nil pointer.
func (in *TCPIngress) DeepCopyObject() runtime.Object {
	clone := in.DeepCopy()
	if clone == nil {
		return nil
	}
	return clone
}
// DeepCopyInto writes a deep copy of the receiver into out. in must be non-nil.
// TypeMeta is carried over by the initial struct assignment; ListMeta is
// copied through its own DeepCopyInto, and a non-nil Items slice is
// re-allocated with every element deep-copied.
func (in *TCPIngressList) DeepCopyInto(out *TCPIngressList) {
	*out = *in
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		out.Items = make([]TCPIngress, len(in.Items))
		for idx := range in.Items {
			in.Items[idx].DeepCopyInto(&out.Items[idx])
		}
	}
}

// DeepCopy returns a newly allocated deep copy of the receiver, or nil when the receiver is nil.
func (in *TCPIngressList) DeepCopy() *TCPIngressList {
	if in == nil {
		return nil
	}
	var clone TCPIngressList
	in.DeepCopyInto(&clone)
	return &clone
}

// DeepCopyObject returns a deep copy of the receiver as a runtime.Object.
// A nil receiver yields an untyped nil rather than an interface wrapping a
// nil pointer.
func (in *TCPIngressList) DeepCopyObject() runtime.Object {
	clone := in.DeepCopy()
	if clone == nil {
		return nil
	}
	return clone
}