feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting (#6322)

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* feat(client): add --kube-api-qps and --kube-api-burst flags for Kubernetes client rate limiting

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2026-03-29 22:10:12 +01:00 committed by GitHub
parent 5dd8369d29
commit f1d771815f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 418 additions and 141 deletions

View File

@ -49,7 +49,8 @@ section below.
- [ ] Scope resources at every level — service type, label, annotation, domain, zone ID. See
[Scope resources](#scope-resources).
- [ ] Split into multiple instances for large zone sets or source mixes, each with a distinct `--txt-owner-id` and non-overlapping domain scope. See [Split instances](#split-instances).
- [ ] Tune reconcile frequency and raise `--request-timeout` on large clusters. See [Reduce reconcile pressure](#reduce-reconcile-pressure).
- [ ] Tune reconcile frequency and raise `--kube-api-request-timeout` on large clusters. See [Reduce reconcile pressure](#reduce-reconcile-pressure).
- [ ] Set `--kube-api-qps` and `--kube-api-burst` if external-dns is throttled by the Kubernetes API or shares API quota with many other controllers. See [Reduce reconcile pressure](#reduce-reconcile-pressure).
**Observability**
@ -252,8 +253,28 @@ Each instance must have a distinct `--txt-owner-id` and non-overlapping `--domai
Tune reconcile frequency to match your actual change rate rather than running at the default
interval. Use event-driven reconciliation to react quickly to real changes while keeping
background polling infrequent. Raise `--request-timeout` if informer cache sync exceeds the
default on large clusters.
background polling infrequent. Raise `--kube-api-request-timeout` if individual Kubernetes API
calls time out on a slow or heavily loaded API server (the default is 30s per request).
**Kubernetes API rate limiting.** external-dns inherits client-go's built-in defaults of 5 QPS
and 10 burst for Kubernetes API calls. On clusters where external-dns manages many sources or
runs at a high reconcile frequency, these defaults can be too low, causing throttling. On
clusters where it shares API quota with many other controllers, they may need to be lowered to
avoid starving higher-priority controllers.
```sh
# Raise limits for a high-throughput instance managing many sources
--kube-api-qps=20
--kube-api-burst=40
# Lower limits for a low-priority instance on a busy cluster
--kube-api-qps=2
--kube-api-burst=5
```
When the limit is hit, external-dns logs an error containing
`consider raising --kube-api-qps/--kube-api-burst` to make the cause actionable. Both flags
default to the client-go built-in values (5 QPS / 10 burst) when not set.
For per-provider flags covering batch change sizing, record caching, and zone list caching, see
[DNS provider API rate limits](rate-limits.md) and [Provider Notes](#provider-notes).

View File

@ -14,8 +14,6 @@ tags:
|:-------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--[no-]version` | Show application version. |
| `--server=""` | The Kubernetes API server to connect to (default: auto-detect) |
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) |
| `--gloo-namespace=gloo-system` | The Gloo Proxy namespace; specify multiple times for multiple namespaces. (default: gloo-system) |
@ -191,5 +189,10 @@ tags:
| `--fqdn-template=""` | A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional). Accepts comma separated list for multiple global FQDN. |
| `--target-template=""` | A templated string used to generate DNS targets (IP or hostname) from sources that support it (optional). Accepts comma separated list for multiple targets. |
| `--fqdn-target-template=""` | A template that returns host:target pairs (e.g., '{{range .Object.endpoints}}{{.targetRef.name}}.svc.example.com:{{index .addresses 0}},{{end}}'). Accepts comma separated list for multiple pairs. |
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
| `--request-timeout=30s` | [DEPRECATED: use --kube-api-request-timeout] Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--kube-api-request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--kube-api-qps=5` | Maximum QPS to the Kubernetes API server from this client. |
| `--kube-api-burst=10` | Maximum burst for throttle to the Kubernetes API server from this client. |
| `--provider=provider` | The DNS provider where the DNS records will be created (required, options: akamai, alibabacloud, aws, aws-sd, azure, azure-dns, azure-private-dns, civo, cloudflare, coredns, dnsimple, exoscale, gandi, godaddy, google, inmemory, linode, ns1, oci, ovh, pdns, pihole, plural, rfc2136, scaleway, skydns, transip, webhook) |
| `--source=source` | The resource types that are queried for endpoints; specify multiple times for multiple sources (required, options: service, ingress, node, pod, gateway-httproute, gateway-grpcroute, gateway-tlsroute, gateway-tcproute, gateway-udproute, istio-gateway, istio-virtualservice, contour-httpproxy, gloo-proxy, fake, connector, crd, empty, skipper-routegroup, openshift-route, ambassador-host, kong-tcpingress, f5-virtualserver, f5-transportserver, traefik-proxy, unstructured) |

View File

@ -24,6 +24,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/rest"
"sigs.k8s.io/external-dns/internal/flags"
@ -45,6 +46,9 @@ type Config struct {
APIServerURL string
KubeConfig string
RequestTimeout time.Duration
KubeAPIRequestTimeout time.Duration
KubeAPIQPS int
KubeAPIBurst int
DefaultTargets []string
GlooNamespaces []string
SkipperRouteGroupVersion string
@ -349,6 +353,9 @@ var defaultConfig = &Config{
RegexDomainFilter: regexp.MustCompile(""),
Registry: RegistryTXT,
RequestTimeout: time.Second * 30,
KubeAPIRequestTimeout: time.Second * 30,
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
RFC2136BatchChangeSize: 50,
RFC2136GSSTSIG: false,
RFC2136Host: []string{""},
@ -495,14 +502,24 @@ func (cfg *Config) ParseFlags(args []string) error {
if _, err := App(cfg).Parse(args); err != nil {
return err
}
cfg.resolveDeprecatedFlags()
return nil
}
// resolveDeprecatedFlags reconciles deprecated flags with their replacements.
//
// --request-timeout is treated as explicitly set when its parsed value differs
// from the default. In that case a deprecation warning is logged and the value
// is copied into KubeAPIRequestTimeout unconditionally, so the deprecated flag
// overrides --kube-api-request-timeout even when both are given.
// Passing --request-timeout with exactly the default value cannot be told apart
// from omitting it and leaves KubeAPIRequestTimeout untouched.
//
// NOTE(review): if --kube-api-request-timeout is meant to take precedence when
// both flags are set, the promotion below additionally needs a guard on
// KubeAPIRequestTimeout still holding its default — confirm the intended precedence.
func (cfg *Config) resolveDeprecatedFlags() {
	if cfg.RequestTimeout != defaultConfig.RequestTimeout {
		logrus.Warn("--request-timeout is deprecated, use --kube-api-request-timeout instead")
		// Promote the deprecated value; this overwrites whatever the new flag supplied.
		cfg.KubeAPIRequestTimeout = cfg.RequestTimeout
	}
}
func bindFlags(b flags.FlagBinder, cfg *Config) {
// Flags related to Kubernetes
b.StringVar("server", "The Kubernetes API server to connect to (default: auto-detect)", defaultConfig.APIServerURL, &cfg.APIServerURL)
b.StringVar("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)", defaultConfig.KubeConfig, &cfg.KubeConfig)
b.DurationVar("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.RequestTimeout, &cfg.RequestTimeout)
b.BoolVar("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs", false, &cfg.ResolveServiceLoadBalancerHostname)
b.BoolVar("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)", false, &cfg.ListenEndpointEvents)
@ -713,6 +730,13 @@ func bindFlags(b flags.FlagBinder, cfg *Config) {
b.StringVar("fqdn-template", "A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional). Accepts comma separated list for multiple global FQDN.", defaultConfig.FQDNTemplate, &cfg.FQDNTemplate)
b.StringVar("target-template", "A templated string used to generate DNS targets (IP or hostname) from sources that support it (optional). Accepts comma separated list for multiple targets.", defaultConfig.TargetTemplate, &cfg.TargetTemplate)
b.StringVar("fqdn-target-template", "A template that returns host:target pairs (e.g., '{{range .Object.endpoints}}{{.targetRef.name}}.svc.example.com:{{index .addresses 0}},{{end}}'). Accepts comma separated list for multiple pairs.", defaultConfig.FQDNTargetTemplate, &cfg.FQDNTargetTemplate)
// kube client config flags
b.StringVar("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)", defaultConfig.KubeConfig, &cfg.KubeConfig)
b.DurationVar("request-timeout", "[DEPRECATED: use --kube-api-request-timeout] Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.RequestTimeout, &cfg.RequestTimeout)
b.DurationVar("kube-api-request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.KubeAPIRequestTimeout, &cfg.KubeAPIRequestTimeout)
b.IntVar("kube-api-qps", "Maximum QPS to the Kubernetes API server from this client.", defaultConfig.KubeAPIQPS, &cfg.KubeAPIQPS)
b.IntVar("kube-api-burst", "Maximum burst for throttle to the Kubernetes API server from this client.", defaultConfig.KubeAPIBurst, &cfg.KubeAPIBurst)
}
func App(cfg *Config) *kingpin.Application {

View File

@ -29,6 +29,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/client-go/rest"
)
var (
@ -36,6 +37,9 @@ var (
APIServerURL: "",
KubeConfig: "",
RequestTimeout: time.Second * 30,
KubeAPIRequestTimeout: time.Second * 30,
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
GlooNamespaces: []string{"gloo-system"},
SkipperRouteGroupVersion: "zalando.org/v1",
Sources: []string{"service"},
@ -140,6 +144,9 @@ var (
APIServerURL: "http://127.0.0.1:8080",
KubeConfig: "/some/path",
RequestTimeout: time.Second * 77,
KubeAPIRequestTimeout: time.Second * 77,
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
GlooNamespaces: []string{"gloo-not-system", "gloo-second-system"},
SkipperRouteGroupVersion: "zalando.org/v2",
Sources: []string{"service", "ingress", "connector"},
@ -590,8 +597,6 @@ func TestConfigStringMasksSecureFields(t *testing.T) {
// Default path should use kingpin and parse flags correctly
func TestParseFlagsDefaultKingpin(t *testing.T) {
t.Setenv("EXTERNAL_DNS_CLI", "")
args := []string{
"--provider=aws",
"--source=service",
@ -612,7 +617,7 @@ func TestParseFlagsDefaultKingpin(t *testing.T) {
assert.ElementsMatch(t, []string{"service", "ingress"}, cfg.Sources)
assert.Equal(t, "http://127.0.0.1:8080", cfg.APIServerURL)
assert.Equal(t, "/some/path", cfg.KubeConfig)
assert.Equal(t, 2*time.Second, cfg.RequestTimeout)
assert.Equal(t, 2*time.Second, cfg.KubeAPIRequestTimeout)
assert.Equal(t, "ns", cfg.Namespace)
assert.ElementsMatch(t, []string{"example.org", "company.com"}, cfg.DomainFilter)
assert.Equal(t, "default", cfg.OCPRouterName)
@ -648,7 +653,7 @@ func TestParseFlagsCobraSwitchParitySubset(t *testing.T) {
assert.ElementsMatch(t, cfgK.Sources, cfgC.Sources)
assert.Equal(t, cfgK.APIServerURL, cfgC.APIServerURL)
assert.Equal(t, cfgK.KubeConfig, cfgC.KubeConfig)
assert.Equal(t, cfgK.RequestTimeout, cfgC.RequestTimeout)
assert.Equal(t, cfgK.KubeAPIRequestTimeout, cfgC.KubeAPIRequestTimeout)
assert.Equal(t, cfgK.Namespace, cfgC.Namespace)
assert.ElementsMatch(t, cfgK.DomainFilter, cfgC.DomainFilter)
assert.Equal(t, cfgK.OCPRouterName, cfgC.OCPRouterName)
@ -963,6 +968,65 @@ func TestBinderParityMapAndRegexp(t *testing.T) {
assert.Equal(t, map[string]string{"foo": "bar"}, cfgK.AWSSDCreateTag)
}
// TestParseFlagsKubeAPIRequestTimeout covers the interaction between
// --kube-api-request-timeout and the deprecated --request-timeout after
// ParseFlags has run.
func TestParseFlagsKubeAPIRequestTimeout(t *testing.T) {
	for _, tc := range []struct {
		name           string
		args           []string
		wantTimeout    time.Duration // expected cfg.KubeAPIRequestTimeout
		wantDeprecated time.Duration // expected cfg.RequestTimeout
	}{
		{
			name:           "new flag sets KubeAPIRequestTimeout",
			args:           []string{"--kube-api-request-timeout=60s"},
			wantTimeout:    60 * time.Second,
			wantDeprecated: 30 * time.Second,
		},
		{
			name:           "deprecated flag is promoted",
			args:           []string{"--request-timeout=45s"},
			wantTimeout:    45 * time.Second,
			wantDeprecated: 45 * time.Second,
		},
		{
			// The case name mirrors what the expectations assert: a non-default
			// --request-timeout is promoted even when the new flag is also given.
			name:           "deprecated flag overrides new flag when both are set",
			args:           []string{"--request-timeout=45s", "--kube-api-request-timeout=90s"},
			wantTimeout:    45 * time.Second,
			wantDeprecated: 45 * time.Second,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			cfg := NewConfig()
			// Provider and source are prepended to every case so that only the
			// timeout flags vary between cases.
			require.NoError(t, cfg.ParseFlags(append([]string{"--provider=aws", "--source=service"}, tc.args...)))
			assert.Equal(t, tc.wantTimeout, cfg.KubeAPIRequestTimeout)
			assert.Equal(t, tc.wantDeprecated, cfg.RequestTimeout)
		})
	}
}
// TestParseFlagsKubeAPIRateLimit checks that explicit --kube-api-qps and
// --kube-api-burst values end up in the parsed config.
func TestParseFlagsKubeAPIRateLimit(t *testing.T) {
	args := []string{
		"--provider=aws",
		"--source=service",
		"--kube-api-qps=10",
		"--kube-api-burst=20",
	}

	parsed := NewConfig()
	err := parsed.ParseFlags(args)
	require.NoError(t, err)

	assert.Equal(t, 10, parsed.KubeAPIQPS)
	assert.Equal(t, 20, parsed.KubeAPIBurst)
}
// TestParseFlagsKubeAPIRateLimitDefaults checks that, without the rate limit
// flags, the parsed config carries the client-go default QPS and burst.
func TestParseFlagsKubeAPIRateLimitDefaults(t *testing.T) {
	args := []string{
		"--provider=aws",
		"--source=service",
	}

	parsed := NewConfig()
	err := parsed.ParseFlags(args)
	require.NoError(t, err)

	assert.Equal(t, int(rest.DefaultQPS), parsed.KubeAPIQPS)
	assert.Equal(t, rest.DefaultBurst, parsed.KubeAPIBurst)
}
// Kingpin validates enum values at parse time
func TestBinderEnumValidationDifference(t *testing.T) {
// Kingpin should reject unknown enum values

View File

@ -63,6 +63,13 @@ func ValidateConfig(cfg *externaldns.Config) error {
return errors.New("--annotation-prefix must end with '/'")
}
if cfg.KubeAPIQPS <= 0 {
return errors.New("--kube-api-qps must be greater than 0")
}
if cfg.KubeAPIBurst <= 0 {
return errors.New("--kube-api-burst must be greater than 0")
}
return nil
}

View File

@ -19,10 +19,11 @@ package validation
import (
"testing"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/client-go/rest"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
)
func TestValidateFlags(t *testing.T) {
@ -96,6 +97,32 @@ func TestValidateFlags(t *testing.T) {
cfg = newValidConfig(t)
cfg.AnnotationPrefix = "external-dns.alpha.kubernetes.io/"
require.NoError(t, ValidateConfig(cfg))
t.Run("kube-api-qps and kube-api-burst", func(t *testing.T) {
for _, tc := range []struct {
name string
qps int
burst int
wantErr bool
}{
{name: "positive QPS and burst", qps: 10, burst: 20, wantErr: false},
{name: "zero QPS", qps: 0, burst: 10, wantErr: true},
{name: "zero burst", qps: 5, burst: 0, wantErr: true},
{name: "negative QPS", qps: -1, burst: 10, wantErr: true},
{name: "negative burst", qps: 5, burst: -1, wantErr: true},
} {
t.Run(tc.name, func(t *testing.T) {
cfg := newValidConfig(t)
cfg.KubeAPIQPS = tc.qps
cfg.KubeAPIBurst = tc.burst
if tc.wantErr {
require.Error(t, ValidateConfig(cfg))
} else {
require.NoError(t, ValidateConfig(cfg))
}
})
}
})
}
func newValidConfig(t *testing.T) *externaldns.Config {
@ -104,6 +131,8 @@ func newValidConfig(t *testing.T) *externaldns.Config {
cfg.LogFormat = "json"
cfg.Sources = []string{"test-source"}
cfg.Provider = "test-provider"
cfg.KubeAPIQPS = int(rest.DefaultQPS)
cfg.KubeAPIBurst = rest.DefaultBurst
require.NoError(t, ValidateConfig(cfg))
@ -155,6 +184,8 @@ func TestValidateGoodRfc2136Config(t *testing.T) {
cfg.Provider = "rfc2136"
cfg.RFC2136MinTTL = 3600
cfg.RFC2136BatchChangeSize = 50
cfg.KubeAPIQPS = int(rest.DefaultQPS)
cfg.KubeAPIBurst = rest.DefaultBurst
err := ValidateConfig(cfg)
@ -272,6 +303,8 @@ func TestValidateGoodRfc2136GssTsigConfig(t *testing.T) {
RFC2136KerberosPassword: "test-pass",
RFC2136MinTTL: 3600,
RFC2136BatchChangeSize: 50,
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
},
}
@ -348,13 +381,16 @@ func TestValidateGoodAkamaiConfig(t *testing.T) {
AkamaiClientSecret: "test-secret",
AkamaiAccessToken: "test-access-token",
AkamaiEdgercPath: "/path/to/edgerc",
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
},
{
LogFormat: "json",
Sources: []string{"test-source"},
Provider: "akamai",
AnnotationPrefix: "external-dns.alpha.kubernetes.io/",
// All Akamai fields can be empty if AkamaiEdgercPath is not specified
KubeAPIQPS: int(rest.DefaultQPS),
KubeAPIBurst: rest.DefaultBurst,
},
}
@ -386,6 +422,8 @@ func TestValidateGoodAzureConfig(t *testing.T) {
cfg.Provider = "azure"
cfg.AnnotationPrefix = "external-dns.alpha.kubernetes.io/"
cfg.AzureConfigFile = "/path/to/azure.json"
cfg.KubeAPIQPS = int(rest.DefaultQPS)
cfg.KubeAPIBurst = rest.DefaultBurst
err := ValidateConfig(cfg)

View File

@ -19,6 +19,8 @@ limitations under the License.
package kubeclient
import (
"context"
"fmt"
"os"
"time"
@ -26,11 +28,54 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
extdnshttp "sigs.k8s.io/external-dns/pkg/http"
)
// GetRestConfig returns the REST client configuration for Kubernetes API access.
// InstrumentedRESTConfig resolves the REST config for kubeConfig/apiServerURL and
// decorates it with the external-dns user agent, the instrumented HTTP transport,
// the given request timeout and a token-bucket rate limiter.
//
// qps and burst replace the client-go defaults (rest.DefaultQPS / rest.DefaultBurst)
// only when they are greater than zero; each is evaluated independently. The rate
// limiter is always installed and is built from the effective QPS and burst.
func InstrumentedRESTConfig(
	kubeConfig, apiServerURL string,
	requestTimeout time.Duration,
	qps int, burst int) (*rest.Config, error) {
	config, err := buildRestConfig(kubeConfig, apiServerURL)
	if err != nil {
		return nil, err
	}

	// Work out the effective limits first, falling back to the client-go defaults.
	effectiveQPS, effectiveBurst := rest.DefaultQPS, rest.DefaultBurst
	if qps > 0 {
		effectiveQPS = float32(qps)
	}
	if burst > 0 {
		effectiveBurst = burst
	}

	config.UserAgent = externaldns.UserAgent()
	config.WrapTransport = extdnshttp.NewInstrumentedTransport
	config.Timeout = requestTimeout
	config.QPS = effectiveQPS
	config.Burst = effectiveBurst

	log.Debugf("kube client qps: %f, burst %d", config.QPS, config.Burst)

	// Wrap the token bucket so Wait errors carry the flag hint.
	bucket := flowcontrol.NewTokenBucketRateLimiter(config.QPS, config.Burst)
	config.RateLimiter = &rateLimiter{delegate: bucket}
	return config, nil
}
// NewKubeClient builds a Kubernetes clientset from an already prepared REST
// config and logs the host it targets. Construction errors are returned as-is.
func NewKubeClient(config *rest.Config) (kubernetes.Interface, error) {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	log.Infof("Created Kubernetes client %s", config.Host)
	return clientset, nil
}
// buildRestConfig returns the REST client configuration for Kubernetes API access.
// Supports both in-cluster and external cluster configurations.
//
// Configuration Priority:
@ -38,7 +83,7 @@ import (
// 2. Recommended home file (~/.kube/config)
// 3. In-cluster config
// TODO: consider clientcmd.NewDefaultClientConfigLoadingRules() with clientcmd.NewNonInteractiveDeferredLoadingClientConfig
func GetRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) {
func buildRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) {
if kubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
kubeConfig = clientcmd.RecommendedHomeFile
@ -66,38 +111,23 @@ func GetRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) {
return config, nil
}
// InstrumentedRESTConfig creates a REST config with request instrumentation for monitoring.
// Adds HTTP transport wrapper for Prometheus metrics collection and request timeout configuration.
//
// Metrics: Wraps the transport with pkg/http.NewInstrumentedTransport to collect
// HTTP request duration metrics for all Kubernetes API calls.
//
// Timeout: Applies the specified request timeout to prevent hanging requests.
func InstrumentedRESTConfig(kubeConfig, apiServerURL string, requestTimeout time.Duration) (*rest.Config, error) {
config, err := GetRestConfig(kubeConfig, apiServerURL)
if err != nil {
return nil, err
}
config.WrapTransport = extdnshttp.NewInstrumentedTransport
config.Timeout = requestTimeout
return config, nil
// rateLimiter wraps a flowcontrol.RateLimiter and enriches Wait errors with an
// actionable hint so callers get a clear message without needing to inspect
// the error string themselves. All other methods forward to the delegate
// unchanged.
type rateLimiter struct {
	// delegate is the wrapped limiter that performs the actual rate limiting.
	delegate flowcontrol.RateLimiter
}
// NewKubeClient returns a new Kubernetes client object. It takes a Config and
// uses APIServerURL and KubeConfig attributes to connect to the cluster. If
// KubeConfig isn't provided it defaults to using the recommended default.
func NewKubeClient(kubeConfig, apiServerURL string, requestTimeout time.Duration) (*kubernetes.Clientset, error) {
log.Infof("Instantiating new Kubernetes client")
config, err := InstrumentedRESTConfig(kubeConfig, apiServerURL, requestTimeout)
if err != nil {
return nil, err
// TryAccept forwards to the delegate's TryAccept and returns its result.
func (r *rateLimiter) TryAccept() bool { return r.delegate.TryAccept() }

// Accept forwards to the delegate's Accept.
func (r *rateLimiter) Accept() { r.delegate.Accept() }

// Stop forwards to the delegate's Stop.
func (r *rateLimiter) Stop() { r.delegate.Stop() }

// QPS returns the QPS reported by the delegate.
func (r *rateLimiter) QPS() float32 { return r.delegate.QPS() }
// Wait blocks until a token is available or the context is done.
// Every error returned by the delegate — including one caused by an already
// cancelled context — is wrapped with an actionable hint.
func (r *rateLimiter) Wait(ctx context.Context) error {
if err := r.delegate.Wait(ctx); err != nil {
return fmt.Errorf("consider raising --kube-api-qps/--kube-api-burst: %w", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Created Kubernetes client %s", config.Host)
return client, nil
return nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubeclient
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@ -27,80 +28,47 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
)
func TestGetRestConfig_WithKubeConfig(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer svr.Close()
mockKubeCfgDir := filepath.Join(t.TempDir(), ".kube")
mockKubeCfgPath := filepath.Join(mockKubeCfgDir, "config")
err := os.MkdirAll(mockKubeCfgDir, 0755)
require.NoError(t, err)
kubeCfgPath := writeKubeConfig(t, svr.URL)
kubeCfgTemplate := `apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
name: test-cluster
contexts:
- context:
cluster: test-cluster
user: test-user
name: test-context
current-context: test-context
users:
- name: test-user
user:
token: fake-token
`
err = os.WriteFile(mockKubeCfgPath, fmt.Appendf(nil, kubeCfgTemplate, svr.URL), 0644)
require.NoError(t, err)
config, err := GetRestConfig(mockKubeCfgPath, "")
config, err := buildRestConfig(kubeCfgPath, "")
require.NoError(t, err)
require.NotNil(t, config)
assert.Equal(t, svr.URL, config.Host)
}
func TestNewKubeClient(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer svr.Close()
config, err := InstrumentedRESTConfig(writeKubeConfig(t, svr.URL), "", 30*time.Second, 0, 0)
require.NoError(t, err)
client, err := NewKubeClient(config)
require.NoError(t, err)
assert.NotNil(t, client)
}
func TestInstrumentedRESTConfig_AddsMetrics(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer svr.Close()
mockKubeCfgDir := filepath.Join(t.TempDir(), ".kube")
mockKubeCfgPath := filepath.Join(mockKubeCfgDir, "config")
err := os.MkdirAll(mockKubeCfgDir, 0755)
require.NoError(t, err)
kubeCfgTemplate := `apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
name: test-cluster
contexts:
- context:
cluster: test-cluster
user: test-user
name: test-context
current-context: test-context
users:
- name: test-user
user:
token: fake-token
`
err = os.WriteFile(mockKubeCfgPath, fmt.Appendf(nil, kubeCfgTemplate, svr.URL), 0644)
require.NoError(t, err)
timeout := 30 * time.Second
config, err := InstrumentedRESTConfig(mockKubeCfgPath, "", timeout)
config, err := InstrumentedRESTConfig(writeKubeConfig(t, svr.URL), "", timeout, 0, 0)
require.NoError(t, err)
require.NotNil(t, config)
assert.Equal(t, timeout, config.Timeout)
assert.NotNil(t, config.WrapTransport, "WrapTransport should be set for metrics")
assert.NotNil(t, config.RateLimiter, "RateLimiter should always be set")
}
func TestGetRestConfig_RecommendedHomeFile(t *testing.T) {
@ -134,8 +102,103 @@ current-context: test-context
})
clientcmd.RecommendedHomeFile = mockKubeCfgPath
config, err := GetRestConfig("", "")
config, err := buildRestConfig("", "")
require.NoError(t, err)
require.NotNil(t, config)
assert.Equal(t, svr.URL, config.Host)
}
func TestInstrumentedRESTConfig_QPSAndBurstApplied(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer svr.Close()
kubeCfgPath := writeKubeConfig(t, svr.URL)
config, err := InstrumentedRESTConfig(kubeCfgPath, "", 30*time.Second, 20, 40)
require.NoError(t, err)
require.NotNil(t, config)
assert.Equal(t, 20, int(config.QPS))
assert.Equal(t, 40, config.Burst)
assert.NotNil(t, config.RateLimiter)
assert.Equal(t, 20, int(config.RateLimiter.QPS()))
}
func TestInstrumentedRESTConfig_ZeroQPSKeepsConfigDefaults(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
defer svr.Close()
kubeCfgPath := writeKubeConfig(t, svr.URL)
config, err := InstrumentedRESTConfig(kubeCfgPath, "", 30*time.Second, 0, 0)
require.NoError(t, err)
require.NotNil(t, config)
// qps == 0: client-go defaults applied; rate limiter still installed
assert.Equal(t, int(rest.DefaultQPS), int(config.QPS))
assert.Equal(t, rest.DefaultBurst, config.Burst)
assert.NotNil(t, config.RateLimiter)
assert.Equal(t, int(rest.DefaultQPS), int(config.RateLimiter.QPS()))
}
// TestEnrichingRateLimiter exercises the rateLimiter wrapper: plain delegation
// for TryAccept/Accept/Stop/QPS and error enrichment for Wait.
func TestEnrichingRateLimiter(t *testing.T) {
	// wrap builds a rateLimiter around a fresh token bucket.
	wrap := func(qps float32, burst int) *rateLimiter {
		return &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(qps, burst)}
	}

	t.Run("delegates QPS and TryAccept", func(t *testing.T) {
		limiter := wrap(5, 10)
		assert.Equal(t, 5, int(limiter.QPS()))
		assert.True(t, limiter.TryAccept(), "first TryAccept should succeed with non-zero burst")
	})

	t.Run("Accept does not panic", func(t *testing.T) {
		limiter := wrap(1000, 10)
		assert.NotPanics(t, limiter.Accept)
	})

	t.Run("Stop does not panic", func(t *testing.T) {
		limiter := wrap(5, 10)
		assert.NotPanics(t, limiter.Stop)
	})

	t.Run("Wait enriches error with actionable hint", func(t *testing.T) {
		// No burst capacity plus an already cancelled context makes Wait fail at once.
		limiter := wrap(0.0001, 0)
		cancelled, cancel := context.WithCancel(t.Context())
		cancel()

		waitErr := limiter.Wait(cancelled)
		require.Error(t, waitErr)
		assert.Contains(t, waitErr.Error(), "consider raising --kube-api-qps/--kube-api-burst")
	})

	t.Run("Wait returns nil when token is available", func(t *testing.T) {
		// A burst of one leaves a single token ready immediately.
		limiter := wrap(100, 1)
		assert.NoError(t, limiter.Wait(t.Context()))
	})
}
// writeKubeConfig writes a minimal kubeconfig pointing at serverURL into a temp dir
// and returns the path.
func writeKubeConfig(t *testing.T, serverURL string) string {
t.Helper()
dir := filepath.Join(t.TempDir(), ".kube")
path := filepath.Join(dir, "config")
require.NoError(t, os.MkdirAll(dir, 0755))
tmpl := `apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
name: test-cluster
contexts:
- context:
cluster: test-cluster
user: test-user
name: test-context
current-context: test-context
users:
- name: test-user
user:
token: fake-token
`
require.NoError(t, os.WriteFile(path, fmt.Appendf(nil, tmpl, serverURL), 0644))
return path
}

View File

@ -159,7 +159,8 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation
if err := cs.crWriter.Status().Update(ctx, dnsEndpoint); err != nil {
log.Warnf("Could not update ObservedGeneration of the CRD: %v", err)
log.Warnf("Could not update ObservedGeneration of [%s/%s/%s]: %v",
"dnsendpoint", dnsEndpoint.Namespace, dnsEndpoint.Name, err)
}
}

View File

@ -26,7 +26,11 @@ import (
)
const (
defaultRequestTimeout = 60
// defaultTimeout is the maximum time in seconds to wait for informer caches
// to complete initial sync. This is intentionally longer than the per-request
// timeout: a cache sync may require multiple sequential API calls
// (LIST + Watch handshake), so the total wait needs to exceed a single request duration.
defaultTimeout = 60
)
type informerFactory interface {
@ -51,7 +55,7 @@ func waitForCacheSync[K comparable](ctx context.Context, waitFunc func(<-chan st
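// The timeout below is layered on top of the caller's ctx rather than replacing it:
// context.WithTimeout never extends a parent deadline, so the effective limit is the
// earlier of the caller's deadline and 60s.
// If the caller passed a context with a 30s timeout, the wait still ends after 30s;
// a caller without a deadline waits at most 60s.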
timeout := defaultRequestTimeout * time.Second
timeout := defaultTimeout * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for typ, done := range waitFunc(ctx.Done()) {

View File

@ -206,7 +206,7 @@ func NewRouteGroupSource(cfg *Config, token, tokenPath, apiServerURL string) (So
if routeGroupVersion == "" {
routeGroupVersion = DefaultRoutegroupVersion
}
cli := newRouteGroupClient(token, tokenPath, cfg.RequestTimeout)
cli := newRouteGroupClient(token, tokenPath, cfg.KubeAPIRequestTimeout)
u, err := url.Parse(apiServerURL)
if err != nil {

View File

@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"
@ -31,7 +30,6 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
@ -86,7 +84,9 @@ type Config struct {
ServiceTypeFilter []string
GlooNamespaces []string
SkipperRouteGroupVersion string
RequestTimeout time.Duration
KubeAPIRequestTimeout time.Duration
KubeAPIQPS int
KubeAPIBurst int
DefaultTargets []string
ForceDefaultTargets bool
OCPRouterName string
@ -156,7 +156,9 @@ func NewSourceConfig(cfg *externaldns.Config, opts ...OverrideConfigOption) (*Co
ServiceTypeFilter: cfg.ServiceTypeFilter,
GlooNamespaces: cfg.GlooNamespaces,
SkipperRouteGroupVersion: cfg.SkipperRouteGroupVersion,
RequestTimeout: cfg.RequestTimeout,
KubeAPIRequestTimeout: cfg.KubeAPIRequestTimeout,
KubeAPIQPS: cfg.KubeAPIQPS,
KubeAPIBurst: cfg.KubeAPIBurst,
DefaultTargets: cfg.DefaultTargets,
ForceDefaultTargets: cfg.ForceDefaultTargets,
OCPRouterName: cfg.OCPRouterName,
@ -200,8 +202,10 @@ func (cfg *Config) ClientGenerator() ClientGenerator {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
return cfg.KubeAPIRequestTimeout
}(),
QPS: cfg.KubeAPIQPS,
Burst: cfg.KubeAPIBurst,
}
}
})
@ -241,8 +245,8 @@ type ClientGenerator interface {
// Memory Efficiency: Prevents creating multiple instances of expensive client objects
// that maintain their own connection pools and caches.
//
// Configuration: Clients are configured using KubeConfig, APIServerURL, and RequestTimeout
// which are set during SingletonClientGenerator initialization.
// Configuration: Clients are configured using KubeConfig, APIServerURL, RequestTimeout,
// QPS, and Burst which are set during SingletonClientGenerator initialization.
//
// TODO: Fix error handling pattern in client methods. Current implementation has a bug where
// errors are only returned on the first call due to sync.Once behavior. If initialization fails
@ -269,6 +273,8 @@ type SingletonClientGenerator struct {
KubeConfig string
APIServerURL string
RequestTimeout time.Duration
QPS int
Burst int
restConfig *rest.Config
kubeClient kubernetes.Interface
gatewayClient gateway.Interface
@ -287,7 +293,12 @@ type SingletonClientGenerator struct {
func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) {
var err error
p.kubeOnce.Do(func() {
p.kubeClient, err = kubeclient.NewKubeClient(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
var config *rest.Config
config, err = p.RESTConfig()
if err != nil {
return
}
p.kubeClient, err = kubeclient.NewKubeClient(config)
})
return p.kubeClient, err
}
@ -298,7 +309,7 @@ func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) {
func (p *SingletonClientGenerator) RESTConfig() (*rest.Config, error) {
var err error
p.restConfigOnce.Do(func() {
p.restConfig, err = kubeclient.InstrumentedRESTConfig(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
p.restConfig, err = kubeclient.InstrumentedRESTConfig(p.KubeConfig, p.APIServerURL, p.RequestTimeout, p.QPS, p.Burst)
})
return p.restConfig, err
}
@ -325,7 +336,12 @@ func (p *SingletonClientGenerator) GatewayClient() (gateway.Interface, error) {
func (p *SingletonClientGenerator) IstioClient() (istioclient.Interface, error) {
var err error
p.istioOnce.Do(func() {
p.istioClient, err = NewIstioClient(p.KubeConfig, p.APIServerURL)
var config *rest.Config
config, err = p.RESTConfig()
if err != nil {
return
}
p.istioClient, err = NewIstioClient(config)
})
return p.istioClient, err
}
@ -451,7 +467,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg
case types.CRD:
return buildCRDSource(ctx, p, cfg)
case types.SkipperRouteGroup:
return buildSkipperRouteGroupSource(ctx, cfg)
return buildSkipperRouteGroupSource(ctx, p, cfg)
case types.KongTCPIngress:
return buildKongTCPIngressSource(ctx, p, cfg)
case types.F5VirtualServer:
@ -627,11 +643,11 @@ func buildCRDSource(ctx context.Context, p ClientGenerator, cfg *Config) (Source
// buildSkipperRouteGroupSource creates a Skipper RouteGroup source for exposing route groups as DNS records.
// Special case: the ClientGenerator is used only to obtain the REST config; the RouteGroup client
// itself is not created through the ClientGenerator and manages its own authentication.
// Retrieves bearer token from REST config for API server authentication.
func buildSkipperRouteGroupSource(_ context.Context, cfg *Config) (Source, error) {
func buildSkipperRouteGroupSource(_ context.Context, p ClientGenerator, cfg *Config) (Source, error) {
apiServerURL := cfg.APIServerURL
tokenPath := ""
token := ""
restConfig, err := kubeclient.GetRestConfig(cfg.KubeConfig, cfg.APIServerURL)
restConfig, err := p.RESTConfig()
if err == nil {
apiServerURL = restConfig.Host
tokenPath = restConfig.BearerTokenFile
@ -688,26 +704,8 @@ func buildUnstructuredSource(ctx context.Context, p ClientGenerator, cfg *Config
return NewUnstructuredFQDNSource(ctx, dynamicClient, kubeClient, cfg)
}
// NewIstioClient returns a new Istio client object. It uses the configured
// KubeConfig attribute to connect to the cluster. If KubeConfig isn't provided
// it defaults to using the recommended default.
// NB: Istio controls the creation of the underlying Kubernetes client, so we
// have no ability to tack on transport wrappers (e.g., Prometheus request
// wrappers) to the client's config at this level. Furthermore, the Istio client
// constructor does not expose the ability to override the Kubernetes API server endpoint,
// so the apiServerURL config attribute has no effect.
func NewIstioClient(kubeConfig string, apiServerURL string) (*istioclient.Clientset, error) {
if kubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
kubeConfig = clientcmd.RecommendedHomeFile
}
}
restCfg, err := clientcmd.BuildConfigFromFlags(apiServerURL, kubeConfig)
if err != nil {
return nil, err
}
// NewIstioClient returns a new Istio client object from the given REST config.
func NewIstioClient(restCfg *rest.Config) (*istioclient.Clientset, error) {
ic, err := istioclient.NewForConfig(restCfg)
if err != nil {
return nil, fmt.Errorf("failed to create istio client: %w", err)

View File

@ -211,9 +211,9 @@ func TestBuildWithConfig_InvalidSource(t *testing.T) {
func TestConfig_ClientGenerator_Caching(t *testing.T) {
cfg := &Config{
KubeConfig: "/path/to/kubeconfig",
APIServerURL: "https://api.example.com",
RequestTimeout: 30 * time.Second,
KubeConfig: "/path/to/kubeconfig",
APIServerURL: "https://api.example.com",
KubeAPIRequestTimeout: 30 * time.Second,
}
gen1 := cfg.ClientGenerator()
@ -273,7 +273,7 @@ func TestSingletonClientGenerator_RESTConfig_TimeoutPropagation(t *testing.T) {
// TestConfig_ClientGenerator_RESTConfig_Integration verifies Config → ClientGenerator → RESTConfig flow
func TestConfig_ClientGenerator_RESTConfig_Integration(t *testing.T) {
t.Run("normal timeout is propagated", func(t *testing.T) {
cfg := &Config{RequestTimeout: 45 * time.Second}
cfg := &Config{KubeAPIRequestTimeout: 45 * time.Second}
config, err := cfg.ClientGenerator().RESTConfig()
if err == nil {
@ -283,7 +283,7 @@ func TestConfig_ClientGenerator_RESTConfig_Integration(t *testing.T) {
})
t.Run("UpdateEvents sets timeout to zero", func(t *testing.T) {
cfg := &Config{RequestTimeout: 45 * time.Second, UpdateEvents: true}
cfg := &Config{KubeAPIRequestTimeout: 45 * time.Second, UpdateEvents: true}
config, err := cfg.ClientGenerator().RESTConfig()
if err == nil {
@ -393,3 +393,27 @@ func TestNewSourceConfig(t *testing.T) {
})
}
}
func TestKubeAPIRateLimitPropagation(t *testing.T) {
t.Run("NewSourceConfig propagates QPS and burst", func(t *testing.T) {
cfg := &externaldns.Config{
KubeAPIQPS: 20,
KubeAPIBurst: 40,
}
got, err := NewSourceConfig(cfg)
require.NoError(t, err)
assert.Equal(t, 20, got.KubeAPIQPS)
assert.Equal(t, 40, got.KubeAPIBurst)
})
t.Run("ClientGenerator wires QPS and burst into SingletonClientGenerator", func(t *testing.T) {
cfg := &Config{
KubeAPIQPS: 15,
KubeAPIBurst: 30,
}
scg, ok := cfg.ClientGenerator().(*SingletonClientGenerator)
require.True(t, ok)
assert.Equal(t, 15, scg.QPS)
assert.Equal(t, 30, scg.Burst)
})
}