mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2025-08-06 09:36:58 +02:00
add cache to limit calls to Records()
Signed-off-by: Jess Frazelle <acidburn@microsoft.com>
This commit is contained in:
parent
e7cbc5239c
commit
4759789ac8
2
main.go
2
main.go
@ -158,7 +158,7 @@ func main() {
|
||||
case "noop":
|
||||
r, err = registry.NewNoopRegistry(p)
|
||||
case "txt":
|
||||
r, err = registry.NewTXTRegistry(p, cfg.TXTPrefix, cfg.TXTOwnerID)
|
||||
r, err = registry.NewTXTRegistry(p, cfg.TXTPrefix, cfg.TXTOwnerID, cfg.TXTCacheInterval)
|
||||
case "aws-sd":
|
||||
r, err = registry.NewAWSSDRegistry(p.(*provider.AWSSDProvider), cfg.TXTOwnerID)
|
||||
default:
|
||||
|
@ -78,6 +78,7 @@ type Config struct {
|
||||
LogFormat string
|
||||
MetricsAddress string
|
||||
LogLevel string
|
||||
TXTCacheInterval time.Duration
|
||||
}
|
||||
|
||||
var defaultConfig = &Config{
|
||||
@ -112,6 +113,7 @@ var defaultConfig = &Config{
|
||||
Registry: "txt",
|
||||
TXTOwnerID: "default",
|
||||
TXTPrefix: "",
|
||||
TXTCacheInterval: time.Hour,
|
||||
Interval: time.Minute,
|
||||
Once: false,
|
||||
DryRun: false,
|
||||
@ -201,6 +203,7 @@ func (cfg *Config) ParseFlags(args []string) error {
|
||||
app.Flag("txt-prefix", "When using the TXT registry, a custom string that's prefixed to each ownership DNS record (optional)").Default(defaultConfig.TXTPrefix).StringVar(&cfg.TXTPrefix)
|
||||
|
||||
// Flags related to the main control loop
|
||||
app.Flag("txt-cache-interval", "The interval between cache synchronizations in duration format (default: 1h)").Default(defaultConfig.TXTCacheInterval.String()).DurationVar(&cfg.TXTCacheInterval)
|
||||
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
|
||||
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
|
||||
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
|
||||
|
@ -57,6 +57,7 @@ var (
|
||||
Registry: "txt",
|
||||
TXTOwnerID: "default",
|
||||
TXTPrefix: "",
|
||||
TXTCacheInterval: time.Hour,
|
||||
Interval: time.Minute,
|
||||
Once: false,
|
||||
DryRun: false,
|
||||
@ -95,6 +96,7 @@ var (
|
||||
Registry: "noop",
|
||||
TXTOwnerID: "owner-1",
|
||||
TXTPrefix: "associated-txt-record",
|
||||
TXTCacheInterval: 12 * time.Hour,
|
||||
Interval: 10 * time.Minute,
|
||||
Once: true,
|
||||
DryRun: true,
|
||||
@ -157,6 +159,7 @@ func TestParseFlags(t *testing.T) {
|
||||
"--registry=noop",
|
||||
"--txt-owner-id=owner-1",
|
||||
"--txt-prefix=associated-txt-record",
|
||||
"--txt-cache-interval=12h",
|
||||
"--interval=10m",
|
||||
"--once",
|
||||
"--dry-run",
|
||||
@ -200,6 +203,7 @@ func TestParseFlags(t *testing.T) {
|
||||
"EXTERNAL_DNS_REGISTRY": "noop",
|
||||
"EXTERNAL_DNS_TXT_OWNER_ID": "owner-1",
|
||||
"EXTERNAL_DNS_TXT_PREFIX": "associated-txt-record",
|
||||
"EXTERNAL_DNS_TXT_CACHE_INTERVAL": "12h",
|
||||
"EXTERNAL_DNS_INTERVAL": "10m",
|
||||
"EXTERNAL_DNS_ONCE": "1",
|
||||
"EXTERNAL_DNS_DRY_RUN": "1",
|
||||
|
@ -18,12 +18,14 @@ package registry
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"strings"
|
||||
|
||||
"github.com/kubernetes-incubator/external-dns/endpoint"
|
||||
"github.com/kubernetes-incubator/external-dns/plan"
|
||||
"github.com/kubernetes-incubator/external-dns/provider"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// TXTRegistry implements registry interface with ownership implemented via associated TXT records
|
||||
@ -31,10 +33,15 @@ type TXTRegistry struct {
|
||||
provider provider.Provider
|
||||
ownerID string //refers to the owner id of the current instance
|
||||
mapper nameMapper
|
||||
|
||||
// cache the records in memory and update on an interval instead.
|
||||
recordsCache []*endpoint.Endpoint
|
||||
recordsCacheRefreshTime time.Time
|
||||
cacheInterval time.Duration
|
||||
}
|
||||
|
||||
// NewTXTRegistry returns new TXTRegistry object
|
||||
func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXTRegistry, error) {
|
||||
func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string, cacheInterval time.Duration) (*TXTRegistry, error) {
|
||||
if ownerID == "" {
|
||||
return nil, errors.New("owner id cannot be empty")
|
||||
}
|
||||
@ -42,9 +49,10 @@ func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXT
|
||||
mapper := newPrefixNameMapper(txtPrefix)
|
||||
|
||||
return &TXTRegistry{
|
||||
provider: provider,
|
||||
ownerID: ownerID,
|
||||
mapper: mapper,
|
||||
provider: provider,
|
||||
ownerID: ownerID,
|
||||
mapper: mapper,
|
||||
cacheInterval: cacheInterval,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -52,6 +60,13 @@ func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXT
|
||||
// If TXT records was created previously to indicate ownership its corresponding value
|
||||
// will be added to the endpoints Labels map
|
||||
func (im *TXTRegistry) Records() ([]*endpoint.Endpoint, error) {
|
||||
// If we have the zones cached AND we have refreshed the cache since the
|
||||
// last given interval, then just use the cached results.
|
||||
if im.recordsCache != nil && time.Since(im.recordsCacheRefreshTime) < im.cacheInterval {
|
||||
log.Debug("Using cached records.")
|
||||
return im.recordsCache, nil
|
||||
}
|
||||
|
||||
records, err := im.provider.Records()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -91,6 +106,10 @@ func (im *TXTRegistry) Records() ([]*endpoint.Endpoint, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Update the cache.
|
||||
im.recordsCache = endpoints
|
||||
im.recordsCacheRefreshTime = time.Now()
|
||||
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
@ -107,6 +126,11 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
|
||||
r.Labels[endpoint.OwnerLabelKey] = im.ownerID
|
||||
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
|
||||
filteredChanges.Create = append(filteredChanges.Create, txt)
|
||||
|
||||
// Add to the cache.
|
||||
if im.recordsCache != nil {
|
||||
im.recordsCache = append(im.recordsCache, txt)
|
||||
}
|
||||
}
|
||||
|
||||
for _, r := range filteredChanges.Delete {
|
||||
@ -115,12 +139,18 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
|
||||
// when we delete TXT records for which value has changed (due to new label) this would still work because
|
||||
// !!! TXT record value is uniquely generated from the Labels of the endpoint. Hence old TXT record can be uniquely reconstructed
|
||||
filteredChanges.Delete = append(filteredChanges.Delete, txt)
|
||||
|
||||
// Remove from the cache.
|
||||
im.removeFromCache(txt)
|
||||
}
|
||||
|
||||
// make sure TXT records are consistently updated as well
|
||||
for _, r := range filteredChanges.UpdateNew {
|
||||
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
|
||||
filteredChanges.UpdateNew = append(filteredChanges.UpdateNew, txt)
|
||||
|
||||
// Update the cache.
|
||||
im.updateCache(txt)
|
||||
}
|
||||
// make sure TXT records are consistently updated as well
|
||||
for _, r := range filteredChanges.UpdateOld {
|
||||
@ -128,6 +158,9 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
|
||||
// when we updateOld TXT records for which value has changed (due to new label) this would still work because
|
||||
// !!! TXT record value is uniquely generated from the Labels of the endpoint. Hence old TXT record can be uniquely reconstructed
|
||||
filteredChanges.UpdateOld = append(filteredChanges.UpdateOld, txt)
|
||||
|
||||
// Update the cache.
|
||||
im.updateCache(txt)
|
||||
}
|
||||
|
||||
return im.provider.ApplyChanges(filteredChanges)
|
||||
@ -167,3 +200,36 @@ func (pr prefixNameMapper) toEndpointName(txtDNSName string) string {
|
||||
func (pr prefixNameMapper) toTXTName(endpointDNSName string) string {
|
||||
return pr.prefix + endpointDNSName
|
||||
}
|
||||
|
||||
func (im *TXTRegistry) removeFromCache(txt *endpoint.Endpoint) {
|
||||
if im.recordsCache == nil || txt == nil {
|
||||
// return early.
|
||||
return
|
||||
}
|
||||
|
||||
for i, e := range im.recordsCache {
|
||||
if e.DNSName == txt.DNSName && e.RecordType == txt.RecordType {
|
||||
// We found a match delete the endpoint from the cache.
|
||||
im.recordsCache = append(im.recordsCache[:i], im.recordsCache[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (im *TXTRegistry) updateCache(txt *endpoint.Endpoint) {
|
||||
if im.recordsCache == nil || txt == nil {
|
||||
// return early.
|
||||
return
|
||||
}
|
||||
|
||||
for i, e := range im.recordsCache {
|
||||
if e.DNSName == txt.DNSName && e.RecordType == txt.RecordType {
|
||||
// We found a match update the endpoint in the cache.
|
||||
im.recordsCache[i] = txt
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// We couldn't find a match so let's just add it to the cache.
|
||||
im.recordsCache = append(im.recordsCache, txt)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package registry
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kubernetes-incubator/external-dns/endpoint"
|
||||
"github.com/kubernetes-incubator/external-dns/internal/testutils"
|
||||
@ -40,10 +41,10 @@ func TestTXTRegistry(t *testing.T) {
|
||||
|
||||
func testTXTRegistryNew(t *testing.T) {
|
||||
p := provider.NewInMemoryProvider()
|
||||
_, err := NewTXTRegistry(p, "txt", "")
|
||||
_, err := NewTXTRegistry(p, "txt", "", time.Hour)
|
||||
require.Error(t, err)
|
||||
|
||||
r, err := NewTXTRegistry(p, "txt", "owner")
|
||||
r, err := NewTXTRegistry(p, "txt", "owner", time.Hour)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok := r.mapper.(prefixNameMapper)
|
||||
@ -51,7 +52,7 @@ func testTXTRegistryNew(t *testing.T) {
|
||||
assert.Equal(t, "owner", r.ownerID)
|
||||
assert.Equal(t, p, r.provider)
|
||||
|
||||
r, err = NewTXTRegistry(p, "", "owner")
|
||||
r, err = NewTXTRegistry(p, "", "owner", time.Hour)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok = r.mapper.(prefixNameMapper)
|
||||
@ -130,7 +131,7 @@ func testTXTRegistryRecordsPrefixed(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r, _ := NewTXTRegistry(p, "txt.", "owner")
|
||||
r, _ := NewTXTRegistry(p, "txt.", "owner", time.Hour)
|
||||
records, _ := r.Records()
|
||||
|
||||
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
|
||||
@ -204,7 +205,7 @@ func testTXTRegistryRecordsNoPrefix(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r, _ := NewTXTRegistry(p, "", "owner")
|
||||
r, _ := NewTXTRegistry(p, "", "owner", time.Hour)
|
||||
records, _ := r.Records()
|
||||
|
||||
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
|
||||
@ -231,7 +232,7 @@ func testTXTRegistryApplyChangesWithPrefix(t *testing.T) {
|
||||
newEndpointWithOwner("txt.foobar.test-zone.example.org", "\"heritage=external-dns,external-dns/owner=owner\"", endpoint.RecordTypeTXT, ""),
|
||||
},
|
||||
})
|
||||
r, _ := NewTXTRegistry(p, "txt.", "owner")
|
||||
r, _ := NewTXTRegistry(p, "txt.", "owner", time.Hour)
|
||||
|
||||
changes := &plan.Changes{
|
||||
Create: []*endpoint.Endpoint{
|
||||
@ -300,7 +301,7 @@ func testTXTRegistryApplyChangesNoPrefix(t *testing.T) {
|
||||
newEndpointWithOwner("foobar.test-zone.example.org", "\"heritage=external-dns,external-dns/owner=owner\"", endpoint.RecordTypeTXT, ""),
|
||||
},
|
||||
})
|
||||
r, _ := NewTXTRegistry(p, "", "owner")
|
||||
r, _ := NewTXTRegistry(p, "", "owner", time.Hour)
|
||||
|
||||
changes := &plan.Changes{
|
||||
Create: []*endpoint.Endpoint{
|
||||
@ -347,6 +348,55 @@ func testTXTRegistryApplyChangesNoPrefix(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestCacheMethods(t *testing.T) {
|
||||
cache := []*endpoint.Endpoint{
|
||||
newEndpointWithOwner("thing.com", "1.2.3.4", "A", "owner"),
|
||||
newEndpointWithOwner("thing1.com", "1.2.3.6", "A", "owner"),
|
||||
newEndpointWithOwner("thing2.com", "1.2.3.4", "CNAME", "owner"),
|
||||
newEndpointWithOwner("thing3.com", "1.2.3.4", "A", "owner"),
|
||||
newEndpointWithOwner("thing4.com", "1.2.3.4", "A", "owner"),
|
||||
}
|
||||
registry := &TXTRegistry{
|
||||
recordsCache: cache,
|
||||
cacheInterval: time.Hour,
|
||||
}
|
||||
|
||||
// test updating a record.
|
||||
registry.updateCache(newEndpointWithOwner("thing.com", "1.2.3.6", "A", "owner2"))
|
||||
found := false
|
||||
// ensure it was updated
|
||||
for _, e := range registry.recordsCache {
|
||||
if e.DNSName == "thing.com" && e.RecordType == "A" {
|
||||
t.Logf("targets: %#v", e.Targets)
|
||||
if e.Targets.Same([]string{"1.2.3.6"}) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatal("could not find updated record in cache")
|
||||
}
|
||||
|
||||
// test deleting a record
|
||||
registry.removeFromCache(newEndpointWithOwner("thing.com", "1.2.3.6", "A", "owner2"))
|
||||
// ensure it was deleted
|
||||
found = false
|
||||
for _, e := range registry.recordsCache {
|
||||
if e.DNSName == "thing.com" && e.RecordType == "A" {
|
||||
if e.Targets.Same([]string{"1.2.3.6"}) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
t.Fatal("should not have been able to find record after deleting")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
helper methods
|
||||
|
Loading…
Reference in New Issue
Block a user