external-dns/source/skipper_routegroup.go
Ivan Ka f1d771815f
feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting (#6322)
* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
2026-03-30 02:40:12 +05:30

409 lines
11 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/external-dns/source/types"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/template"
)
const (
	// DefaultRoutegroupVersion is the default version for route groups.
	DefaultRoutegroupVersion = "zalando.org/v1"

	// defaultIdleConnTimeout is the idle connection timeout of the HTTP
	// transport and also the period of the client's background loop that
	// closes idle connections and re-reads the token.
	defaultIdleConnTimeout = time.Second * 30

	// URL path templates, appended to the API server address. The first
	// placeholder is the route group API version; the namespaced variant takes
	// the namespace as second placeholder.
	routeGroupNamespacedResource = "/apis/%s/namespaces/%s/routegroups"
	routeGroupListResource       = "/apis/%s/routegroups"
)
// routeGroupSource is the Source returned by NewRouteGroupSource. It builds
// endpoints from RouteGroup resources listed through the Kubernetes API server.
//
// +externaldns:source:name=skipper-routegroup
// +externaldns:source:category=Ingress Controllers
// +externaldns:source:description=Creates DNS entries from Skipper RouteGroup resources
// +externaldns:source:resources=RouteGroup.zalando.org
// +externaldns:source:filters=annotation
// +externaldns:source:namespace=all,single
// +externaldns:source:fqdn-template=true
// +externaldns:source:provider-specific=true
type routeGroupSource struct {
// cli fetches the RouteGroup list from apiEndpoint.
cli routeGroupListClient
// apiServer is the API server base URL as computed by NewRouteGroupSource.
apiServer string
// namespace is the configured namespace; empty selects the non-namespaced list URL.
namespace string
// apiEndpoint is the full URL the RouteGroup list is requested from.
apiEndpoint string
// annotationFilter is passed to annotations.Filter to select RouteGroups.
annotationFilter string
// templateEngine renders FQDN-template hostnames and combines them with the
// endpoints derived from the RouteGroup itself.
templateEngine template.Engine
// ignoreHostnameAnnotation, when true, skips hostnames taken from annotations.
ignoreHostnameAnnotation bool
}
// routeGroupListClient abstracts fetching a RouteGroup list so that the
// HTTP-based client can be replaced, e.g. for testing.
type routeGroupListClient interface {
// getRouteGroupList fetches the RouteGroup list from the given URL.
getRouteGroupList(string) (*routeGroupList, error)
}
// routeGroupClient is the HTTP implementation of routeGroupListClient. It
// attaches a bearer token to outgoing requests and can re-read that token
// from a file.
type routeGroupClient struct {
// mu guards token.
mu sync.Mutex
// quit stops the background goroutine started by newRouteGroupClient.
quit chan struct{}
// client performs the HTTP requests.
client *http.Client
// token is the bearer token sent in the Authorization header; empty sends none.
token string
// tokenFile is the path the token is re-read from; empty disables re-reading.
tokenFile string
}
// newRouteGroupClient builds the HTTP client used to list RouteGroup resources.
//
// token is a static bearer token. tokenPath, when non-empty, is the file the
// token is read from: once during construction and again on every tick of the
// background loop. An empty tokenPath disables file-based refresh, so the
// static token is used as is. Previously any non-empty tokenPath was silently
// replaced by the in-cluster service account token path, which made it
// impossible to use a token file stored anywhere else.
//
// timeout bounds both dialing and waiting for the response headers.
//
// When the in-cluster service account CA bundle can be read and parsed it is
// installed as the root CA pool of the transport; otherwise the transport
// keeps its default TLS configuration.
//
// A background goroutine closes idle connections and refreshes the token every
// defaultIdleConnTimeout until cli.quit is signalled.
func newRouteGroupClient(token, tokenPath string, timeout time.Duration) *routeGroupClient {
	// CA bundle of the service account mounted into the pod.
	const rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

	tr := &http.Transport{
		// Dialer.DualStack is deprecated and a no-op (fast fallback is the
		// default), so it is intentionally not set.
		DialContext: (&net.Dialer{
			Timeout:   timeout,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout:   3 * time.Second,
		ResponseHeaderTimeout: timeout,
		IdleConnTimeout:       defaultIdleConnTimeout,
		MaxIdleConns:          5,
		MaxIdleConnsPerHost:   5,
	}

	cli := &routeGroupClient{
		client: &http.Client{
			Transport: tr,
		},
		quit:      make(chan struct{}),
		tokenFile: tokenPath,
		token:     token,
	}

	// in cluster config, errors are treated as not running in cluster
	cli.updateToken()

	// cluster internal use custom CA to reach TLS endpoint; a missing or
	// unparsable bundle leaves the default TLS configuration in place
	if rootCA, err := os.ReadFile(rootCAFile); err == nil {
		certPool := x509.NewCertPool()
		if certPool.AppendCertsFromPEM(rootCA) {
			tr.TLSClientConfig = &tls.Config{
				MinVersion: tls.VersionTLS12,
				RootCAs:    certPool,
			}
		}
	}

	// Started only after the transport is fully configured.
	go func() {
		for {
			select {
			case <-time.After(tr.IdleConnTimeout):
				tr.CloseIdleConnections()
				cli.updateToken()
			case <-cli.quit:
				return
			}
		}
	}()

	return cli
}
// updateToken re-reads the bearer token from cli.tokenFile and stores it.
//
// It does nothing when no token file is configured. A read failure is logged
// and the current token is kept. Surrounding whitespace is trimmed from the
// file content: a trailing newline would otherwise end up in the
// Authorization header and make it an invalid header value.
func (cli *routeGroupClient) updateToken() {
	if cli.tokenFile == "" {
		return
	}

	raw, err := os.ReadFile(cli.tokenFile)
	if err != nil {
		log.Errorf("Failed to read token from file (%s): %v", cli.tokenFile, err)
		return
	}
	token := strings.TrimSpace(string(raw))

	cli.mu.Lock()
	defer cli.mu.Unlock()
	cli.token = token
}
// getToken returns the current bearer token, reading it under cli.mu.
func (cli *routeGroupClient) getToken() string {
	cli.mu.Lock()
	current := cli.token
	cli.mu.Unlock()
	return current
}
// getRouteGroupList issues a GET request to url and decodes the JSON response
// body into a routeGroupList.
//
// An error is returned when the request fails, when the response status is not
// 200 OK, or when the body cannot be decoded.
func (cli *routeGroupClient) getRouteGroupList(url string) (*routeGroupList, error) {
	resp, err := cli.get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("failed to get routegroup list from %s, got: %s", url, resp.Status)
	}

	list := new(routeGroupList)
	if decodeErr := json.NewDecoder(resp.Body).Decode(list); decodeErr != nil {
		return nil, decodeErr
	}
	return list, nil
}
// get builds a body-less GET request for url and sends it through cli.do.
func (cli *routeGroupClient) get(url string) (*http.Response, error) {
	request, buildErr := http.NewRequest(http.MethodGet, url, nil)
	if buildErr != nil {
		return nil, buildErr
	}

	response, doErr := cli.do(request)
	return response, doErr
}
// do sends req with the underlying HTTP client. When a token is available and
// the request carries no Authorization header yet, the token is added as a
// bearer credential first.
func (cli *routeGroupClient) do(req *http.Request) (*http.Response, error) {
	// an Authorization header that is already present is never overridden
	if req.Header.Get("Authorization") == "" {
		if bearer := cli.getToken(); bearer != "" {
			req.Header.Set("Authorization", "Bearer "+bearer)
		}
	}
	return cli.client.Do(req)
}
// NewRouteGroupSource creates a new routeGroupSource with the given config.
//
// token and tokenPath are handed to the HTTP client used to reach the API
// server at apiServerURL. The RouteGroup API version comes from
// cfg.SkipperRouteGroupVersion and falls back to DefaultRoutegroupVersion when
// empty. When cfg.Namespace is set the namespaced list URL is used, otherwise
// the list URL without a namespace.
//
// An error is returned when apiServerURL cannot be parsed.
func NewRouteGroupSource(cfg *Config, token, tokenPath, apiServerURL string) (Source, error) {
	// Parse the URL before creating the client: newRouteGroupClient starts a
	// background goroutine, which would be leaked if we bailed out afterwards.
	u, err := url.Parse(apiServerURL)
	if err != nil {
		return nil, err
	}

	routeGroupVersion := cfg.SkipperRouteGroupVersion
	if routeGroupVersion == "" {
		routeGroupVersion = DefaultRoutegroupVersion
	}

	apiServer := u.String()
	// strip port if well known port, because of TLS certificate match
	if u.Scheme == "https" && u.Port() == "443" {
		// correctly handle IPv6 addresses by keeping surrounding `[]`.
		apiServer = "https://" + strings.TrimSuffix(u.Host, ":443")
	}

	apiEndpoint := apiServer + fmt.Sprintf(routeGroupListResource, routeGroupVersion)
	if cfg.Namespace != "" {
		apiEndpoint = apiServer + fmt.Sprintf(routeGroupNamespacedResource, routeGroupVersion, cfg.Namespace)
	}

	return &routeGroupSource{
		cli:                      newRouteGroupClient(token, tokenPath, cfg.KubeAPIRequestTimeout),
		apiServer:                apiServer,
		namespace:                cfg.Namespace,
		apiEndpoint:              apiEndpoint,
		annotationFilter:         cfg.AnnotationFilter,
		templateEngine:           cfg.TemplateEngine,
		ignoreHostnameAnnotation: cfg.IgnoreHostnameAnnotation,
	}, nil
}
// AddEventHandler is a no-op for route groups, because this source does not
// implement caching yet.
func (sc *routeGroupSource) AddEventHandler(_ context.Context, _ func()) {
	// intentionally empty
}
// Endpoints returns endpoint objects for each host-target combination that should be processed.
//
// It fetches the RouteGroup list from the configured API endpoint (a single
// namespace when one is configured, otherwise the non-namespaced list), applies
// the annotation filter and skips RouteGroups for which
// annotations.IsControllerMismatch reports a mismatch. For every remaining
// RouteGroup the endpoints derived from its hosts and annotations are combined
// with the FQDN-template endpoints through the template engine. The collected
// endpoints are passed through MergeEndpoints before being returned.
func (sc *routeGroupSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
	rgList, err := sc.cli.getRouteGroupList(sc.apiEndpoint)
	if err != nil {
		log.Errorf("Failed to get RouteGroup list: %v", err)
		return nil, err
	}

	filtered, err := annotations.Filter(rgList.Items, sc.annotationFilter)
	if err != nil {
		return nil, err
	}

	endpoints := []*endpoint.Endpoint{}
	for _, rg := range filtered {
		if annotations.IsControllerMismatch(rg, types.SkipperRouteGroup) {
			continue
		}

		eps := sc.endpointsFromRouteGroup(rg)

		eps, err = sc.templateEngine.CombineWithEndpoints(
			eps,
			func() ([]*endpoint.Endpoint, error) { return sc.endpointsFromTemplate(rg) },
		)
		if err != nil {
			return nil, err
		}

		// Report the resource under its real source type; this used to pass
		// the OpenShift route type, which does not belong to this source.
		if endpoint.HasNoEmptyEndpoints(eps, types.SkipperRouteGroup, rg) {
			continue
		}

		log.Debugf("Endpoints generated from routegroup: %s/%s: %v", rg.Namespace, rg.Name, eps)
		endpoints = append(endpoints, eps...)
	}

	return MergeEndpoints(endpoints), nil
}
// endpointsFromTemplate renders the FQDN template for rg and returns the
// endpoints for every resulting hostname.
//
// Targets come from the target annotation; when that yields none, the load
// balancer entries of the RouteGroup status are used instead. The result is
// nil when the template produces no hostnames.
func (sc *routeGroupSource) endpointsFromTemplate(rg *routeGroup) ([]*endpoint.Endpoint, error) {
	fqdns, err := sc.templateEngine.ExecFQDN(rg)
	if err != nil {
		return nil, err
	}

	resource := fmt.Sprintf("routegroup/%s/%s", rg.Namespace, rg.Name)

	// error handled in endpointsFromRouteGroup(), otherwise duplicate log
	ttl := annotations.TTLFromAnnotations(rg.Annotations, resource)

	targets := annotations.TargetsFromTargetAnnotation(rg.Annotations)
	if len(targets) == 0 {
		targets = targetsFromRouteGroupStatus(rg.Status)
	}

	providerSpecific, setIdentifier := annotations.ProviderSpecificAnnotations(rg.Annotations)

	var result []*endpoint.Endpoint
	for i := range fqdns {
		forHost := endpoint.EndpointsForHostname(fqdns[i], targets, ttl, providerSpecific, setIdentifier, resource)
		result = append(result, forHost...)
	}
	return result, nil
}
// endpointsFromRouteGroup returns the endpoints for the hosts listed in the
// RouteGroup spec and, unless ignoreHostnameAnnotation is set, for the
// hostnames taken from its annotations.
//
// Targets come from the target annotation; when that yields none, the load
// balancer entries of the RouteGroup status are used instead. The returned
// slice is never nil.
func (sc *routeGroupSource) endpointsFromRouteGroup(rg *routeGroup) []*endpoint.Endpoint {
	resource := fmt.Sprintf("routegroup/%s/%s", rg.Namespace, rg.Name)
	ttl := annotations.TTLFromAnnotations(rg.Annotations, resource)

	targets := annotations.TargetsFromTargetAnnotation(rg.Annotations)
	if len(targets) == 0 {
		targets = append(targets, targetsFromRouteGroupStatus(rg.Status)...)
	}

	providerSpecific, setIdentifier := annotations.ProviderSpecificAnnotations(rg.Annotations)

	result := []*endpoint.Endpoint{}
	addHost := func(host string) {
		result = append(result, endpoint.EndpointsForHostname(host, targets, ttl, providerSpecific, setIdentifier, resource)...)
	}

	// hosts from the spec; empty entries are ignored
	for _, host := range rg.Spec.Hosts {
		if host != "" {
			addHost(host)
		}
	}

	// Skip endpoints if we do not want entries from annotations
	if sc.ignoreHostnameAnnotation {
		return result
	}
	for _, host := range annotations.HostnamesFromAnnotations(rg.Annotations) {
		addHost(host)
	}
	return result
}
// targetsFromRouteGroupStatus collects the non-empty IPs and hostnames of all
// load balancer entries in status, IP before hostname for each entry. The
// result is nil when there is nothing to collect.
func targetsFromRouteGroupStatus(status routeGroupStatus) endpoint.Targets {
	var collected endpoint.Targets

	entries := status.LoadBalancer.RouteGroup
	for i := range entries {
		for _, candidate := range [...]string{entries[i].IP, entries[i].Hostname} {
			if candidate == "" {
				continue
			}
			collected = append(collected, candidate)
		}
	}
	return collected
}
// routeGroupList is the JSON document decoded from the RouteGroup list
// response of the API server.
type routeGroupList struct {
Kind string `json:"kind"`
APIVersion string `json:"apiVersion"`
Metadata routeGroupListMetadata `json:"metadata"`
// Items holds the RouteGroups contained in the list.
Items []*routeGroup `json:"items"`
}
// routeGroupListMetadata holds the fields decoded from the "metadata" object
// of a RouteGroup list.
type routeGroupListMetadata struct {
SelfLink string `json:"selfLink"`
ResourceVersion string `json:"resourceVersion"`
}
// routeGroup is a single RouteGroup resource as decoded from the list
// response. It embeds the Kubernetes type and object metadata, so fields such
// as Name, Namespace and Annotations are accessible directly.
type routeGroup struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
// Spec carries the hosts the RouteGroup serves.
Spec routeGroupSpec `json:"spec"`
// Status carries the load balancer entries used as default targets.
Status routeGroupStatus `json:"status"`
}
// Metadata exposes the embedded ObjectMeta so that templates written against
// the former {{.Metadata.*}} layout keep working.
//
// Deprecated: use top-level fields directly (e.g. {{.Name}} instead of {{.Metadata.Name}}).
func (rg *routeGroup) Metadata() *metav1.ObjectMeta {
	meta := &rg.ObjectMeta
	return meta
}
// DeepCopyObject returns an independent copy of rg as a runtime.Object.
//
// Besides copying the struct itself, the object metadata is deep-copied and
// the host and load balancer slices are cloned, so that changing the copy
// cannot affect rg. A plain struct assignment alone would leave the copy
// sharing those maps and slices with the original. A nil receiver yields nil.
func (rg *routeGroup) DeepCopyObject() runtime.Object {
	if rg == nil {
		return nil
	}

	out := *rg
	rg.ObjectMeta.DeepCopyInto(&out.ObjectMeta)

	// append onto a zero-capacity slice clones the elements and keeps a nil
	// slice nil
	out.Spec.Hosts = append(rg.Spec.Hosts[:0:0], rg.Spec.Hosts...)
	out.Status.LoadBalancer.RouteGroup = append(
		rg.Status.LoadBalancer.RouteGroup[:0:0],
		rg.Status.LoadBalancer.RouteGroup...,
	)
	return &out
}
// routeGroupSpec is the part of the RouteGroup spec read by this source.
type routeGroupSpec struct {
// Hosts lists the hostnames endpoints are created for; empty entries are skipped.
Hosts []string `json:"hosts"`
}
// routeGroupStatus is the part of the RouteGroup status read by this source.
type routeGroupStatus struct {
LoadBalancer routeGroupLoadBalancerStatus `json:"loadBalancer"`
}
// routeGroupLoadBalancerStatus lists the load balancer entries of a RouteGroup.
type routeGroupLoadBalancerStatus struct {
// RouteGroup holds one entry per load balancer; their IPs and hostnames
// serve as targets when no target annotation is set.
RouteGroup []routeGroupLoadBalancer `json:"routeGroup"`
}
// routeGroupLoadBalancer is a single load balancer entry. Each non-empty field
// is used as a target.
type routeGroupLoadBalancer struct {
IP string `json:"ip,omitempty"`
Hostname string `json:"hostname,omitempty"`
}