Add cache at provider level

**Description**

In the current implementation, DNS providers are called to list all
records on every loop. This is expensive in terms of number of requests
to the provider and may result in being rate limited, as reported in 1293
and 3397.

In our case, we have approximately 20,000 records in our AWS Hosted Zone.
The ListResourceRecordSets API call allows a maximum of 300 items per call.
That requires 67 API calls per external-dns deployment during every sync period

With this, we introduce an optional generic caching mechanism at the provider
level, that re-uses the latest known list of records for a given time.

This prevents from expensive Provider calls to list all records for each
object modification that does not change the actual record (annotations,
statuses, ingress routing, ...)

This introduces 2 trade-offs:

1. Any changes or corruption directly on the provider side will be
longer to detect and to resolve, up to the cache time

2. Any conflicting records in the DNS provider (such as a different
external-dns instance) injected during the cache validity will cause
the first iteration of the next reconcile loop to fail, and hence add a
delay until the next retry

**Checklist**

- [X] Unit tests updated
- [X] End user documentation updated

Change-Id: I0bdcfa994ac1b76acedb05d458a97c080284c5aa
This commit is contained in:
Thibault Jamet 2024-07-08 09:19:12 +02:00
parent c875e65d8d
commit 089744c6ff
No known key found for this signature in database
GPG Key ID: 9D28A304A3810C17
5 changed files with 249 additions and 0 deletions

View File

@ -912,6 +912,8 @@ Route53 has a [5 API requests per second per account hard quota](https://docs.aw
Running several fast polling ExternalDNS instances in a given account can easily hit that limit. Some ways to reduce the request rate include:
* Reduce the polling loop's synchronization interval at the possible cost of slower change propagation (but see `--events` below to reduce the impact).
* `--interval=5m` (default `1m`)
* Cache the results of the zone at the possible cost of slower propagation when the zone gets modified from other sources
* `--provider-cache-time=15m` (default `0m`)
* Trigger the polling loop on changes to K8s objects, rather than only at `interval` and ensure a minimum of time between events, to have responsive updates with long poll intervals
* `--events`
* `--min-event-sync-interval=5m` (default `5s`)

View File

@ -401,6 +401,13 @@ func main() {
os.Exit(0)
}
if cfg.ProviderCacheTime > 0 {
p = &provider.CachedProvider{
Provider: p,
RefreshDelay: cfg.ProviderCacheTime,
}
}
var r registry.Registry
switch cfg.Registry {
case "dynamodb":

View File

@ -67,6 +67,7 @@ type Config struct {
AlwaysPublishNotReadyAddresses bool
ConnectorSourceServer string
Provider string
ProviderCacheTime int
GoogleProject string
GoogleBatchChangeSize int
GoogleBatchChangeInterval time.Duration
@ -239,6 +240,7 @@ var defaultConfig = &Config{
PublishHostIP: false,
ConnectorSourceServer: "localhost:8080",
Provider: "",
ProviderCacheTime: 0,
GoogleProject: "",
GoogleBatchChangeSize: 1000,
GoogleBatchChangeInterval: time.Second,
@ -456,6 +458,7 @@ func (cfg *Config) ParseFlags(args []string) error {
// Flags related to providers
providers := []string{"akamai", "alibabacloud", "aws", "aws-sd", "azure", "azure-dns", "azure-private-dns", "bluecat", "civo", "cloudflare", "coredns", "designate", "digitalocean", "dnsimple", "dyn", "exoscale", "gandi", "godaddy", "google", "ibmcloud", "inmemory", "linode", "ns1", "oci", "ovh", "pdns", "pihole", "plural", "rcodezero", "rdns", "rfc2136", "safedns", "scaleway", "skydns", "tencentcloud", "transip", "ultradns", "vinyldns", "vultr", "webhook"}
app.Flag("provider", "The DNS provider where the DNS records will be created (required, options: "+strings.Join(providers, ", ")+")").Required().PlaceHolder("provider").EnumVar(&cfg.Provider, providers...)
app.Flag("provider-cache-time", "The time to cache the DNS provider record list requests.").Default(defaultConfig.ProviderCacheTime.String()).DurationVar(&cfg.ProviderCacheTime)
app.Flag("domain-filter", "Limit possible target zones by a domain suffix; specify multiple times for multiple domains (optional)").Default("").StringsVar(&cfg.DomainFilter)
app.Flag("exclude-domains", "Exclude subdomains (optional)").Default("").StringsVar(&cfg.ExcludeDomains)
app.Flag("regex-domain-filter", "Limit possible domains and target zones by a Regex filter; Overrides domain-filter (optional)").Default(defaultConfig.RegexDomainFilter.String()).RegexpVar(&cfg.RegexDomainFilter)

View File

@ -0,0 +1,73 @@
package provider
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/plan"
)
var (
cachedRecordsCallsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "external_dns",
Subsystem: "provider",
Name: "cache_records_calls",
Help: "Number of calls to the provider cache Records list.",
},
[]string{
"from_cache",
},
)
cachedApplyChangesCallsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "external_dns",
Subsystem: "provider",
Name: "cache_apply_changes_calls",
Help: "Number of calls to the provider cache ApplyChanges.",
},
)
)
type CachedProvider struct {
Provider
RefreshDelay time.Duration
err error
lastRead time.Time
cache []*endpoint.Endpoint
}
func (c *CachedProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error) {
if c.needRefresh() {
c.cache, c.err = c.Provider.Records(ctx)
c.lastRead = time.Now()
cachedRecordsCallsTotal.WithLabelValues("false").Inc()
} else {
cachedRecordsCallsTotal.WithLabelValues("true").Inc()
}
return c.cache, c.err
}
func (c *CachedProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
c.Reset()
cachedApplyChangesCallsTotal.Inc()
return c.Provider.ApplyChanges(ctx, changes)
}
func (c *CachedProvider) Reset() {
c.err = nil
c.cache = nil
c.lastRead = time.Time{}
}
func (c *CachedProvider) needRefresh() bool {
if c.cache == nil || c.err != nil {
return true
}
return time.Now().After(c.lastRead.Add(c.RefreshDelay))
}
func init() {
prometheus.MustRegister(cachedRecordsCallsTotal)
}

View File

@ -0,0 +1,164 @@
package provider
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/plan"
)
type testProviderFunc struct {
records func(ctx context.Context) ([]*endpoint.Endpoint, error)
applyChanges func(ctx context.Context, changes *plan.Changes) error
propertyValuesEqual func(name string, previous string, current string) bool
adjustEndpoints func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint
getDomainFilter func() endpoint.DomainFilterInterface
}
func (p *testProviderFunc) Records(ctx context.Context) ([]*endpoint.Endpoint, error) {
return p.records(ctx)
}
func (p *testProviderFunc) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
return p.applyChanges(ctx, changes)
}
func (p *testProviderFunc) PropertyValuesEqual(name string, previous string, current string) bool {
return p.propertyValuesEqual(name, previous, current)
}
func (p *testProviderFunc) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint {
return p.adjustEndpoints(endpoints)
}
func (p *testProviderFunc) GetDomainFilter() endpoint.DomainFilterInterface {
return p.getDomainFilter()
}
func recordsNotCalled(t *testing.T) func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return func(ctx context.Context) ([]*endpoint.Endpoint, error) {
t.Errorf("unexpected call to Records")
return nil, nil
}
}
func applyChangesNotCalled(t *testing.T) func(ctx context.Context, changes *plan.Changes) error {
return func(ctx context.Context, changes *plan.Changes) error {
t.Errorf("unexpected call to ApplyChanges")
return nil
}
}
func propertyValuesEqualNotCalled(t *testing.T) func(name string, previous string, current string) bool {
return func(name string, previous string, current string) bool {
t.Errorf("unexpected call to PropertyValuesEqual")
return false
}
}
func adjustEndpointsNotCalled(t *testing.T) func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint {
return func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint {
t.Errorf("unexpected call to AdjustEndpoints")
return endpoints
}
}
func newTestProviderFunc(t *testing.T) *testProviderFunc {
return &testProviderFunc{
records: recordsNotCalled(t),
applyChanges: applyChangesNotCalled(t),
propertyValuesEqual: propertyValuesEqualNotCalled(t),
adjustEndpoints: adjustEndpointsNotCalled(t),
}
}
func TestCachedProviderCallsProviderOnFirstCall(t *testing.T) {
testProvider := newTestProviderFunc(t)
testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil
}
provider := CachedProvider{
Provider: testProvider,
}
endpoints, err := provider.Records(context.Background())
assert.NoError(t, err)
require.NotNil(t, endpoints)
require.Len(t, endpoints, 1)
require.NotNil(t, endpoints[0])
assert.Equal(t, "domain.fqdn", endpoints[0].DNSName)
}
func TestCachedProviderUsesCacheWhileValid(t *testing.T) {
testProvider := newTestProviderFunc(t)
testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil
}
provider := CachedProvider{
RefreshDelay: 30 * time.Second,
Provider: testProvider,
}
_, err := provider.Records(context.Background())
require.NoError(t, err)
t.Run("With consecutive calls within the caching time frame", func(t *testing.T) {
testProvider.records = recordsNotCalled(t)
endpoints, err := provider.Records(context.Background())
assert.NoError(t, err)
require.NotNil(t, endpoints)
require.Len(t, endpoints, 1)
require.NotNil(t, endpoints[0])
assert.Equal(t, "domain.fqdn", endpoints[0].DNSName)
})
t.Run("When the caching time frame is exceeded", func(t *testing.T) {
testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{{DNSName: "new.domain.fqdn"}}, nil
}
provider.lastRead = time.Now().Add(-20 * time.Minute)
endpoints, err := provider.Records(context.Background())
assert.NoError(t, err)
require.NotNil(t, endpoints)
require.Len(t, endpoints, 1)
require.NotNil(t, endpoints[0])
assert.Equal(t, "new.domain.fqdn", endpoints[0].DNSName)
})
}
func TestCachedProviderForcesCacheRefreshOnUpdate(t *testing.T) {
testProvider := newTestProviderFunc(t)
testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil
}
provider := CachedProvider{
RefreshDelay: 30 * time.Second,
Provider: testProvider,
}
_, err := provider.Records(context.Background())
require.NoError(t, err)
t.Run("When the caching time frame is exceeded", func(t *testing.T) {
testProvider.records = recordsNotCalled(t)
testProvider.applyChanges = func(ctx context.Context, changes *plan.Changes) error {
return nil
}
err := provider.ApplyChanges(context.Background(), &plan.Changes{})
assert.NoError(t, err)
t.Run("Next call to Records is not cached", func(t *testing.T) {
testProvider.applyChanges = applyChangesNotCalled(t)
testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{{DNSName: "new.domain.fqdn"}}, nil
}
endpoints, err := provider.Records(context.Background())
assert.NoError(t, err)
require.NotNil(t, endpoints)
require.Len(t, endpoints, 1)
require.NotNil(t, endpoints[0])
assert.Equal(t, "new.domain.fqdn", endpoints[0].DNSName)
})
})
}