diff --git a/docs/advanced/operational-best-practices.md b/docs/advanced/operational-best-practices.md index 233f3e8f6..84819a770 100644 --- a/docs/advanced/operational-best-practices.md +++ b/docs/advanced/operational-best-practices.md @@ -49,7 +49,8 @@ section below. - [ ] Scope resources at every level — service type, label, annotation, domain, zone ID. See [Scope resources](#scope-resources). - [ ] Split into multiple instances for large zone sets or source mixes, each with a distinct --txt-owner-id` and non-overlapping domain scope. See [Split instances](#split-instances). -- [ ] Tune reconcile frequency and raise `--request-timeout` on large clusters. See [Reduce reconcile pressure](#reduce-reconcile-pressure). +- [ ] Tune reconcile frequency and raise `--kube-api-request-timeout` on large clusters. See [Reduce reconcile pressure](#reduce-reconcile-pressure). +- [ ] Set `--kube-api-qps` and `--kube-api-burst` if external-dns is throttled by the Kubernetes API or shares API quota with many other controllers. See [Reduce reconcile pressure](#reduce-reconcile-pressure). **Observability** @@ -252,8 +253,28 @@ Each instance must have a distinct `--txt-owner-id` and non-overlapping `--domai Tune reconcile frequency to match your actual change rate rather than running at the default interval. Use event-driven reconciliation to react quickly to real changes while keeping -background polling infrequent. Raise `--request-timeout` if informer cache sync exceeds the -default on large clusters. +background polling infrequent. Raise `--kube-api-request-timeout` if individual Kubernetes API +calls time out on a slow or heavily loaded API server (the default is 30s per request). + +**Kubernetes API rate limiting.** external-dns inherits client-go's built-in defaults of 5 QPS +and 10 burst for Kubernetes API calls. On clusters where external-dns manages many sources or +runs at a high reconcile frequency, these defaults can be too low, causing throttling. 
On +clusters where it shares API quota with many other controllers, they may need to be lowered to +avoid starving higher-priority controllers. + +```sh +# Raise limits for a high-throughput instance managing many sources +--kube-api-qps=20 +--kube-api-burst=40 + +# Lower limits for a low-priority instance on a busy cluster +--kube-api-qps=2 +--kube-api-burst=5 +``` + +When the limit is hit, external-dns logs an error containing +`consider raising --kube-api-qps/--kube-api-burst` to make the cause actionable. Both flags +default to the client-go built-in values (5 QPS / 10 burst) when not set. For per-provider flags covering batch change sizing, record caching, and zone list caching, see [DNS provider API rate limits](rate-limits.md) and [Provider Notes](#provider-notes). diff --git a/docs/flags.md b/docs/flags.md index a86fe8373..fb1168228 100644 --- a/docs/flags.md +++ b/docs/flags.md @@ -14,8 +14,6 @@ tags: |:-------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `--[no-]version` | Show application version. | | `--server=""` | The Kubernetes API server to connect to (default: auto-detect) | -| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) | -| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 
0s means no timeout | | `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs | | `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) | | `--gloo-namespace=gloo-system` | The Gloo Proxy namespace; specify multiple times for multiple namespaces. (default: gloo-system) | @@ -191,5 +189,10 @@ tags: | `--fqdn-template=""` | A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional). Accepts comma separated list for multiple global FQDN. | | `--target-template=""` | A templated string used to generate DNS targets (IP or hostname) from sources that support it (optional). Accepts comma separated list for multiple targets. | | `--fqdn-target-template=""` | A template that returns host:target pairs (e.g., '{{range .Object.endpoints}}{{.targetRef.name}}.svc.example.com:{{index .addresses 0}},{{end}}'). Accepts comma separated list for multiple pairs. | +| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) | +| `--request-timeout=30s` | [DEPRECATED: use --kube-api-request-timeout] Request timeout when calling Kubernetes APIs. 0s means no timeout | +| `--kube-api-request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout | +| `--kube-api-qps=5` | Maximum QPS to the Kubernetes API server from this client. | +| `--kube-api-burst=10` | Maximum burst for throttle to the Kubernetes API server from this client. 
| | `--provider=provider` | The DNS provider where the DNS records will be created (required, options: akamai, alibabacloud, aws, aws-sd, azure, azure-dns, azure-private-dns, civo, cloudflare, coredns, dnsimple, exoscale, gandi, godaddy, google, inmemory, linode, ns1, oci, ovh, pdns, pihole, plural, rfc2136, scaleway, skydns, transip, webhook) | | `--source=source` | The resource types that are queried for endpoints; specify multiple times for multiple sources (required, options: service, ingress, node, pod, gateway-httproute, gateway-grpcroute, gateway-tlsroute, gateway-tcproute, gateway-udproute, istio-gateway, istio-virtualservice, contour-httpproxy, gloo-proxy, fake, connector, crd, empty, skipper-routegroup, openshift-route, ambassador-host, kong-tcpingress, f5-virtualserver, f5-transportserver, traefik-proxy, unstructured) | diff --git a/pkg/apis/externaldns/types.go b/pkg/apis/externaldns/types.go index 16aa52e2c..407cabac0 100644 --- a/pkg/apis/externaldns/types.go +++ b/pkg/apis/externaldns/types.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/rest" "sigs.k8s.io/external-dns/internal/flags" @@ -45,6 +46,9 @@ type Config struct { APIServerURL string KubeConfig string RequestTimeout time.Duration + KubeAPIRequestTimeout time.Duration + KubeAPIQPS int + KubeAPIBurst int DefaultTargets []string GlooNamespaces []string SkipperRouteGroupVersion string @@ -349,6 +353,9 @@ var defaultConfig = &Config{ RegexDomainFilter: regexp.MustCompile(""), Registry: RegistryTXT, RequestTimeout: time.Second * 30, + KubeAPIRequestTimeout: time.Second * 30, + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, RFC2136BatchChangeSize: 50, RFC2136GSSTSIG: false, RFC2136Host: []string{""}, @@ -495,14 +502,24 @@ func (cfg *Config) ParseFlags(args []string) error { if _, err := App(cfg).Parse(args); err != nil { return err } + cfg.resolveDeprecatedFlags() return nil } +// resolveDeprecatedFlags reconciles deprecated flags with 
their replacements. +// When --request-timeout differs from its default, a deprecation warning is logged and its value +// is copied into KubeAPIRequestTimeout, overwriting any value set via --kube-api-request-timeout. +// NOTE(review): when both flags are set the deprecated one wins; confirm this is intended. +func (cfg *Config) resolveDeprecatedFlags() { + if cfg.RequestTimeout != defaultConfig.RequestTimeout { + logrus.Warn("--request-timeout is deprecated, use --kube-api-request-timeout instead") + cfg.KubeAPIRequestTimeout = cfg.RequestTimeout + } +} + func bindFlags(b flags.FlagBinder, cfg *Config) { // Flags related to Kubernetes b.StringVar("server", "The Kubernetes API server to connect to (default: auto-detect)", defaultConfig.APIServerURL, &cfg.APIServerURL) - b.StringVar("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)", defaultConfig.KubeConfig, &cfg.KubeConfig) - b.DurationVar("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.RequestTimeout, &cfg.RequestTimeout) b.BoolVar("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs", false, &cfg.ResolveServiceLoadBalancerHostname) b.BoolVar("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)", false, &cfg.ListenEndpointEvents) @@ -713,6 +730,13 @@ func bindFlags(b flags.FlagBinder, cfg *Config) { b.StringVar("fqdn-template", "A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional). Accepts comma separated list for multiple global FQDN.", defaultConfig.FQDNTemplate, &cfg.FQDNTemplate) b.StringVar("target-template", "A templated string used to generate DNS targets (IP or hostname) from sources that support it (optional).
Accepts comma separated list for multiple targets.", defaultConfig.TargetTemplate, &cfg.TargetTemplate) b.StringVar("fqdn-target-template", "A template that returns host:target pairs (e.g., '{{range .Object.endpoints}}{{.targetRef.name}}.svc.example.com:{{index .addresses 0}},{{end}}'). Accepts comma separated list for multiple pairs.", defaultConfig.FQDNTargetTemplate, &cfg.FQDNTargetTemplate) + + // kube client config flags + b.StringVar("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)", defaultConfig.KubeConfig, &cfg.KubeConfig) + b.DurationVar("request-timeout", "[DEPRECATED: use --kube-api-request-timeout] Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.RequestTimeout, &cfg.RequestTimeout) + b.DurationVar("kube-api-request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.KubeAPIRequestTimeout, &cfg.KubeAPIRequestTimeout) + b.IntVar("kube-api-qps", "Maximum QPS to the Kubernetes API server from this client.", defaultConfig.KubeAPIQPS, &cfg.KubeAPIQPS) + b.IntVar("kube-api-burst", "Maximum burst for throttle to the Kubernetes API server from this client.", defaultConfig.KubeAPIBurst, &cfg.KubeAPIBurst) } func App(cfg *Config) *kingpin.Application { diff --git a/pkg/apis/externaldns/types_test.go b/pkg/apis/externaldns/types_test.go index db47f0538..782186c86 100644 --- a/pkg/apis/externaldns/types_test.go +++ b/pkg/apis/externaldns/types_test.go @@ -29,6 +29,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/client-go/rest" ) var ( @@ -36,6 +37,9 @@ var ( APIServerURL: "", KubeConfig: "", RequestTimeout: time.Second * 30, + KubeAPIRequestTimeout: time.Second * 30, + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, GlooNamespaces: []string{"gloo-system"}, SkipperRouteGroupVersion: "zalando.org/v1", Sources: []string{"service"}, 
@@ -140,6 +144,9 @@ var ( APIServerURL: "http://127.0.0.1:8080", KubeConfig: "/some/path", RequestTimeout: time.Second * 77, + KubeAPIRequestTimeout: time.Second * 77, + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, GlooNamespaces: []string{"gloo-not-system", "gloo-second-system"}, SkipperRouteGroupVersion: "zalando.org/v2", Sources: []string{"service", "ingress", "connector"}, @@ -590,8 +597,6 @@ func TestConfigStringMasksSecureFields(t *testing.T) { // Default path should use kingpin and parse flags correctly func TestParseFlagsDefaultKingpin(t *testing.T) { - t.Setenv("EXTERNAL_DNS_CLI", "") - args := []string{ "--provider=aws", "--source=service", @@ -612,7 +617,7 @@ func TestParseFlagsDefaultKingpin(t *testing.T) { assert.ElementsMatch(t, []string{"service", "ingress"}, cfg.Sources) assert.Equal(t, "http://127.0.0.1:8080", cfg.APIServerURL) assert.Equal(t, "/some/path", cfg.KubeConfig) - assert.Equal(t, 2*time.Second, cfg.RequestTimeout) + assert.Equal(t, 2*time.Second, cfg.KubeAPIRequestTimeout) assert.Equal(t, "ns", cfg.Namespace) assert.ElementsMatch(t, []string{"example.org", "company.com"}, cfg.DomainFilter) assert.Equal(t, "default", cfg.OCPRouterName) @@ -648,7 +653,7 @@ func TestParseFlagsCobraSwitchParitySubset(t *testing.T) { assert.ElementsMatch(t, cfgK.Sources, cfgC.Sources) assert.Equal(t, cfgK.APIServerURL, cfgC.APIServerURL) assert.Equal(t, cfgK.KubeConfig, cfgC.KubeConfig) - assert.Equal(t, cfgK.RequestTimeout, cfgC.RequestTimeout) + assert.Equal(t, cfgK.KubeAPIRequestTimeout, cfgC.KubeAPIRequestTimeout) assert.Equal(t, cfgK.Namespace, cfgC.Namespace) assert.ElementsMatch(t, cfgK.DomainFilter, cfgC.DomainFilter) assert.Equal(t, cfgK.OCPRouterName, cfgC.OCPRouterName) @@ -963,6 +968,65 @@ func TestBinderParityMapAndRegexp(t *testing.T) { assert.Equal(t, map[string]string{"foo": "bar"}, cfgK.AWSSDCreateTag) } +func TestParseFlagsKubeAPIRequestTimeout(t *testing.T) { + for _, tc := range []struct { + name string + args 
[]string + wantTimeout time.Duration + wantDeprecated time.Duration + }{ + { + name: "new flag sets KubeAPIRequestTimeout", + args: []string{"--kube-api-request-timeout=60s"}, + wantTimeout: 60 * time.Second, + wantDeprecated: 30 * time.Second, + }, + { + name: "deprecated flag is promoted", + args: []string{"--request-timeout=45s"}, + wantTimeout: 45 * time.Second, + wantDeprecated: 45 * time.Second, + }, + { + name: "new flag wins when both are set", + args: []string{"--request-timeout=45s", "--kube-api-request-timeout=90s"}, + wantTimeout: 45 * time.Second, + wantDeprecated: 45 * time.Second, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cfg := NewConfig() + require.NoError(t, cfg.ParseFlags(append([]string{"--provider=aws", "--source=service"}, tc.args...))) + assert.Equal(t, tc.wantTimeout, cfg.KubeAPIRequestTimeout) + assert.Equal(t, tc.wantDeprecated, cfg.RequestTimeout) + }) + } +} + +func TestParseFlagsKubeAPIRateLimit(t *testing.T) { + cfg := NewConfig() + require.NoError(t, cfg.ParseFlags([]string{ + "--provider=aws", + "--source=service", + "--kube-api-qps=10", + "--kube-api-burst=20", + })) + + assert.Equal(t, 10, cfg.KubeAPIQPS) + assert.Equal(t, 20, cfg.KubeAPIBurst) +} + +func TestParseFlagsKubeAPIRateLimitDefaults(t *testing.T) { + cfg := NewConfig() + require.NoError(t, cfg.ParseFlags([]string{ + "--provider=aws", + "--source=service", + })) + + assert.Equal(t, int(rest.DefaultQPS), cfg.KubeAPIQPS) + assert.Equal(t, rest.DefaultBurst, cfg.KubeAPIBurst) +} + // Kingpin validates enum values at parse time func TestBinderEnumValidationDifference(t *testing.T) { // Kingpin should reject unknown enum values diff --git a/pkg/apis/externaldns/validation/validation.go b/pkg/apis/externaldns/validation/validation.go index c14dc381f..69b8c2a0d 100644 --- a/pkg/apis/externaldns/validation/validation.go +++ b/pkg/apis/externaldns/validation/validation.go @@ -63,6 +63,13 @@ func ValidateConfig(cfg *externaldns.Config) error { return 
errors.New("--annotation-prefix must end with '/'") } + if cfg.KubeAPIQPS <= 0 { + return errors.New("--kube-api-qps must be greater than 0") + } + if cfg.KubeAPIBurst <= 0 { + return errors.New("--kube-api-burst must be greater than 0") + } + return nil } diff --git a/pkg/apis/externaldns/validation/validation_test.go b/pkg/apis/externaldns/validation/validation_test.go index 3edeb6a94..c4f4b5060 100644 --- a/pkg/apis/externaldns/validation/validation_test.go +++ b/pkg/apis/externaldns/validation/validation_test.go @@ -19,10 +19,11 @@ package validation import ( "testing" - "sigs.k8s.io/external-dns/pkg/apis/externaldns" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/client-go/rest" + + "sigs.k8s.io/external-dns/pkg/apis/externaldns" ) func TestValidateFlags(t *testing.T) { @@ -96,6 +97,32 @@ func TestValidateFlags(t *testing.T) { cfg = newValidConfig(t) cfg.AnnotationPrefix = "external-dns.alpha.kubernetes.io/" require.NoError(t, ValidateConfig(cfg)) + + t.Run("kube-api-qps and kube-api-burst", func(t *testing.T) { + for _, tc := range []struct { + name string + qps int + burst int + wantErr bool + }{ + {name: "positive QPS and burst", qps: 10, burst: 20, wantErr: false}, + {name: "zero QPS", qps: 0, burst: 10, wantErr: true}, + {name: "zero burst", qps: 5, burst: 0, wantErr: true}, + {name: "negative QPS", qps: -1, burst: 10, wantErr: true}, + {name: "negative burst", qps: 5, burst: -1, wantErr: true}, + } { + t.Run(tc.name, func(t *testing.T) { + cfg := newValidConfig(t) + cfg.KubeAPIQPS = tc.qps + cfg.KubeAPIBurst = tc.burst + if tc.wantErr { + require.Error(t, ValidateConfig(cfg)) + } else { + require.NoError(t, ValidateConfig(cfg)) + } + }) + } + }) } func newValidConfig(t *testing.T) *externaldns.Config { @@ -104,6 +131,8 @@ func newValidConfig(t *testing.T) *externaldns.Config { cfg.LogFormat = "json" cfg.Sources = []string{"test-source"} cfg.Provider = "test-provider" + cfg.KubeAPIQPS = int(rest.DefaultQPS) + 
cfg.KubeAPIBurst = rest.DefaultBurst require.NoError(t, ValidateConfig(cfg)) @@ -155,6 +184,8 @@ func TestValidateGoodRfc2136Config(t *testing.T) { cfg.Provider = "rfc2136" cfg.RFC2136MinTTL = 3600 cfg.RFC2136BatchChangeSize = 50 + cfg.KubeAPIQPS = int(rest.DefaultQPS) + cfg.KubeAPIBurst = rest.DefaultBurst err := ValidateConfig(cfg) @@ -272,6 +303,8 @@ func TestValidateGoodRfc2136GssTsigConfig(t *testing.T) { RFC2136KerberosPassword: "test-pass", RFC2136MinTTL: 3600, RFC2136BatchChangeSize: 50, + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, }, } @@ -348,13 +381,16 @@ func TestValidateGoodAkamaiConfig(t *testing.T) { AkamaiClientSecret: "test-secret", AkamaiAccessToken: "test-access-token", AkamaiEdgercPath: "/path/to/edgerc", + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, }, { LogFormat: "json", Sources: []string{"test-source"}, Provider: "akamai", AnnotationPrefix: "external-dns.alpha.kubernetes.io/", - // All Akamai fields can be empty if AkamaiEdgercPath is not specified + KubeAPIQPS: int(rest.DefaultQPS), + KubeAPIBurst: rest.DefaultBurst, }, } @@ -386,6 +422,8 @@ func TestValidateGoodAzureConfig(t *testing.T) { cfg.Provider = "azure" cfg.AnnotationPrefix = "external-dns.alpha.kubernetes.io/" cfg.AzureConfigFile = "/path/to/azure.json" + cfg.KubeAPIQPS = int(rest.DefaultQPS) + cfg.KubeAPIBurst = rest.DefaultBurst err := ValidateConfig(cfg) diff --git a/pkg/client/config.go b/pkg/client/config.go index 9172436ac..ab59a967a 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -19,6 +19,8 @@ limitations under the License. 
package kubeclient import ( + "context" + "fmt" "os" "time" @@ -26,11 +28,54 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/flowcontrol" + + "sigs.k8s.io/external-dns/pkg/apis/externaldns" extdnshttp "sigs.k8s.io/external-dns/pkg/http" ) -// GetRestConfig returns the REST client configuration for Kubernetes API access. +// InstrumentedRESTConfig builds a REST config with Prometheus transport metrics, request timeout, +// and a token-bucket rate limiter. A qps or burst greater than 0 overrides the respective client-go default (5 QPS / 10 burst). +func InstrumentedRESTConfig( + kubeConfig, apiServerURL string, + requestTimeout time.Duration, + qps int, burst int) (*rest.Config, error) { + config, err := buildRestConfig(kubeConfig, apiServerURL) + if err != nil { + return nil, err + } + + config.UserAgent = externaldns.UserAgent() + config.WrapTransport = extdnshttp.NewInstrumentedTransport + config.Timeout = requestTimeout + + config.QPS = rest.DefaultQPS + config.Burst = rest.DefaultBurst + if qps > 0 { + config.QPS = float32(qps) + } + if burst > 0 { + config.Burst = burst + } + log.Debugf("kube client qps: %f, burst %d", config.QPS, config.Burst) + config.RateLimiter = &rateLimiter{ + delegate: flowcontrol.NewTokenBucketRateLimiter(config.QPS, config.Burst), + } + return config, nil +} + +// NewKubeClient creates a Kubernetes client from the given REST config. +func NewKubeClient(config *rest.Config) (kubernetes.Interface, error) { + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + log.Infof("Created Kubernetes client %s", config.Host) + return client, nil +} + +// buildRestConfig returns the REST client configuration for Kubernetes API access. // Supports both in-cluster and external cluster configurations. // // Configuration Priority: @@ -38,7 +83,7 @@ import ( // 2. Recommended home file (~/.kube/config) // 3.
In-cluster config // TODO: consider clientcmd.NewDefaultClientConfigLoadingRules() with clientcmd.NewNonInteractiveDeferredLoadingClientConfig -func GetRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) { +func buildRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) { if kubeConfig == "" { if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil { kubeConfig = clientcmd.RecommendedHomeFile @@ -66,38 +111,23 @@ func GetRestConfig(kubeConfig, apiServerURL string) (*rest.Config, error) { return config, nil } -// InstrumentedRESTConfig creates a REST config with request instrumentation for monitoring. -// Adds HTTP transport wrapper for Prometheus metrics collection and request timeout configuration. -// -// Metrics: Wraps the transport with pkg/http.NewInstrumentedTransport to collect -// HTTP request duration metrics for all Kubernetes API calls. -// -// Timeout: Applies the specified request timeout to prevent hanging requests. -func InstrumentedRESTConfig(kubeConfig, apiServerURL string, requestTimeout time.Duration) (*rest.Config, error) { - config, err := GetRestConfig(kubeConfig, apiServerURL) - if err != nil { - return nil, err - } - - config.WrapTransport = extdnshttp.NewInstrumentedTransport - config.Timeout = requestTimeout - - return config, nil +// rateLimiter wraps a RateLimiter and enriches Wait errors with an +// actionable hint so callers get a clear message without needing to inspect +// the error string themselves. +type rateLimiter struct { + delegate flowcontrol.RateLimiter } -// NewKubeClient returns a new Kubernetes client object. It takes a Config and -// uses APIServerURL and KubeConfig attributes to connect to the cluster. If -// KubeConfig isn't provided it defaults to using the recommended default. 
-func NewKubeClient(kubeConfig, apiServerURL string, requestTimeout time.Duration) (*kubernetes.Clientset, error) { - log.Infof("Instantiating new Kubernetes client") - config, err := InstrumentedRESTConfig(kubeConfig, apiServerURL, requestTimeout) - if err != nil { - return nil, err +func (r *rateLimiter) TryAccept() bool { return r.delegate.TryAccept() } +func (r *rateLimiter) Accept() { r.delegate.Accept() } +func (r *rateLimiter) Stop() { r.delegate.Stop() } +func (r *rateLimiter) QPS() float32 { return r.delegate.QPS() } + +// Wait blocks until a token is available or the context is done. +// Any error from the delegate, including context cancellation or deadline expiry, is wrapped with an actionable hint. +func (r *rateLimiter) Wait(ctx context.Context) error { + if err := r.delegate.Wait(ctx); err != nil { + return fmt.Errorf("consider raising --kube-api-qps/--kube-api-burst: %w", err) } - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - log.Infof("Created Kubernetes client %s", config.Host) - return client, nil + return nil } diff --git a/pkg/client/config_test.go b/pkg/client/config_test.go index 39fed2be4..cebb59416 100644 --- a/pkg/client/config_test.go +++ b/pkg/client/config_test.go @@ -17,6 +17,7 @@ limitations under the License.
package kubeclient import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -27,80 +28,47 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/flowcontrol" ) func TestGetRestConfig_WithKubeConfig(t *testing.T) { svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer svr.Close() - mockKubeCfgDir := filepath.Join(t.TempDir(), ".kube") - mockKubeCfgPath := filepath.Join(mockKubeCfgDir, "config") - err := os.MkdirAll(mockKubeCfgDir, 0755) - require.NoError(t, err) + kubeCfgPath := writeKubeConfig(t, svr.URL) - kubeCfgTemplate := `apiVersion: v1 -kind: Config -clusters: -- cluster: - server: %s - name: test-cluster -contexts: -- context: - cluster: test-cluster - user: test-user - name: test-context -current-context: test-context -users: -- name: test-user - user: - token: fake-token -` - err = os.WriteFile(mockKubeCfgPath, fmt.Appendf(nil, kubeCfgTemplate, svr.URL), 0644) - require.NoError(t, err) - - config, err := GetRestConfig(mockKubeCfgPath, "") + config, err := buildRestConfig(kubeCfgPath, "") require.NoError(t, err) require.NotNil(t, config) assert.Equal(t, svr.URL, config.Host) } +func TestNewKubeClient(t *testing.T) { + svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) + defer svr.Close() + + config, err := InstrumentedRESTConfig(writeKubeConfig(t, svr.URL), "", 30*time.Second, 0, 0) + require.NoError(t, err) + + client, err := NewKubeClient(config) + require.NoError(t, err) + assert.NotNil(t, client) +} + func TestInstrumentedRESTConfig_AddsMetrics(t *testing.T) { svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer svr.Close() - mockKubeCfgDir := filepath.Join(t.TempDir(), ".kube") - mockKubeCfgPath := filepath.Join(mockKubeCfgDir, "config") - err := os.MkdirAll(mockKubeCfgDir, 0755) - require.NoError(t, 
err) - - kubeCfgTemplate := `apiVersion: v1 -kind: Config -clusters: -- cluster: - server: %s - name: test-cluster -contexts: -- context: - cluster: test-cluster - user: test-user - name: test-context -current-context: test-context -users: -- name: test-user - user: - token: fake-token -` - err = os.WriteFile(mockKubeCfgPath, fmt.Appendf(nil, kubeCfgTemplate, svr.URL), 0644) - require.NoError(t, err) - timeout := 30 * time.Second - config, err := InstrumentedRESTConfig(mockKubeCfgPath, "", timeout) + config, err := InstrumentedRESTConfig(writeKubeConfig(t, svr.URL), "", timeout, 0, 0) require.NoError(t, err) require.NotNil(t, config) assert.Equal(t, timeout, config.Timeout) assert.NotNil(t, config.WrapTransport, "WrapTransport should be set for metrics") + assert.NotNil(t, config.RateLimiter, "RateLimiter should always be set") } func TestGetRestConfig_RecommendedHomeFile(t *testing.T) { @@ -134,8 +102,103 @@ current-context: test-context }) clientcmd.RecommendedHomeFile = mockKubeCfgPath - config, err := GetRestConfig("", "") + config, err := buildRestConfig("", "") require.NoError(t, err) require.NotNil(t, config) assert.Equal(t, svr.URL, config.Host) } + +func TestInstrumentedRESTConfig_QPSAndBurstApplied(t *testing.T) { + svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) + defer svr.Close() + + kubeCfgPath := writeKubeConfig(t, svr.URL) + + config, err := InstrumentedRESTConfig(kubeCfgPath, "", 30*time.Second, 20, 40) + require.NoError(t, err) + require.NotNil(t, config) + + assert.Equal(t, 20, int(config.QPS)) + assert.Equal(t, 40, config.Burst) + assert.NotNil(t, config.RateLimiter) + assert.Equal(t, 20, int(config.RateLimiter.QPS())) +} + +func TestInstrumentedRESTConfig_ZeroQPSKeepsConfigDefaults(t *testing.T) { + svr := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) + defer svr.Close() + + kubeCfgPath := writeKubeConfig(t, svr.URL) + + config, err := 
InstrumentedRESTConfig(kubeCfgPath, "", 30*time.Second, 0, 0) + require.NoError(t, err) + require.NotNil(t, config) + + // qps == 0: client-go defaults applied; rate limiter still installed + assert.Equal(t, int(rest.DefaultQPS), int(config.QPS)) + assert.Equal(t, rest.DefaultBurst, config.Burst) + assert.NotNil(t, config.RateLimiter) + assert.Equal(t, int(rest.DefaultQPS), int(config.RateLimiter.QPS())) +} + +func TestEnrichingRateLimiter(t *testing.T) { + t.Run("delegates QPS and TryAccept", func(t *testing.T) { + rl := &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(5, 10)} + assert.Equal(t, 5, int(rl.QPS())) + assert.True(t, rl.TryAccept(), "first TryAccept should succeed with non-zero burst") + }) + + t.Run("Accept does not panic", func(t *testing.T) { + rl := &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(1000, 10)} + assert.NotPanics(t, rl.Accept) + }) + + t.Run("Stop does not panic", func(t *testing.T) { + rl := &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(5, 10)} + assert.NotPanics(t, rl.Stop) + }) + + t.Run("Wait enriches error with actionable hint", func(t *testing.T) { + // burst=0: no tokens available; cancelled context triggers error immediately + rl := &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(0.0001, 0)} + ctx, cancel := context.WithCancel(t.Context()) + cancel() + err := rl.Wait(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "consider raising --kube-api-qps/--kube-api-burst") + }) + + t.Run("Wait returns nil when token is available", func(t *testing.T) { + // burst=1: one token available immediately + rl := &rateLimiter{delegate: flowcontrol.NewTokenBucketRateLimiter(100, 1)} + assert.NoError(t, rl.Wait(t.Context())) + }) +} + +// writeKubeConfig writes a minimal kubeconfig pointing at serverURL into a temp dir +// and returns the path. 
+func writeKubeConfig(t *testing.T, serverURL string) string { + t.Helper() + dir := filepath.Join(t.TempDir(), ".kube") + path := filepath.Join(dir, "config") + require.NoError(t, os.MkdirAll(dir, 0755)) + tmpl := `apiVersion: v1 +kind: Config +clusters: +- cluster: + server: %s + name: test-cluster +contexts: +- context: + cluster: test-cluster + user: test-user + name: test-context +current-context: test-context +users: +- name: test-user + user: + token: fake-token +` + require.NoError(t, os.WriteFile(path, fmt.Appendf(nil, tmpl, serverURL), 0644)) + return path +} diff --git a/source/crd.go b/source/crd.go index 48519a585..3ffbb9bd0 100644 --- a/source/crd.go +++ b/source/crd.go @@ -159,7 +159,8 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation if err := cs.crWriter.Status().Update(ctx, dnsEndpoint); err != nil { - log.Warnf("Could not update ObservedGeneration of the CRD: %v", err) + log.Warnf("Could not update ObservedGeneration of [%s/%s/%s]: %v", + "dnsendpoint", dnsEndpoint.Namespace, dnsEndpoint.Name, err) } } diff --git a/source/informers/informers.go b/source/informers/informers.go index 6bcef7929..30383f480 100644 --- a/source/informers/informers.go +++ b/source/informers/informers.go @@ -26,7 +26,11 @@ import ( ) const ( - defaultRequestTimeout = 60 + // defaultTimeout is the maximum time in seconds to wait for informer caches + // to complete initial sync. This is intentionally longer than the per-request + // timeout: a cache sync may require multiple sequential API calls + // (LIST + Watch handshake), so the total wait needs to exceed a single request duration. + defaultTimeout = 60 ) type informerFactory interface { @@ -51,7 +55,7 @@ func waitForCacheSync[K comparable](ctx context.Context, waitFunc func(<-chan st // The function receives a ctx but then creates a new timeout, // effectively overriding whatever deadline the caller may have set. 
-// If the caller passed a context with a 30s timeout, this function ignores it and waits 60s anyway. +// Note: the override only ever shortens the wait. context.WithTimeout keeps the earlier deadline, so a caller's 30s timeout still applies; longer or absent deadlines are capped at 60s. - timeout := defaultRequestTimeout * time.Second + timeout := defaultTimeout * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() for typ, done := range waitFunc(ctx.Done()) { diff --git a/source/skipper_routegroup.go b/source/skipper_routegroup.go index 4290ba546..48e161964 100644 --- a/source/skipper_routegroup.go +++ b/source/skipper_routegroup.go @@ -206,7 +206,7 @@ func NewRouteGroupSource(cfg *Config, token, tokenPath, apiServerURL string) (So if routeGroupVersion == "" { routeGroupVersion = DefaultRoutegroupVersion } - cli := newRouteGroupClient(token, tokenPath, cfg.RequestTimeout) + cli := newRouteGroupClient(token, tokenPath, cfg.KubeAPIRequestTimeout) u, err := url.Parse(apiServerURL) if err != nil { diff --git a/source/store.go b/source/store.go index c6e771109..fa275e639 100644 --- a/source/store.go +++ b/source/store.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "os" "sync" "time" @@ -31,7 +30,6 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" "sigs.k8s.io/external-dns/pkg/apis/externaldns" @@ -86,7 +84,9 @@ type Config struct { ServiceTypeFilter []string GlooNamespaces []string SkipperRouteGroupVersion string - RequestTimeout time.Duration + KubeAPIRequestTimeout time.Duration + KubeAPIQPS int + KubeAPIBurst int DefaultTargets []string ForceDefaultTargets bool OCPRouterName string @@ -156,7 +156,9 @@ func NewSourceConfig(cfg *externaldns.Config, opts ...OverrideConfigOption) (*Co ServiceTypeFilter: cfg.ServiceTypeFilter, GlooNamespaces: cfg.GlooNamespaces, SkipperRouteGroupVersion: cfg.SkipperRouteGroupVersion, - RequestTimeout: cfg.RequestTimeout, + KubeAPIRequestTimeout: cfg.KubeAPIRequestTimeout, + KubeAPIQPS: cfg.KubeAPIQPS, + KubeAPIBurst: cfg.KubeAPIBurst, 
DefaultTargets: cfg.DefaultTargets, ForceDefaultTargets: cfg.ForceDefaultTargets, OCPRouterName: cfg.OCPRouterName, @@ -200,8 +202,10 @@ func (cfg *Config) ClientGenerator() ClientGenerator { if cfg.UpdateEvents { return 0 } - return cfg.RequestTimeout + return cfg.KubeAPIRequestTimeout }(), + QPS: cfg.KubeAPIQPS, + Burst: cfg.KubeAPIBurst, } } }) @@ -241,8 +245,8 @@ type ClientGenerator interface { // Memory Efficiency: Prevents creating multiple instances of expensive client objects // that maintain their own connection pools and caches. // -// Configuration: Clients are configured using KubeConfig, APIServerURL, and RequestTimeout -// which are set during SingletonClientGenerator initialization. +// Configuration: Clients are configured using KubeConfig, APIServerURL, RequestTimeout, +// QPS, and Burst which are set during SingletonClientGenerator initialization. // // TODO: Fix error handling pattern in client methods. Current implementation has a bug where // errors are only returned on the first call due to sync.Once behavior. 
If initialization fails @@ -269,6 +273,8 @@ type SingletonClientGenerator struct { KubeConfig string APIServerURL string RequestTimeout time.Duration + QPS int + Burst int restConfig *rest.Config kubeClient kubernetes.Interface gatewayClient gateway.Interface @@ -287,7 +293,12 @@ type SingletonClientGenerator struct { func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) { var err error p.kubeOnce.Do(func() { - p.kubeClient, err = kubeclient.NewKubeClient(p.KubeConfig, p.APIServerURL, p.RequestTimeout) + var config *rest.Config + config, err = p.RESTConfig() + if err != nil { + return + } + p.kubeClient, err = kubeclient.NewKubeClient(config) }) return p.kubeClient, err } @@ -298,7 +309,7 @@ func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) { func (p *SingletonClientGenerator) RESTConfig() (*rest.Config, error) { var err error p.restConfigOnce.Do(func() { - p.restConfig, err = kubeclient.InstrumentedRESTConfig(p.KubeConfig, p.APIServerURL, p.RequestTimeout) + p.restConfig, err = kubeclient.InstrumentedRESTConfig(p.KubeConfig, p.APIServerURL, p.RequestTimeout, p.QPS, p.Burst) }) return p.restConfig, err } @@ -325,7 +336,12 @@ func (p *SingletonClientGenerator) GatewayClient() (gateway.Interface, error) { func (p *SingletonClientGenerator) IstioClient() (istioclient.Interface, error) { var err error p.istioOnce.Do(func() { - p.istioClient, err = NewIstioClient(p.KubeConfig, p.APIServerURL) + var config *rest.Config + config, err = p.RESTConfig() + if err != nil { + return + } + p.istioClient, err = NewIstioClient(config) }) return p.istioClient, err } @@ -451,7 +467,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg case types.CRD: return buildCRDSource(ctx, p, cfg) case types.SkipperRouteGroup: - return buildSkipperRouteGroupSource(ctx, cfg) + return buildSkipperRouteGroupSource(ctx, p, cfg) case types.KongTCPIngress: return buildKongTCPIngressSource(ctx, p, cfg) case 
types.F5VirtualServer: @@ -627,11 +643,11 @@ func buildCRDSource(ctx context.Context, p ClientGenerator, cfg *Config) (Source // buildSkipperRouteGroupSource creates a Skipper RouteGroup source for exposing route groups as DNS records. -// Special case: Does not use ClientGenerator pattern, instead manages its own authentication. +// Special case: Uses the ClientGenerator only to obtain the shared REST config; the RouteGroup client still manages its own authentication. // Retrieves bearer token from REST config for API server authentication. -func buildSkipperRouteGroupSource(_ context.Context, cfg *Config) (Source, error) { +func buildSkipperRouteGroupSource(_ context.Context, p ClientGenerator, cfg *Config) (Source, error) { apiServerURL := cfg.APIServerURL tokenPath := "" token := "" - restConfig, err := kubeclient.GetRestConfig(cfg.KubeConfig, cfg.APIServerURL) + restConfig, err := p.RESTConfig() if err == nil { apiServerURL = restConfig.Host tokenPath = restConfig.BearerTokenFile @@ -688,26 +704,8 @@ func buildUnstructuredSource(ctx context.Context, p ClientGenerator, cfg *Config return NewUnstructuredFQDNSource(ctx, dynamicClient, kubeClient, cfg) } -// NewIstioClient returns a new Istio client object. It uses the configured -// KubeConfig attribute to connect to the cluster. If KubeConfig isn't provided -// it defaults to using the recommended default. -// NB: Istio controls the creation of the underlying Kubernetes client, so we -// have no ability to tack on transport wrappers (e.g., Prometheus request -// wrappers) to the client's config at this level. Furthermore, the Istio client -// constructor does not expose the ability to override the Kubernetes API server endpoint, -// so the apiServerURL config attribute has no effect. 
-func NewIstioClient(kubeConfig string, apiServerURL string) (*istioclient.Clientset, error) { - if kubeConfig == "" { - if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil { - kubeConfig = clientcmd.RecommendedHomeFile - } - } - - restCfg, err := clientcmd.BuildConfigFromFlags(apiServerURL, kubeConfig) - if err != nil { - return nil, err - } - +// NewIstioClient returns a new Istio client object from the given REST config. +func NewIstioClient(restCfg *rest.Config) (*istioclient.Clientset, error) { ic, err := istioclient.NewForConfig(restCfg) if err != nil { return nil, fmt.Errorf("failed to create istio client: %w", err) diff --git a/source/store_test.go b/source/store_test.go index 14964e954..d5f1a31bf 100644 --- a/source/store_test.go +++ b/source/store_test.go @@ -211,9 +211,9 @@ func TestBuildWithConfig_InvalidSource(t *testing.T) { func TestConfig_ClientGenerator_Caching(t *testing.T) { cfg := &Config{ - KubeConfig: "/path/to/kubeconfig", - APIServerURL: "https://api.example.com", - RequestTimeout: 30 * time.Second, + KubeConfig: "/path/to/kubeconfig", + APIServerURL: "https://api.example.com", + KubeAPIRequestTimeout: 30 * time.Second, } gen1 := cfg.ClientGenerator() @@ -273,7 +273,7 @@ func TestSingletonClientGenerator_RESTConfig_TimeoutPropagation(t *testing.T) { // TestConfig_ClientGenerator_RESTConfig_Integration verifies Config → ClientGenerator → RESTConfig flow func TestConfig_ClientGenerator_RESTConfig_Integration(t *testing.T) { t.Run("normal timeout is propagated", func(t *testing.T) { - cfg := &Config{RequestTimeout: 45 * time.Second} + cfg := &Config{KubeAPIRequestTimeout: 45 * time.Second} config, err := cfg.ClientGenerator().RESTConfig() if err == nil { @@ -283,7 +283,7 @@ func TestConfig_ClientGenerator_RESTConfig_Integration(t *testing.T) { }) t.Run("UpdateEvents sets timeout to zero", func(t *testing.T) { - cfg := &Config{RequestTimeout: 45 * time.Second, UpdateEvents: true} + cfg := &Config{KubeAPIRequestTimeout: 45 * 
time.Second, UpdateEvents: true} config, err := cfg.ClientGenerator().RESTConfig() if err == nil { @@ -393,3 +393,27 @@ func TestNewSourceConfig(t *testing.T) { }) } } + +func TestKubeAPIRateLimitPropagation(t *testing.T) { + t.Run("NewSourceConfig propagates QPS and burst", func(t *testing.T) { + cfg := &externaldns.Config{ + KubeAPIQPS: 20, + KubeAPIBurst: 40, + } + got, err := NewSourceConfig(cfg) + require.NoError(t, err) + assert.Equal(t, 20, got.KubeAPIQPS) + assert.Equal(t, 40, got.KubeAPIBurst) + }) + + t.Run("ClientGenerator wires QPS and burst into SingletonClientGenerator", func(t *testing.T) { + cfg := &Config{ + KubeAPIQPS: 15, + KubeAPIBurst: 30, + } + scg, ok := cfg.ClientGenerator().(*SingletonClientGenerator) + require.True(t, ok) + assert.Equal(t, 15, scg.QPS) + assert.Equal(t, 30, scg.Burst) + }) +}