Merge pull request #3648 from johngmyers/dynamodb

Add DynamoDB registry implementation
This commit is contained in:
Kubernetes Prow Robot 2023-06-23 13:03:07 -07:00 committed by GitHub
commit ae0c06e3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1723 additions and 493 deletions

41
docs/registry/dynamodb.md Normal file
View File

@ -0,0 +1,41 @@
# The DynamoDB registry
The DynamoDB registry stores DNS record metadata in an AWS DynamoDB table.
## The DynamoDB Table
By default, the DynamoDB registry stores data in the table named `external-dns`.
A different table may be specified using the `--dynamodb-table` flag.
A different region may be specified using the `--dynamodb-region` flag.
The table must have a partition (hash) key named `k` and string type.
The table must not have a sort (range) key.
## IAM permissions
The ExternalDNS Role must be granted the following permissions:
```json
{
"Effect": "Allow",
"Action": [
"DynamoDB:DescribeTable",
"DynamoDB:PartiQLDelete",
"DynamoDB:PartiQLInsert",
"DynamoDB:PartiQLUpdate",
"DynamoDB:Scan"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/external-dns"
]
}
```
The region and account ID may be specified explicitly specified instead of using wildcards.
## Caching
The DynamoDB registry can optionally cache DNS records read from the provider. This can mitigate
rate limits imposed by the provider.
Caching is enabled by specifying a cache duration with the `--txt-cache-interval` flag.

View File

@ -11,6 +11,7 @@ The registry implementation is specified using the `--registry` flag.
## Supported registries
* [txt](txt.md) (default) - Stores in TXT records in the same provider
* [txt](txt.md) (default) - Stores metadata in TXT records in the same provider.
* [dynamodb](dynamodb.md) - Stores metadata in an AWS DynamoDB table.
* noop - Passes metadata directly to the provider. For most providers, this means the metadata is not persisted.
* aws-sd - Stores metadata in AWS Service Discovery. Only usable with the `aws-sd` provider.

View File

@ -162,6 +162,13 @@ type ProviderSpecificProperty struct {
// ProviderSpecific holds configuration which is specific to individual DNS providers
type ProviderSpecific []ProviderSpecificProperty
// EndpointKey is the type of a map key for separating endpoints or targets.
type EndpointKey struct {
DNSName string
RecordType string
SetIdentifier string
}
// Endpoint is a high-level way of a connection between a service and an IP
type Endpoint struct {
// The hostname of the DNS record
@ -261,6 +268,15 @@ func (e *Endpoint) DeleteProviderSpecificProperty(key string) {
}
}
// Key returns the EndpointKey of the Endpoint.
func (e *Endpoint) Key() EndpointKey {
return EndpointKey{
DNSName: e.DNSName,
RecordType: e.RecordType,
SetIdentifier: e.SetIdentifier,
}
}
func (e *Endpoint) String() string {
return fmt.Sprintf("%s %d IN %s %s %s %s", e.DNSName, e.RecordTTL, e.RecordType, e.SetIdentifier, e.Targets, e.ProviderSpecific)
}

31
main.go
View File

@ -25,6 +25,11 @@ import (
"syscall"
"time"
awsSDK "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/route53"
sd "github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
@ -180,6 +185,20 @@ func main() {
zoneTypeFilter := provider.NewZoneTypeFilter(cfg.AWSZoneType)
zoneTagFilter := provider.NewZoneTagFilter(cfg.AWSZoneTagFilter)
var awsSession *session.Session
if cfg.Provider == "aws" || cfg.Provider == "aws-sd" || cfg.Registry == "dynamodb" {
awsSession, err = aws.NewSession(
aws.AWSSessionConfig{
AssumeRole: cfg.AWSAssumeRole,
AssumeRoleExternalID: cfg.AWSAssumeRoleExternalID,
APIRetries: cfg.AWSAPIRetries,
},
)
if err != nil {
log.Fatal(err)
}
}
var p provider.Provider
switch cfg.Provider {
case "akamai":
@ -207,13 +226,11 @@ func main() {
BatchChangeSize: cfg.AWSBatchChangeSize,
BatchChangeInterval: cfg.AWSBatchChangeInterval,
EvaluateTargetHealth: cfg.AWSEvaluateTargetHealth,
AssumeRole: cfg.AWSAssumeRole,
AssumeRoleExternalID: cfg.AWSAssumeRoleExternalID,
APIRetries: cfg.AWSAPIRetries,
PreferCNAME: cfg.AWSPreferCNAME,
DryRun: cfg.DryRun,
ZoneCacheDuration: cfg.AWSZoneCacheDuration,
},
route53.New(awsSession),
)
case "aws-sd":
// Check that only compatible Registry is used with AWS-SD
@ -221,7 +238,7 @@ func main() {
log.Infof("Registry \"%s\" cannot be used with AWS Cloud Map. Switching to \"aws-sd\".", cfg.Registry)
cfg.Registry = "aws-sd"
}
p, err = awssd.NewAWSSDProvider(domainFilter, cfg.AWSZoneType, cfg.AWSAssumeRole, cfg.AWSAssumeRoleExternalID, cfg.DryRun, cfg.AWSSDServiceCleanup, cfg.TXTOwnerID)
p, err = awssd.NewAWSSDProvider(domainFilter, cfg.AWSZoneType, cfg.DryRun, cfg.AWSSDServiceCleanup, cfg.TXTOwnerID, sd.New(awsSession))
case "azure-dns", "azure":
p, err = azure.NewAzureProvider(cfg.AzureConfigFile, domainFilter, zoneNameFilter, zoneIDFilter, cfg.AzureResourceGroup, cfg.AzureUserAssignedIdentityClientID, cfg.DryRun)
case "azure-private-dns":
@ -380,6 +397,12 @@ func main() {
var r registry.Registry
switch cfg.Registry {
case "dynamodb":
config := awsSDK.NewConfig()
if cfg.AWSDynamoDBRegion != "" {
config = config.WithRegion(cfg.AWSDynamoDBRegion)
}
r, err = registry.NewDynamoDBRegistry(p, cfg.TXTOwnerID, dynamodb.New(awsSession, config), cfg.AWSDynamoDBTable, cfg.TXTCacheInterval)
case "noop":
r, err = registry.NewNoopRegistry(p)
case "txt":

View File

@ -15,6 +15,7 @@ nav:
- Registries:
- About: registry/registry.md
- TXT: registry/txt.md
- DynamoDB: registry/dynamodb.md
- Advanced Topics:
- Initial Design: initial-design.md
- TTL: ttl.md

View File

@ -93,6 +93,8 @@ type Config struct {
AWSPreferCNAME bool
AWSZoneCacheDuration time.Duration
AWSSDServiceCleanup bool
AWSDynamoDBRegion string
AWSDynamoDBTable string
AzureConfigFile string
AzureResourceGroup string
AzureSubscriptionID string
@ -255,6 +257,8 @@ var defaultConfig = &Config{
AWSPreferCNAME: false,
AWSZoneCacheDuration: 0 * time.Second,
AWSSDServiceCleanup: false,
AWSDynamoDBRegion: "",
AWSDynamoDBTable: "external-dns",
AzureConfigFile: "/etc/kubernetes/azure.json",
AzureResourceGroup: "",
AzureSubscriptionID: "",
@ -457,12 +461,12 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("alibaba-cloud-zone-type", "When using the Alibaba Cloud provider, filter for zones of this type (optional, options: public, private)").Default(defaultConfig.AlibabaCloudZoneType).EnumVar(&cfg.AlibabaCloudZoneType, "", "public", "private")
app.Flag("aws-zone-type", "When using the AWS provider, filter for zones of this type (optional, options: public, private)").Default(defaultConfig.AWSZoneType).EnumVar(&cfg.AWSZoneType, "", "public", "private")
app.Flag("aws-zone-tags", "When using the AWS provider, filter for zones with these tags").Default("").StringsVar(&cfg.AWSZoneTagFilter)
app.Flag("aws-assume-role", "When using the AWS provider, assume this IAM role. Useful for hosted zones in another AWS account. Specify the full ARN, e.g. `arn:aws:iam::123455567:role/external-dns` (optional)").Default(defaultConfig.AWSAssumeRole).StringVar(&cfg.AWSAssumeRole)
app.Flag("aws-assume-role-external-id", "When using the AWS provider and assuming a role then specify this external ID` (optional)").Default(defaultConfig.AWSAssumeRoleExternalID).StringVar(&cfg.AWSAssumeRoleExternalID)
app.Flag("aws-assume-role", "When using the AWS API, assume this IAM role. Useful for hosted zones in another AWS account. Specify the full ARN, e.g. `arn:aws:iam::123455567:role/external-dns` (optional)").Default(defaultConfig.AWSAssumeRole).StringVar(&cfg.AWSAssumeRole)
app.Flag("aws-assume-role-external-id", "When using the AWS API and assuming a role then specify this external ID` (optional)").Default(defaultConfig.AWSAssumeRoleExternalID).StringVar(&cfg.AWSAssumeRoleExternalID)
app.Flag("aws-batch-change-size", "When using the AWS provider, set the maximum number of changes that will be applied in each batch.").Default(strconv.Itoa(defaultConfig.AWSBatchChangeSize)).IntVar(&cfg.AWSBatchChangeSize)
app.Flag("aws-batch-change-interval", "When using the AWS provider, set the interval between batch changes.").Default(defaultConfig.AWSBatchChangeInterval.String()).DurationVar(&cfg.AWSBatchChangeInterval)
app.Flag("aws-evaluate-target-health", "When using the AWS provider, set whether to evaluate the health of a DNS target (default: enabled, disable with --no-aws-evaluate-target-health)").Default(strconv.FormatBool(defaultConfig.AWSEvaluateTargetHealth)).BoolVar(&cfg.AWSEvaluateTargetHealth)
app.Flag("aws-api-retries", "When using the AWS provider, set the maximum number of retries for API calls before giving up.").Default(strconv.Itoa(defaultConfig.AWSAPIRetries)).IntVar(&cfg.AWSAPIRetries)
app.Flag("aws-api-retries", "When using the AWS API, set the maximum number of retries before giving up.").Default(strconv.Itoa(defaultConfig.AWSAPIRetries)).IntVar(&cfg.AWSAPIRetries)
app.Flag("aws-prefer-cname", "When using the AWS provider, prefer using CNAME instead of ALIAS (default: disabled)").BoolVar(&cfg.AWSPreferCNAME)
app.Flag("aws-zones-cache-duration", "When using the AWS provider, set the zones list cache TTL (0s to disable).").Default(defaultConfig.AWSZoneCacheDuration.String()).DurationVar(&cfg.AWSZoneCacheDuration)
app.Flag("aws-sd-service-cleanup", "When using the AWS CloudMap provider, delete empty Services without endpoints (default: disabled)").BoolVar(&cfg.AWSSDServiceCleanup)
@ -572,13 +576,15 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("policy", "Modify how DNS records are synchronized between sources and providers (default: sync, options: sync, upsert-only, create-only)").Default(defaultConfig.Policy).EnumVar(&cfg.Policy, "sync", "upsert-only", "create-only")
// Flags related to the registry
app.Flag("registry", "The registry implementation to use to keep track of DNS record ownership (default: txt, options: txt, noop, aws-sd)").Default(defaultConfig.Registry).EnumVar(&cfg.Registry, "txt", "noop", "aws-sd")
app.Flag("txt-owner-id", "When using the TXT registry, a name that identifies this instance of ExternalDNS (default: default)").Default(defaultConfig.TXTOwnerID).StringVar(&cfg.TXTOwnerID)
app.Flag("registry", "The registry implementation to use to keep track of DNS record ownership (default: txt, options: txt, noop, dynamodb, aws-sd)").Default(defaultConfig.Registry).EnumVar(&cfg.Registry, "txt", "noop", "dynamodb", "aws-sd")
app.Flag("txt-owner-id", "When using the TXT or DynamoDB registry, a name that identifies this instance of ExternalDNS (default: default)").Default(defaultConfig.TXTOwnerID).StringVar(&cfg.TXTOwnerID)
app.Flag("txt-prefix", "When using the TXT registry, a custom string that's prefixed to each ownership DNS record (optional). Could contain record type template like '%{record_type}-prefix-'. Mutual exclusive with txt-suffix!").Default(defaultConfig.TXTPrefix).StringVar(&cfg.TXTPrefix)
app.Flag("txt-suffix", "When using the TXT registry, a custom string that's suffixed to the host portion of each ownership DNS record (optional). Could contain record type template like '-%{record_type}-suffix'. Mutual exclusive with txt-prefix!").Default(defaultConfig.TXTSuffix).StringVar(&cfg.TXTSuffix)
app.Flag("txt-wildcard-replacement", "When using the TXT registry, a custom string that's used instead of an asterisk for TXT records corresponding to wildcard DNS records (optional)").Default(defaultConfig.TXTWildcardReplacement).StringVar(&cfg.TXTWildcardReplacement)
app.Flag("txt-encrypt-enabled", "When using the TXT registry, set if TXT records should be encrypted before stored (default: disabled)").BoolVar(&cfg.TXTEncryptEnabled)
app.Flag("txt-encrypt-aes-key", "When using the TXT registry, set TXT record decryption and encryption 32 byte aes key (required when --txt-encrypt=true)").Default(defaultConfig.TXTEncryptAESKey).StringVar(&cfg.TXTEncryptAESKey)
app.Flag("dynamodb-region", "When using the DynamoDB registry, the AWS region of the DynamoDB table (optional)").Default(cfg.AWSDynamoDBRegion).StringVar(&cfg.AWSDynamoDBRegion)
app.Flag("dynamodb-table", "When using the DynamoDB registry, the name of the DynamoDB table (default: \"external-dns\")").Default(defaultConfig.AWSDynamoDBTable).StringVar(&cfg.AWSDynamoDBTable)
// Flags related to the main control loop
app.Flag("txt-cache-interval", "The interval between cache synchronizations in duration format (default: disabled)").Default(defaultConfig.TXTCacheInterval.String()).DurationVar(&cfg.TXTCacheInterval)

View File

@ -65,6 +65,7 @@ var (
AWSPreferCNAME: false,
AWSZoneCacheDuration: 0 * time.Second,
AWSSDServiceCleanup: false,
AWSDynamoDBTable: "external-dns",
AzureConfigFile: "/etc/kubernetes/azure.json",
AzureResourceGroup: "",
AzureSubscriptionID: "",
@ -170,6 +171,7 @@ var (
AWSPreferCNAME: true,
AWSZoneCacheDuration: 10 * time.Second,
AWSSDServiceCleanup: true,
AWSDynamoDBTable: "custom-table",
AzureConfigFile: "azure.json",
AzureResourceGroup: "arg",
AzureSubscriptionID: "arg",
@ -351,6 +353,7 @@ func TestParseFlags(t *testing.T) {
"--txt-owner-id=owner-1",
"--txt-prefix=associated-txt-record",
"--txt-cache-interval=12h",
"--dynamodb-table=custom-table",
"--interval=10m",
"--min-event-sync-interval=50s",
"--once",
@ -464,6 +467,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_AWS_PREFER_CNAME": "true",
"EXTERNAL_DNS_AWS_ZONES_CACHE_DURATION": "10s",
"EXTERNAL_DNS_AWS_SD_SERVICE_CLEANUP": "true",
"EXTERNAL_DNS_DYNAMODB_TABLE": "custom-table",
"EXTERNAL_DNS_POLICY": "upsert-only",
"EXTERNAL_DNS_REGISTRY": "noop",
"EXTERNAL_DNS_TXT_OWNER_ID": "owner-1",

View File

@ -25,11 +25,8 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/route53"
"github.com/linki/instrumented_http"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -226,49 +223,15 @@ type AWSConfig struct {
BatchChangeSize int
BatchChangeInterval time.Duration
EvaluateTargetHealth bool
AssumeRole string
AssumeRoleExternalID string
APIRetries int
PreferCNAME bool
DryRun bool
ZoneCacheDuration time.Duration
}
// NewAWSProvider initializes a new AWS Route53 based Provider.
func NewAWSProvider(awsConfig AWSConfig) (*AWSProvider, error) {
config := aws.NewConfig().WithMaxRetries(awsConfig.APIRetries)
config.WithHTTPClient(
instrumented_http.NewClient(config.HTTPClient, &instrumented_http.Callbacks{
PathProcessor: func(path string) string {
parts := strings.Split(path, "/")
return parts[len(parts)-1]
},
}),
)
session, err := session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate AWS session")
}
if awsConfig.AssumeRole != "" {
if awsConfig.AssumeRoleExternalID != "" {
log.Infof("Assuming role: %s with external id %s", awsConfig.AssumeRole, awsConfig.AssumeRoleExternalID)
session.Config.WithCredentials(stscreds.NewCredentials(session, awsConfig.AssumeRole, func(p *stscreds.AssumeRoleProvider) {
p.ExternalID = &awsConfig.AssumeRoleExternalID
}))
} else {
log.Infof("Assuming role: %s", awsConfig.AssumeRole)
session.Config.WithCredentials(stscreds.NewCredentials(session, awsConfig.AssumeRole))
}
}
func NewAWSProvider(awsConfig AWSConfig, client Route53API) (*AWSProvider, error) {
provider := &AWSProvider{
client: route53.New(session),
client: client,
domainFilter: awsConfig.DomainFilter,
zoneIDFilter: awsConfig.ZoneIDFilter,
zoneTypeFilter: awsConfig.ZoneTypeFilter,

75
provider/aws/session.go Normal file
View File

@ -0,0 +1,75 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/linki/instrumented_http"
"github.com/sirupsen/logrus"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
)
// AWSSessionConfig contains configuration to create a new AWS provider.
type AWSSessionConfig struct {
AssumeRole string
AssumeRoleExternalID string
APIRetries int
}
func NewSession(awsConfig AWSSessionConfig) (*session.Session, error) {
config := aws.NewConfig().WithMaxRetries(awsConfig.APIRetries)
config.WithHTTPClient(
instrumented_http.NewClient(config.HTTPClient, &instrumented_http.Callbacks{
PathProcessor: func(path string) string {
parts := strings.Split(path, "/")
return parts[len(parts)-1]
},
}),
)
session, err := session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
return nil, fmt.Errorf("instantiating AWS session: %w", err)
}
if awsConfig.AssumeRole != "" {
if awsConfig.AssumeRoleExternalID != "" {
logrus.Infof("Assuming role: %s with external id %s", awsConfig.AssumeRole, awsConfig.AssumeRoleExternalID)
session.Config.WithCredentials(stscreds.NewCredentials(session, awsConfig.AssumeRole, func(p *stscreds.AssumeRoleProvider) {
p.ExternalID = &awsConfig.AssumeRoleExternalID
}))
} else {
logrus.Infof("Assuming role: %s", awsConfig.AssumeRole)
session.Config.WithCredentials(stscreds.NewCredentials(session, awsConfig.AssumeRole))
}
}
session.Handlers.Build.PushBack(request.MakeAddToUserAgentHandler("ExternalDNS", externaldns.Version))
return session, nil
}

View File

@ -25,15 +25,10 @@ import (
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
sd "github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/linki/instrumented_http"
log "github.com/sirupsen/logrus"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
"sigs.k8s.io/external-dns/plan"
"sigs.k8s.io/external-dns/provider"
)
@ -86,42 +81,9 @@ type AWSSDProvider struct {
}
// NewAWSSDProvider initializes a new AWS Cloud Map based Provider.
func NewAWSSDProvider(domainFilter endpoint.DomainFilter, namespaceType string, assumeRole string, assumeRoleExternalID string, dryRun, cleanEmptyService bool, ownerID string) (*AWSSDProvider, error) {
config := aws.NewConfig()
config = config.WithHTTPClient(
instrumented_http.NewClient(config.HTTPClient, &instrumented_http.Callbacks{
PathProcessor: func(path string) string {
parts := strings.Split(path, "/")
return parts[len(parts)-1]
},
}),
)
sess, err := session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
return nil, err
}
if assumeRole != "" {
if assumeRoleExternalID != "" {
log.Infof("Assuming role %q with external ID %q", assumeRole, assumeRoleExternalID)
sess.Config.WithCredentials(stscreds.NewCredentials(sess, assumeRole, func(p *stscreds.AssumeRoleProvider) {
p.ExternalID = &assumeRoleExternalID
}))
} else {
log.Infof("Assuming role: %s", assumeRole)
sess.Config.WithCredentials(stscreds.NewCredentials(sess, assumeRole))
}
}
sess.Handlers.Build.PushBack(request.MakeAddToUserAgentHandler("ExternalDNS", externaldns.Version))
func NewAWSSDProvider(domainFilter endpoint.DomainFilter, namespaceType string, dryRun, cleanEmptyService bool, ownerID string, client AWSSDClient) (*AWSSDProvider, error) {
provider := &AWSSDProvider{
client: sd.New(sess),
client: client,
dryRun: dryRun,
namespaceFilter: domainFilter,
namespaceTypeFilter: newSdNamespaceFilter(namespaceType),

View File

@ -22,6 +22,7 @@ import (
"strings"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/plan"
@ -132,11 +133,7 @@ func (im *InMemoryProvider) Records(ctx context.Context) ([]*endpoint.Endpoint,
return nil, err
}
for _, record := range records {
ep := endpoint.NewEndpoint(record.Name, record.Type, record.Target).WithSetIdentifier(record.SetIdentifier)
ep.Labels = record.Labels
endpoints = append(endpoints, ep)
}
endpoints = append(endpoints, copyEndpoints(records)...)
}
return endpoints, nil
@ -187,11 +184,11 @@ func (im *InMemoryProvider) ApplyChanges(ctx context.Context, changes *plan.Chan
}
for zoneID := range perZoneChanges {
change := &inMemoryChange{
Create: convertToInMemoryRecord(perZoneChanges[zoneID].Create),
UpdateNew: convertToInMemoryRecord(perZoneChanges[zoneID].UpdateNew),
UpdateOld: convertToInMemoryRecord(perZoneChanges[zoneID].UpdateOld),
Delete: convertToInMemoryRecord(perZoneChanges[zoneID].Delete),
change := &plan.Changes{
Create: perZoneChanges[zoneID].Create,
UpdateNew: perZoneChanges[zoneID].UpdateNew,
UpdateOld: perZoneChanges[zoneID].UpdateOld,
Delete: perZoneChanges[zoneID].Delete,
}
err := im.client.ApplyChanges(ctx, zoneID, change)
if err != nil {
@ -202,16 +199,15 @@ func (im *InMemoryProvider) ApplyChanges(ctx context.Context, changes *plan.Chan
return nil
}
func convertToInMemoryRecord(endpoints []*endpoint.Endpoint) []*inMemoryRecord {
records := []*inMemoryRecord{}
func copyEndpoints(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint {
records := make([]*endpoint.Endpoint, 0, len(endpoints))
for _, ep := range endpoints {
records = append(records, &inMemoryRecord{
Type: ep.RecordType,
Name: ep.DNSName,
Target: ep.Targets[0],
SetIdentifier: ep.SetIdentifier,
Labels: ep.Labels,
})
newEp := endpoint.NewEndpointWithTTL(ep.DNSName, ep.RecordType, ep.RecordTTL, ep.Targets...).WithSetIdentifier(ep.SetIdentifier)
newEp.Labels = endpoint.NewLabels()
for k, v := range ep.Labels {
newEp.Labels[k] = v
}
records = append(records, newEp)
}
return records
}
@ -244,26 +240,7 @@ func (f *filter) EndpointZoneID(endpoint *endpoint.Endpoint, zones map[string]st
return matchZoneID
}
// inMemoryRecord - record stored in memory
// Type - type of record
// Name - DNS name assigned to the record
// Target - target of the record
type inMemoryRecord struct {
Type string
SetIdentifier string
Name string
Target string
Labels endpoint.Labels
}
type zone map[string][]*inMemoryRecord
type inMemoryChange struct {
Create []*inMemoryRecord
UpdateNew []*inMemoryRecord
UpdateOld []*inMemoryRecord
Delete []*inMemoryRecord
}
type zone map[endpoint.EndpointKey]*endpoint.Endpoint
type inMemoryClient struct {
zones map[string]zone
@ -273,14 +250,14 @@ func newInMemoryClient() *inMemoryClient {
return &inMemoryClient{map[string]zone{}}
}
func (c *inMemoryClient) Records(zone string) ([]*inMemoryRecord, error) {
func (c *inMemoryClient) Records(zone string) ([]*endpoint.Endpoint, error) {
if _, ok := c.zones[zone]; !ok {
return nil, ErrZoneNotFound
}
records := []*inMemoryRecord{}
var records []*endpoint.Endpoint
for _, rec := range c.zones[zone] {
records = append(records, rec...)
records = append(records, rec)
}
return records, nil
}
@ -297,66 +274,44 @@ func (c *inMemoryClient) CreateZone(zone string) error {
if _, ok := c.zones[zone]; ok {
return ErrZoneAlreadyExists
}
c.zones[zone] = map[string][]*inMemoryRecord{}
c.zones[zone] = map[endpoint.EndpointKey]*endpoint.Endpoint{}
return nil
}
func (c *inMemoryClient) ApplyChanges(ctx context.Context, zoneID string, changes *inMemoryChange) error {
func (c *inMemoryClient) ApplyChanges(ctx context.Context, zoneID string, changes *plan.Changes) error {
if err := c.validateChangeBatch(zoneID, changes); err != nil {
return err
}
for _, newEndpoint := range changes.Create {
if _, ok := c.zones[zoneID][newEndpoint.Name]; !ok {
c.zones[zoneID][newEndpoint.Name] = make([]*inMemoryRecord, 0)
}
c.zones[zoneID][newEndpoint.Name] = append(c.zones[zoneID][newEndpoint.Name], newEndpoint)
c.zones[zoneID][newEndpoint.Key()] = newEndpoint
}
for _, updateEndpoint := range changes.UpdateNew {
for _, rec := range c.zones[zoneID][updateEndpoint.Name] {
if rec.Type == updateEndpoint.Type {
rec.Target = updateEndpoint.Target
break
}
}
c.zones[zoneID][updateEndpoint.Key()] = updateEndpoint
}
for _, deleteEndpoint := range changes.Delete {
newSet := make([]*inMemoryRecord, 0)
for _, rec := range c.zones[zoneID][deleteEndpoint.Name] {
if rec.Type != deleteEndpoint.Type {
newSet = append(newSet, rec)
}
}
c.zones[zoneID][deleteEndpoint.Name] = newSet
delete(c.zones[zoneID], deleteEndpoint.Key())
}
return nil
}
func (c *inMemoryClient) updateMesh(mesh map[string]map[string]map[string]bool, record *inMemoryRecord) error {
if _, exists := mesh[record.Name]; exists {
if _, exists := mesh[record.Name][record.Type]; exists {
if mesh[record.Name][record.Type][record.SetIdentifier] {
return ErrDuplicateRecordFound
}
mesh[record.Name][record.Type][record.SetIdentifier] = true
return nil
}
mesh[record.Name][record.Type] = map[string]bool{record.SetIdentifier: true}
return nil
func (c *inMemoryClient) updateMesh(mesh sets.Set[endpoint.EndpointKey], record *endpoint.Endpoint) error {
if mesh.Has(record.Key()) {
return ErrDuplicateRecordFound
}
mesh[record.Name] = map[string]map[string]bool{record.Type: {record.SetIdentifier: true}}
mesh.Insert(record.Key())
return nil
}
// validateChangeBatch validates that the changes passed to InMemory DNS provider is valid
func (c *inMemoryClient) validateChangeBatch(zone string, changes *inMemoryChange) error {
func (c *inMemoryClient) validateChangeBatch(zone string, changes *plan.Changes) error {
curZone, ok := c.zones[zone]
if !ok {
return ErrZoneNotFound
}
mesh := map[string]map[string]map[string]bool{}
mesh := sets.New[endpoint.EndpointKey]()
for _, newEndpoint := range changes.Create {
if c.findByTypeAndSetIdentifier(newEndpoint.Type, newEndpoint.SetIdentifier, curZone[newEndpoint.Name]) != nil {
if _, exists := curZone[newEndpoint.Key()]; exists {
return ErrRecordAlreadyExists
}
if err := c.updateMesh(mesh, newEndpoint); err != nil {
@ -364,7 +319,7 @@ func (c *inMemoryClient) validateChangeBatch(zone string, changes *inMemoryChang
}
}
for _, updateEndpoint := range changes.UpdateNew {
if c.findByTypeAndSetIdentifier(updateEndpoint.Type, updateEndpoint.SetIdentifier, curZone[updateEndpoint.Name]) == nil {
if _, exists := curZone[updateEndpoint.Key()]; !exists {
return ErrRecordNotFound
}
if err := c.updateMesh(mesh, updateEndpoint); err != nil {
@ -372,12 +327,12 @@ func (c *inMemoryClient) validateChangeBatch(zone string, changes *inMemoryChang
}
}
for _, updateOldEndpoint := range changes.UpdateOld {
if rec := c.findByTypeAndSetIdentifier(updateOldEndpoint.Type, updateOldEndpoint.SetIdentifier, curZone[updateOldEndpoint.Name]); rec == nil || rec.Target != updateOldEndpoint.Target {
if rec, exists := curZone[updateOldEndpoint.Key()]; !exists || rec.Targets[0] != updateOldEndpoint.Targets[0] {
return ErrRecordNotFound
}
}
for _, deleteEndpoint := range changes.Delete {
if rec := c.findByTypeAndSetIdentifier(deleteEndpoint.Type, deleteEndpoint.SetIdentifier, curZone[deleteEndpoint.Name]); rec == nil || rec.Target != deleteEndpoint.Target {
if rec, exists := curZone[deleteEndpoint.Key()]; !exists || rec.Targets[0] != deleteEndpoint.Targets[0] {
return ErrRecordNotFound
}
if err := c.updateMesh(mesh, deleteEndpoint); err != nil {
@ -386,12 +341,3 @@ func (c *inMemoryClient) validateChangeBatch(zone string, changes *inMemoryChang
}
return nil
}
func (c *inMemoryClient) findByTypeAndSetIdentifier(recordType, setIdentifier string, records []*inMemoryRecord) *inMemoryRecord {
for _, record := range records {
if record.Type == recordType && record.SetIdentifier == setIdentifier {
return record
}
}
return nil
}

View File

@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
"sigs.k8s.io/external-dns/plan"
@ -32,7 +31,6 @@ import (
var _ provider.Provider = &InMemoryProvider{}
func TestInMemoryProvider(t *testing.T) {
t.Run("findByType", testInMemoryFindByType)
t.Run("Records", testInMemoryRecords)
t.Run("validateChangeBatch", testInMemoryValidateChangeBatch)
t.Run("ApplyChanges", testInMemoryApplyChanges)
@ -40,114 +38,6 @@ func TestInMemoryProvider(t *testing.T) {
t.Run("CreateZone", testInMemoryCreateZone)
}
func testInMemoryFindByType(t *testing.T) {
for _, ti := range []struct {
title string
findType string
findSetIdentifier string
records []*inMemoryRecord
expected *inMemoryRecord
expectedEmpty bool
}{
{
title: "no records, empty type",
findType: "",
records: nil,
expected: nil,
expectedEmpty: true,
},
{
title: "no records, non-empty type",
findType: endpoint.RecordTypeA,
records: nil,
expected: nil,
expectedEmpty: true,
},
{
title: "one record, empty type",
findType: "",
records: []*inMemoryRecord{
{
Type: endpoint.RecordTypeA,
},
},
expected: nil,
expectedEmpty: true,
},
{
title: "one record, wrong type",
findType: endpoint.RecordTypeCNAME,
records: []*inMemoryRecord{
{
Type: endpoint.RecordTypeA,
},
},
expected: nil,
expectedEmpty: true,
},
{
title: "one record, right type",
findType: endpoint.RecordTypeA,
records: []*inMemoryRecord{
{
Type: endpoint.RecordTypeA,
},
},
expected: &inMemoryRecord{
Type: endpoint.RecordTypeA,
},
},
{
title: "multiple records, right type",
findType: endpoint.RecordTypeA,
records: []*inMemoryRecord{
{
Type: endpoint.RecordTypeA,
},
{
Type: endpoint.RecordTypeTXT,
},
},
expected: &inMemoryRecord{
Type: endpoint.RecordTypeA,
},
},
{
title: "multiple records, right type and set identifier",
findType: endpoint.RecordTypeA,
findSetIdentifier: "test-set-1",
records: []*inMemoryRecord{
{
Type: endpoint.RecordTypeA,
SetIdentifier: "test-set-1",
},
{
Type: endpoint.RecordTypeA,
SetIdentifier: "test-set-2",
},
{
Type: endpoint.RecordTypeTXT,
},
},
expected: &inMemoryRecord{
Type: endpoint.RecordTypeA,
SetIdentifier: "test-set-1",
},
},
} {
t.Run(ti.title, func(t *testing.T) {
c := newInMemoryClient()
record := c.findByTypeAndSetIdentifier(ti.findType, ti.findSetIdentifier, ti.records)
if ti.expectedEmpty {
assert.Nil(t, record)
} else {
require.NotNil(t, record)
assert.Equal(t, *ti.expected, *record)
}
})
}
}
func testInMemoryRecords(t *testing.T) {
for _, ti := range []struct {
title string
@ -175,35 +65,12 @@ func testInMemoryRecords(t *testing.T) {
title: "records, zone with records",
zone: "org",
init: map[string]zone{
"org": {
"example.org": []*inMemoryRecord{
{
Name: "example.org",
Target: "8.8.8.8",
Type: endpoint.RecordTypeA,
},
{
Name: "example.org",
Type: endpoint.RecordTypeTXT,
},
},
"foo.org": []*inMemoryRecord{
{
Name: "foo.org",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
},
"com": {
"example.com": []*inMemoryRecord{
{
Name: "example.com",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
},
"org": makeZone(
"example.org", "8.8.8.8", endpoint.RecordTypeA,
"example.org", "", endpoint.RecordTypeTXT,
"foo.org", "4.4.4.4", endpoint.RecordTypeCNAME,
),
"com": makeZone("example.com", "4.4.4.4", endpoint.RecordTypeCNAME),
},
expectError: false,
expected: []*endpoint.Endpoint{
@ -246,41 +113,13 @@ func testInMemoryRecords(t *testing.T) {
func testInMemoryValidateChangeBatch(t *testing.T) {
init := map[string]zone{
"org": {
"example.org": []*inMemoryRecord{
{
Name: "example.org",
Target: "8.8.8.8",
Type: endpoint.RecordTypeA,
},
{
Name: "example.org",
},
},
"foo.org": []*inMemoryRecord{
{
Name: "foo.org",
Target: "bar.org",
Type: endpoint.RecordTypeCNAME,
},
},
"foo.bar.org": []*inMemoryRecord{
{
Name: "foo.bar.org",
Target: "5.5.5.5",
Type: endpoint.RecordTypeA,
},
},
},
"com": {
"example.com": []*inMemoryRecord{
{
Name: "example.com",
Target: "another-example.com",
Type: endpoint.RecordTypeCNAME,
},
},
},
"org": makeZone(
"example.org", "8.8.8.8", endpoint.RecordTypeA,
"example.org", "", endpoint.RecordTypeTXT,
"foo.org", "bar.org", endpoint.RecordTypeCNAME,
"foo.bar.org", "5.5.5.5", endpoint.RecordTypeA,
),
"com": makeZone("example.com", "another-example.com", endpoint.RecordTypeCNAME),
}
for _, ti := range []struct {
title string
@ -561,11 +400,11 @@ func testInMemoryValidateChangeBatch(t *testing.T) {
t.Run(ti.title, func(t *testing.T) {
c := &inMemoryClient{}
c.zones = ti.init
ichanges := &inMemoryChange{
Create: convertToInMemoryRecord(ti.changes.Create),
UpdateNew: convertToInMemoryRecord(ti.changes.UpdateNew),
UpdateOld: convertToInMemoryRecord(ti.changes.UpdateOld),
Delete: convertToInMemoryRecord(ti.changes.Delete),
ichanges := &plan.Changes{
Create: ti.changes.Create,
UpdateNew: ti.changes.UpdateNew,
UpdateOld: ti.changes.UpdateOld,
Delete: ti.changes.Delete,
}
err := c.validateChangeBatch(ti.zone, ichanges)
if ti.expectError {
@ -579,42 +418,12 @@ func testInMemoryValidateChangeBatch(t *testing.T) {
func getInitData() map[string]zone {
return map[string]zone{
"org": {
"example.org": []*inMemoryRecord{
{
Name: "example.org",
Target: "8.8.8.8",
Type: endpoint.RecordTypeA,
},
{
Name: "example.org",
Type: endpoint.RecordTypeTXT,
},
},
"foo.org": []*inMemoryRecord{
{
Name: "foo.org",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
"foo.bar.org": []*inMemoryRecord{
{
Name: "foo.bar.org",
Target: "5.5.5.5",
Type: endpoint.RecordTypeA,
},
},
},
"com": {
"example.com": []*inMemoryRecord{
{
Name: "example.com",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
},
"org": makeZone("example.org", "8.8.8.8", endpoint.RecordTypeA,
"example.org", "", endpoint.RecordTypeTXT,
"foo.org", "4.4.4.4", endpoint.RecordTypeCNAME,
"foo.bar.org", "5.5.5.5", endpoint.RecordTypeA,
),
"com": makeZone("example.com", "4.4.4.4", endpoint.RecordTypeCNAME),
}
}
@ -679,36 +488,11 @@ func testInMemoryApplyChanges(t *testing.T) {
},
},
expectedZonesState: map[string]zone{
"org": {
"example.org": []*inMemoryRecord{
{
Name: "example.org",
Target: "8.8.8.8",
Type: endpoint.RecordTypeA,
},
{
Name: "example.org",
Type: endpoint.RecordTypeTXT,
},
},
"foo.org": []*inMemoryRecord{
{
Name: "foo.org",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
"foo.bar.org": []*inMemoryRecord{},
},
"com": {
"example.com": []*inMemoryRecord{
{
Name: "example.com",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
},
"org": makeZone("example.org", "8.8.8.8", endpoint.RecordTypeA,
"example.org", "", endpoint.RecordTypeTXT,
"foo.org", "4.4.4.4", endpoint.RecordTypeCNAME,
),
"com": makeZone("example.com", "4.4.4.4", endpoint.RecordTypeCNAME),
},
},
{
@ -720,6 +504,7 @@ func testInMemoryApplyChanges(t *testing.T) {
DNSName: "foo.bar.new.org",
Targets: endpoint.Targets{"4.8.8.9"},
RecordType: endpoint.RecordTypeA,
Labels: endpoint.NewLabels(),
},
},
UpdateNew: []*endpoint.Endpoint{
@ -727,6 +512,7 @@ func testInMemoryApplyChanges(t *testing.T) {
DNSName: "foo.bar.org",
Targets: endpoint.Targets{"4.8.8.4"},
RecordType: endpoint.RecordTypeA,
Labels: endpoint.NewLabels(),
},
},
UpdateOld: []*endpoint.Endpoint{
@ -734,6 +520,7 @@ func testInMemoryApplyChanges(t *testing.T) {
DNSName: "foo.bar.org",
Targets: endpoint.Targets{"5.5.5.5"},
RecordType: endpoint.RecordTypeA,
Labels: endpoint.NewLabels(),
},
},
Delete: []*endpoint.Endpoint{
@ -741,48 +528,18 @@ func testInMemoryApplyChanges(t *testing.T) {
DNSName: "example.org",
Targets: endpoint.Targets{"8.8.8.8"},
RecordType: endpoint.RecordTypeA,
Labels: endpoint.NewLabels(),
},
},
},
expectedZonesState: map[string]zone{
"org": {
"example.org": []*inMemoryRecord{
{
Name: "example.org",
Type: endpoint.RecordTypeTXT,
},
},
"foo.org": []*inMemoryRecord{
{
Name: "foo.org",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
"foo.bar.org": []*inMemoryRecord{
{
Name: "foo.bar.org",
Target: "4.8.8.4",
Type: endpoint.RecordTypeA,
},
},
"foo.bar.new.org": []*inMemoryRecord{
{
Name: "foo.bar.new.org",
Target: "4.8.8.9",
Type: endpoint.RecordTypeA,
},
},
},
"com": {
"example.com": []*inMemoryRecord{
{
Name: "example.com",
Target: "4.4.4.4",
Type: endpoint.RecordTypeCNAME,
},
},
},
"org": makeZone(
"example.org", "", endpoint.RecordTypeTXT,
"foo.org", "4.4.4.4", endpoint.RecordTypeCNAME,
"foo.bar.org", "4.8.8.4", endpoint.RecordTypeA,
"foo.bar.new.org", "4.8.8.9", endpoint.RecordTypeA,
),
"com": makeZone("example.com", "4.4.4.4", endpoint.RecordTypeCNAME),
},
},
} {
@ -815,3 +572,17 @@ func testInMemoryCreateZone(t *testing.T) {
err = im.CreateZone("zone")
assert.EqualError(t, err, ErrZoneAlreadyExists.Error())
}
func makeZone(s ...string) map[endpoint.EndpointKey]*endpoint.Endpoint {
if len(s)%3 != 0 {
panic("makeZone arguments must be multiple of 3")
}
output := map[endpoint.EndpointKey]*endpoint.Endpoint{}
for i := 0; i < len(s); i += 3 {
ep := endpoint.NewEndpoint(s[i], s[i+2], s[i+1])
output[ep.Key()] = ep
}
return output
}

437
registry/dynamodb.go Normal file
View File

@ -0,0 +1,437 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/dynamodb"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/plan"
"sigs.k8s.io/external-dns/provider"
)
// DynamoDBAPI is the subset of the AWS Route53 API that we actually use. Add methods as required. Signatures must match exactly.
type DynamoDBAPI interface {
DescribeTableWithContext(ctx aws.Context, input *dynamodb.DescribeTableInput, opts ...request.Option) (*dynamodb.DescribeTableOutput, error)
ScanPagesWithContext(ctx aws.Context, input *dynamodb.ScanInput, fn func(*dynamodb.ScanOutput, bool) bool, opts ...request.Option) error
BatchExecuteStatementWithContext(aws.Context, *dynamodb.BatchExecuteStatementInput, ...request.Option) (*dynamodb.BatchExecuteStatementOutput, error)
}
// DynamoDBRegistry implements registry interface with ownership implemented via an AWS DynamoDB table.
type DynamoDBRegistry struct {
provider provider.Provider
ownerID string // refers to the owner id of the current instance
dynamodbAPI DynamoDBAPI
table string
// cache the dynamodb records owned by us.
labels map[endpoint.EndpointKey]endpoint.Labels
orphanedLabels sets.Set[endpoint.EndpointKey]
// cache the records in memory and update on an interval instead.
recordsCache []*endpoint.Endpoint
recordsCacheRefreshTime time.Time
cacheInterval time.Duration
}
// NewDynamoDBRegistry returns a new DynamoDBRegistry object.
func NewDynamoDBRegistry(provider provider.Provider, ownerID string, dynamodbAPI DynamoDBAPI, table string, cacheInterval time.Duration) (*DynamoDBRegistry, error) {
if ownerID == "" {
return nil, errors.New("owner id cannot be empty")
}
if table == "" {
return nil, errors.New("table cannot be empty")
}
return &DynamoDBRegistry{
provider: provider,
ownerID: ownerID,
dynamodbAPI: dynamodbAPI,
table: table,
cacheInterval: cacheInterval,
}, nil
}
func (im *DynamoDBRegistry) GetDomainFilter() endpoint.DomainFilterInterface {
return im.provider.GetDomainFilter()
}
// Records returns the current records from the registry.
func (im *DynamoDBRegistry) Records(ctx context.Context) ([]*endpoint.Endpoint, error) {
// If we have the zones cached AND we have refreshed the cache since the
// last given interval, then just use the cached results.
if im.recordsCache != nil && time.Since(im.recordsCacheRefreshTime) < im.cacheInterval {
log.Debug("Using cached records.")
return im.recordsCache, nil
}
if im.labels == nil {
if err := im.readLabels(ctx); err != nil {
return nil, err
}
}
records, err := im.provider.Records(ctx)
if err != nil {
return nil, err
}
orphanedLabels := sets.KeySet(im.labels)
endpoints := make([]*endpoint.Endpoint, 0, len(records))
for _, record := range records {
key := record.Key()
if labels := im.labels[key]; labels != nil {
record.Labels = labels
orphanedLabels.Delete(key)
} else {
record.Labels = endpoint.NewLabels()
}
endpoints = append(endpoints, record)
}
im.orphanedLabels = orphanedLabels
// Update the cache.
if im.cacheInterval > 0 {
im.recordsCache = endpoints
im.recordsCacheRefreshTime = time.Now()
}
return endpoints, nil
}
// ApplyChanges updates the DNS provider and DynamoDB table with the changes.
func (im *DynamoDBRegistry) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
filteredChanges := &plan.Changes{
Create: changes.Create,
UpdateNew: filterOwnedRecords(im.ownerID, changes.UpdateNew),
UpdateOld: filterOwnedRecords(im.ownerID, changes.UpdateOld),
Delete: filterOwnedRecords(im.ownerID, changes.Delete),
}
statements := make([]*dynamodb.BatchStatementRequest, 0, len(filteredChanges.Create)+len(filteredChanges.UpdateNew))
for _, r := range filteredChanges.Create {
if r.Labels == nil {
r.Labels = make(map[string]string)
}
r.Labels[endpoint.OwnerLabelKey] = im.ownerID
key := r.Key()
oldLabels := im.labels[key]
if oldLabels == nil {
statements = append(statements, &dynamodb.BatchStatementRequest{
Statement: aws.String(fmt.Sprintf("INSERT INTO %q VALUE {'k':?, 'o':?, 'l':?}", im.table)),
Parameters: []*dynamodb.AttributeValue{
toDynamoKey(key),
{S: aws.String(im.ownerID)},
toDynamoLabels(r.Labels),
},
ConsistentRead: aws.Bool(true),
})
} else {
im.orphanedLabels.Delete(key)
statements = im.appendUpdate(statements, key, oldLabels, r.Labels)
}
im.labels[key] = r.Labels
if im.cacheInterval > 0 {
im.addToCache(r)
}
}
for _, r := range filteredChanges.Delete {
delete(im.labels, r.Key())
if im.cacheInterval > 0 {
im.removeFromCache(r)
}
}
oldLabels := make(map[endpoint.EndpointKey]endpoint.Labels, len(filteredChanges.UpdateOld))
for _, r := range filteredChanges.UpdateOld {
oldLabels[r.Key()] = r.Labels
// remove old version of record from cache
if im.cacheInterval > 0 {
im.removeFromCache(r)
}
}
for _, r := range filteredChanges.UpdateNew {
key := r.Key()
statements = im.appendUpdate(statements, key, oldLabels[key], r.Labels)
// add new version of record to caches
im.labels[key] = r.Labels
if im.cacheInterval > 0 {
im.addToCache(r)
}
}
err := im.executeStatements(ctx, statements, func(request *dynamodb.BatchStatementRequest, response *dynamodb.BatchStatementResponse) error {
var context string
if strings.HasPrefix(*request.Statement, "INSERT") {
if aws.StringValue(response.Error.Code) == "DuplicateItem" {
// We lost a race with a different owner or another owner has an orphaned ownership record.
key := fromDynamoKey(request.Parameters[0])
for i, endpoint := range filteredChanges.Create {
if endpoint.Key() == key {
log.Infof("Skipping endpoint %v because owner does not match", endpoint)
filteredChanges.Create = append(filteredChanges.Create[:i], filteredChanges.Create[i+1:]...)
// The dynamodb insertion failed; remove from our cache.
im.removeFromCache(endpoint)
delete(im.labels, key)
return nil
}
}
}
context = fmt.Sprintf("inserting dynamodb record %q", aws.StringValue(request.Parameters[0].S))
} else {
context = fmt.Sprintf("updating dynamodb record %q", aws.StringValue(request.Parameters[1].S))
}
return fmt.Errorf("%s: %s: %s", context, aws.StringValue(response.Error.Code), aws.StringValue(response.Error.Message))
})
if err != nil {
im.recordsCache = nil
im.labels = nil
return err
}
// When caching is enabled, disable the provider from using the cache.
if im.cacheInterval > 0 {
ctx = context.WithValue(ctx, provider.RecordsContextKey, nil)
}
err = im.provider.ApplyChanges(ctx, filteredChanges)
if err != nil {
im.recordsCache = nil
im.labels = nil
return err
}
statements = make([]*dynamodb.BatchStatementRequest, 0, len(filteredChanges.Delete)+len(im.orphanedLabels))
for _, r := range filteredChanges.Delete {
statements = im.appendDelete(statements, r.Key())
}
for r := range im.orphanedLabels {
statements = im.appendDelete(statements, r)
delete(im.labels, r)
}
im.orphanedLabels = nil
return im.executeStatements(ctx, statements, func(request *dynamodb.BatchStatementRequest, response *dynamodb.BatchStatementResponse) error {
im.labels = nil
return fmt.Errorf("deleting dynamodb record %q: %s: %s", aws.StringValue(request.Parameters[0].S), aws.StringValue(response.Error.Code), aws.StringValue(response.Error.Message))
})
}
// PropertyValuesEqual compares two attribute values for equality.
func (im *DynamoDBRegistry) PropertyValuesEqual(name string, previous string, current string) bool {
return im.provider.PropertyValuesEqual(name, previous, current)
}
// AdjustEndpoints modifies the endpoints as needed by the specific provider.
func (im *DynamoDBRegistry) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint {
return im.provider.AdjustEndpoints(endpoints)
}
func (im *DynamoDBRegistry) readLabels(ctx context.Context) error {
table, err := im.dynamodbAPI.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(im.table),
})
if err != nil {
return fmt.Errorf("describing table %q: %w", im.table, err)
}
foundKey := false
for _, def := range table.Table.AttributeDefinitions {
if aws.StringValue(def.AttributeName) == "k" {
if aws.StringValue(def.AttributeType) != "S" {
return fmt.Errorf("table %q attribute \"k\" must have type \"S\"", im.table)
}
foundKey = true
}
}
if !foundKey {
return fmt.Errorf("table %q must have attribute \"k\" of type \"S\"", im.table)
}
if aws.StringValue(table.Table.KeySchema[0].AttributeName) != "k" {
return fmt.Errorf("table %q must have hash key \"k\"", im.table)
}
if len(table.Table.KeySchema) > 1 {
return fmt.Errorf("table %q must not have a range key", im.table)
}
labels := map[endpoint.EndpointKey]endpoint.Labels{}
err = im.dynamodbAPI.ScanPagesWithContext(ctx, &dynamodb.ScanInput{
TableName: aws.String(im.table),
FilterExpression: aws.String("o = :ownerval"),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":ownerval": {S: aws.String(im.ownerID)},
},
ProjectionExpression: aws.String("k,l"),
ConsistentRead: aws.Bool(true),
}, func(output *dynamodb.ScanOutput, last bool) bool {
for _, item := range output.Items {
labels[fromDynamoKey(item["k"])] = fromDynamoLabels(item["l"], im.ownerID)
}
return true
})
if err != nil {
return fmt.Errorf("querying dynamodb: %w", err)
}
im.labels = labels
return nil
}
func fromDynamoKey(key *dynamodb.AttributeValue) endpoint.EndpointKey {
split := strings.SplitN(aws.StringValue(key.S), "#", 3)
return endpoint.EndpointKey{
DNSName: split[0],
RecordType: split[1],
SetIdentifier: split[2],
}
}
func toDynamoKey(key endpoint.EndpointKey) *dynamodb.AttributeValue {
return &dynamodb.AttributeValue{
S: aws.String(fmt.Sprintf("%s#%s#%s", key.DNSName, key.RecordType, key.SetIdentifier)),
}
}
func fromDynamoLabels(label *dynamodb.AttributeValue, owner string) endpoint.Labels {
labels := endpoint.NewLabels()
for k, v := range label.M {
labels[k] = aws.StringValue(v.S)
}
labels[endpoint.OwnerLabelKey] = owner
return labels
}
func toDynamoLabels(labels endpoint.Labels) *dynamodb.AttributeValue {
labelMap := make(map[string]*dynamodb.AttributeValue, len(labels))
for k, v := range labels {
if k == endpoint.OwnerLabelKey {
continue
}
labelMap[k] = &dynamodb.AttributeValue{S: aws.String(v)}
}
return &dynamodb.AttributeValue{M: labelMap}
}
func (im *DynamoDBRegistry) appendUpdate(statements []*dynamodb.BatchStatementRequest, key endpoint.EndpointKey, old endpoint.Labels, new endpoint.Labels) []*dynamodb.BatchStatementRequest {
if len(old) == len(new) {
equal := true
for k, v := range old {
if newV, exists := new[k]; !exists || v != newV {
equal = false
break
}
}
if equal {
return statements
}
}
return append(statements, &dynamodb.BatchStatementRequest{
Statement: aws.String(fmt.Sprintf("UPDATE %q SET \"l\"=? WHERE \"k\"=?", im.table)),
Parameters: []*dynamodb.AttributeValue{
toDynamoLabels(new),
toDynamoKey(key),
},
})
}
func (im *DynamoDBRegistry) appendDelete(statements []*dynamodb.BatchStatementRequest, key endpoint.EndpointKey) []*dynamodb.BatchStatementRequest {
return append(statements, &dynamodb.BatchStatementRequest{
Statement: aws.String(fmt.Sprintf("DELETE FROM %q WHERE \"k\"=? AND \"o\"=?", im.table)),
Parameters: []*dynamodb.AttributeValue{
toDynamoKey(key),
{S: aws.String(im.ownerID)},
},
})
}
func (im *DynamoDBRegistry) executeStatements(ctx context.Context, statements []*dynamodb.BatchStatementRequest, handleErr func(request *dynamodb.BatchStatementRequest, response *dynamodb.BatchStatementResponse) error) error {
for len(statements) > 0 {
var chunk []*dynamodb.BatchStatementRequest
if len(statements) > 25 {
chunk = chunk[:25]
statements = statements[25:]
} else {
chunk = statements
statements = nil
}
output, err := im.dynamodbAPI.BatchExecuteStatementWithContext(ctx, &dynamodb.BatchExecuteStatementInput{
Statements: chunk,
})
if err != nil {
return err
}
for i, response := range output.Responses {
request := chunk[i]
if response.Error == nil {
op, _, _ := strings.Cut(*request.Statement, " ")
var key string
if op == "UPDATE" {
key = *request.Parameters[1].S
} else {
key = *request.Parameters[0].S
}
log.Infof("%s dynamodb record %q", op, key)
} else {
if err := handleErr(request, response); err != nil {
return err
}
}
}
}
return nil
}
func (im *DynamoDBRegistry) addToCache(ep *endpoint.Endpoint) {
if im.recordsCache != nil {
im.recordsCache = append(im.recordsCache, ep)
}
}
func (im *DynamoDBRegistry) removeFromCache(ep *endpoint.Endpoint) {
if im.recordsCache == nil || ep == nil {
return
}
for i, e := range im.recordsCache {
if e.DNSName == ep.DNSName && e.RecordType == ep.RecordType && e.SetIdentifier == ep.SetIdentifier && e.Targets.Same(ep.Targets) {
// We found a match; delete the endpoint from the cache.
im.recordsCache = append(im.recordsCache[:i], im.recordsCache[i+1:]...)
return
}
}
}

995
registry/dynamodb_test.go Normal file
View File

@ -0,0 +1,995 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"context"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
"sigs.k8s.io/external-dns/plan"
"sigs.k8s.io/external-dns/provider"
"sigs.k8s.io/external-dns/provider/inmemory"
)
func TestDynamoDBRegistryNew(t *testing.T) {
api, p := newDynamoDBAPIStub(t, nil)
_, err := NewDynamoDBRegistry(p, "test-owner", api, "test-table", time.Hour)
require.NoError(t, err)
_, err = NewDynamoDBRegistry(p, "", api, "test-table", time.Hour)
require.EqualError(t, err, "owner id cannot be empty")
_, err = NewDynamoDBRegistry(p, "test-owner", api, "", time.Hour)
require.EqualError(t, err, "table cannot be empty")
}
func TestDynamoDBRegistryRecordsBadTable(t *testing.T) {
for _, tc := range []struct {
name string
setup func(desc *dynamodb.TableDescription)
expected string
}{
{
name: "missing attribute k",
setup: func(desc *dynamodb.TableDescription) {
desc.AttributeDefinitions[0].AttributeName = aws.String("wrong")
},
expected: "table \"test-table\" must have attribute \"k\" of type \"S\"",
},
{
name: "wrong attribute type",
setup: func(desc *dynamodb.TableDescription) {
desc.AttributeDefinitions[0].AttributeType = aws.String("SS")
},
expected: "table \"test-table\" attribute \"k\" must have type \"S\"",
},
{
name: "wrong key",
setup: func(desc *dynamodb.TableDescription) {
desc.KeySchema[0].AttributeName = aws.String("wrong")
},
expected: "table \"test-table\" must have hash key \"k\"",
},
{
name: "has range key",
setup: func(desc *dynamodb.TableDescription) {
desc.AttributeDefinitions = append(desc.AttributeDefinitions, &dynamodb.AttributeDefinition{
AttributeName: aws.String("o"),
AttributeType: aws.String("S"),
})
desc.KeySchema = append(desc.KeySchema, &dynamodb.KeySchemaElement{
AttributeName: aws.String("o"),
KeyType: aws.String("RANGE"),
})
},
expected: "table \"test-table\" must not have a range key",
},
} {
t.Run(tc.name, func(t *testing.T) {
api, p := newDynamoDBAPIStub(t, nil)
tc.setup(&api.tableDescription)
r, _ := NewDynamoDBRegistry(p, "test-owner", api, "test-table", time.Hour)
_, err := r.Records(context.Background())
assert.EqualError(t, err, tc.expected)
})
}
}
func TestDynamoDBRegistryRecords(t *testing.T) {
api, p := newDynamoDBAPIStub(t, nil)
ctx := context.Background()
expectedRecords := []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
}
r, _ := NewDynamoDBRegistry(p, "test-owner", api, "test-table", time.Hour)
records, err := r.Records(ctx)
require.Nil(t, err)
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
}
func TestDynamoDBRegistryApplyChanges(t *testing.T) {
for _, tc := range []struct {
name string
stubConfig DynamoDBStubConfig
changes plan.Changes
expectedError string
expectedRecords []*endpoint.Endpoint
}{
{
name: "create",
changes: plan.Changes{
Create: []*endpoint.Endpoint{
{
DNSName: "new.test-zone.example.org",
Targets: endpoint.Targets{"new.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
SetIdentifier: "set-new",
Labels: map[string]string{
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectInsert: map[string]map[string]string{
"new.test-zone.example.org#CNAME#set-new": {endpoint.ResourceLabelKey: "ingress/default/new-ingress"},
},
ExpectDelete: sets.New("quux.test-zone.example.org#A#set-2"),
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
{
DNSName: "new.test-zone.example.org",
Targets: endpoint.Targets{"new.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
SetIdentifier: "set-new",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
{
name: "create orphaned",
changes: plan.Changes{
Create: []*endpoint.Endpoint{
{
DNSName: "quux.test-zone.example.org",
Targets: endpoint.Targets{"5.5.5.5"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.ResourceLabelKey: "ingress/default/quux-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
{
DNSName: "quux.test-zone.example.org",
Targets: endpoint.Targets{"5.5.5.5"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/quux-ingress",
},
},
},
},
{
name: "create orphaned change",
changes: plan.Changes{
Create: []*endpoint.Endpoint{
{
DNSName: "quux.test-zone.example.org",
Targets: endpoint.Targets{"5.5.5.5"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectUpdate: map[string]map[string]string{
"quux.test-zone.example.org#A#set-2": {endpoint.ResourceLabelKey: "ingress/default/new-ingress"},
},
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
{
DNSName: "quux.test-zone.example.org",
Targets: endpoint.Targets{"5.5.5.5"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
{
name: "create duplicate",
changes: plan.Changes{
Create: []*endpoint.Endpoint{
{
DNSName: "new.test-zone.example.org",
Targets: endpoint.Targets{"new.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
SetIdentifier: "set-new",
Labels: map[string]string{
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectInsertError: map[string]string{
"new.test-zone.example.org#CNAME#set-new": "DuplicateItem",
},
ExpectDelete: sets.New("quux.test-zone.example.org#A#set-2"),
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
{
name: "create error",
changes: plan.Changes{
Create: []*endpoint.Endpoint{
{
DNSName: "new.test-zone.example.org",
Targets: endpoint.Targets{"new.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
SetIdentifier: "set-new",
Labels: map[string]string{
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectInsertError: map[string]string{
"new.test-zone.example.org#CNAME#set-new": "TestingError",
},
},
expectedError: "inserting dynamodb record \"new.test-zone.example.org#CNAME#set-new\": TestingError: testing error",
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
{
name: "update",
changes: plan.Changes{
UpdateOld: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
},
UpdateNew: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"new-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectDelete: sets.New("quux.test-zone.example.org#A#set-2"),
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"new-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
{
name: "update change",
changes: plan.Changes{
UpdateOld: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
},
UpdateNew: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"new-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectDelete: sets.New("quux.test-zone.example.org#A#set-2"),
ExpectUpdate: map[string]map[string]string{
"bar.test-zone.example.org#CNAME#": {endpoint.ResourceLabelKey: "ingress/default/new-ingress"},
},
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"new-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
{
name: "update error",
changes: plan.Changes{
UpdateOld: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
},
UpdateNew: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"new-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/new-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectUpdateError: map[string]string{
"bar.test-zone.example.org#CNAME#": "TestingError",
},
},
expectedError: "updating dynamodb record \"bar.test-zone.example.org#CNAME#\": TestingError: testing error",
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
{
name: "delete",
changes: plan.Changes{
Delete: []*endpoint.Endpoint{
{
DNSName: "bar.test-zone.example.org",
Targets: endpoint.Targets{"my-domain.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
},
},
stubConfig: DynamoDBStubConfig{
ExpectDelete: sets.New("bar.test-zone.example.org#CNAME#", "quux.test-zone.example.org#A#set-2"),
},
expectedRecords: []*endpoint.Endpoint{
{
DNSName: "foo.test-zone.example.org",
Targets: endpoint.Targets{"foo.loadbalancer.com"},
RecordType: endpoint.RecordTypeCNAME,
Labels: map[string]string{
endpoint.OwnerLabelKey: "",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"1.1.1.1"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-1",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/my-ingress",
},
},
{
DNSName: "baz.test-zone.example.org",
Targets: endpoint.Targets{"2.2.2.2"},
RecordType: endpoint.RecordTypeA,
SetIdentifier: "set-2",
Labels: map[string]string{
endpoint.OwnerLabelKey: "test-owner",
endpoint.ResourceLabelKey: "ingress/default/other-ingress",
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
api, p := newDynamoDBAPIStub(t, &tc.stubConfig)
ctx := context.Background()
r, _ := NewDynamoDBRegistry(p, "test-owner", api, "test-table", time.Hour)
_, err := r.Records(ctx)
require.Nil(t, err)
err = r.ApplyChanges(ctx, &tc.changes)
if tc.expectedError == "" {
assert.Nil(t, err)
} else {
assert.EqualError(t, err, tc.expectedError)
}
assert.Empty(t, tc.stubConfig.ExpectInsert, "all expected inserts made")
assert.Empty(t, tc.stubConfig.ExpectDelete, "all expected deletions made")
records, err := r.Records(ctx)
require.Nil(t, err)
assert.True(t, testutils.SameEndpoints(records, tc.expectedRecords))
r.recordsCache = nil
records, err = r.Records(ctx)
require.Nil(t, err)
assert.True(t, testutils.SameEndpoints(records, tc.expectedRecords))
if tc.expectedError == "" {
assert.Empty(t, r.orphanedLabels)
}
})
}
}
// DynamoDBAPIStub is a minimal implementation of DynamoDBAPI, used primarily for unit testing.
type DynamoDBStub struct {
t *testing.T
stubConfig *DynamoDBStubConfig
tableDescription dynamodb.TableDescription
changesApplied bool
}
type DynamoDBStubConfig struct {
ExpectInsert map[string]map[string]string
ExpectInsertError map[string]string
ExpectUpdate map[string]map[string]string
ExpectUpdateError map[string]string
ExpectDelete sets.Set[string]
}
type wrappedProvider struct {
provider.Provider
stub *DynamoDBStub
}
func (w *wrappedProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
assert.False(w.stub.t, w.stub.changesApplied, "ApplyChanges already called")
w.stub.changesApplied = true
return w.Provider.ApplyChanges(ctx, changes)
}
func newDynamoDBAPIStub(t *testing.T, stubConfig *DynamoDBStubConfig) (*DynamoDBStub, provider.Provider) {
stub := &DynamoDBStub{
t: t,
stubConfig: stubConfig,
tableDescription: dynamodb.TableDescription{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("k"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("k"),
KeyType: aws.String("HASH"),
},
},
},
}
p := inmemory.NewInMemoryProvider()
_ = p.CreateZone(testZone)
_ = p.ApplyChanges(context.Background(), &plan.Changes{
Create: []*endpoint.Endpoint{
endpoint.NewEndpoint("foo.test-zone.example.org", endpoint.RecordTypeCNAME, "foo.loadbalancer.com"),
endpoint.NewEndpoint("bar.test-zone.example.org", endpoint.RecordTypeCNAME, "my-domain.com"),
endpoint.NewEndpoint("baz.test-zone.example.org", endpoint.RecordTypeA, "1.1.1.1").WithSetIdentifier("set-1"),
endpoint.NewEndpoint("baz.test-zone.example.org", endpoint.RecordTypeA, "2.2.2.2").WithSetIdentifier("set-2"),
},
})
return stub, &wrappedProvider{
Provider: p,
stub: stub,
}
}
func (r *DynamoDBStub) DescribeTableWithContext(ctx aws.Context, input *dynamodb.DescribeTableInput, opts ...request.Option) (*dynamodb.DescribeTableOutput, error) {
assert.NotNil(r.t, ctx)
assert.Equal(r.t, "test-table", *input.TableName, "table name")
return &dynamodb.DescribeTableOutput{
Table: &r.tableDescription,
}, nil
}
func (r *DynamoDBStub) ScanPagesWithContext(ctx aws.Context, input *dynamodb.ScanInput, fn func(*dynamodb.ScanOutput, bool) bool, opts ...request.Option) error {
assert.NotNil(r.t, ctx)
assert.Equal(r.t, "test-table", *input.TableName, "table name")
assert.Equal(r.t, "o = :ownerval", *input.FilterExpression)
assert.Len(r.t, input.ExpressionAttributeValues, 1)
assert.Equal(r.t, "test-owner", *input.ExpressionAttributeValues[":ownerval"].S)
assert.Equal(r.t, "k,l", *input.ProjectionExpression)
assert.True(r.t, *input.ConsistentRead)
fn(&dynamodb.ScanOutput{
Items: []map[string]*dynamodb.AttributeValue{
{
"k": &dynamodb.AttributeValue{S: aws.String("bar.test-zone.example.org#CNAME#")},
"l": &dynamodb.AttributeValue{M: map[string]*dynamodb.AttributeValue{
endpoint.ResourceLabelKey: {S: aws.String("ingress/default/my-ingress")},
}},
},
{
"k": &dynamodb.AttributeValue{S: aws.String("baz.test-zone.example.org#A#set-1")},
"l": &dynamodb.AttributeValue{M: map[string]*dynamodb.AttributeValue{
endpoint.ResourceLabelKey: {S: aws.String("ingress/default/my-ingress")},
}},
},
{
"k": &dynamodb.AttributeValue{S: aws.String("baz.test-zone.example.org#A#set-2")},
"l": &dynamodb.AttributeValue{M: map[string]*dynamodb.AttributeValue{
endpoint.ResourceLabelKey: {S: aws.String("ingress/default/other-ingress")},
}},
},
{
"k": &dynamodb.AttributeValue{S: aws.String("quux.test-zone.example.org#A#set-2")},
"l": &dynamodb.AttributeValue{M: map[string]*dynamodb.AttributeValue{
endpoint.ResourceLabelKey: {S: aws.String("ingress/default/quux-ingress")},
}},
},
},
}, true)
return nil
}
func (r *DynamoDBStub) BatchExecuteStatementWithContext(context aws.Context, input *dynamodb.BatchExecuteStatementInput, option ...request.Option) (*dynamodb.BatchExecuteStatementOutput, error) {
assert.NotNil(r.t, context)
hasDelete := strings.HasPrefix(strings.ToLower(aws.StringValue(input.Statements[0].Statement)), "delete")
assert.Equal(r.t, hasDelete, r.changesApplied, "delete after provider changes, everything else before")
assert.LessOrEqual(r.t, len(input.Statements), 25)
responses := make([]*dynamodb.BatchStatementResponse, 0, len(input.Statements))
for _, statement := range input.Statements {
assert.Equal(r.t, hasDelete, strings.HasPrefix(strings.ToLower(aws.StringValue(statement.Statement)), "delete"))
switch aws.StringValue(statement.Statement) {
case "DELETE FROM \"test-table\" WHERE \"k\"=? AND \"o\"=?":
assert.True(r.t, r.changesApplied, "unexpected delete before provider changes")
key := aws.StringValue(statement.Parameters[0].S)
assert.True(r.t, r.stubConfig.ExpectDelete.Has(key), "unexpected delete for key %q", key)
r.stubConfig.ExpectDelete.Delete(key)
assert.Equal(r.t, "test-owner", aws.StringValue(statement.Parameters[1].S))
responses = append(responses, &dynamodb.BatchStatementResponse{})
case "INSERT INTO \"test-table\" VALUE {'k':?, 'o':?, 'l':?}":
assert.False(r.t, r.changesApplied, "unexpected insert after provider changes")
key := aws.StringValue(statement.Parameters[0].S)
if code, exists := r.stubConfig.ExpectInsertError[key]; exists {
delete(r.stubConfig.ExpectInsertError, key)
responses = append(responses, &dynamodb.BatchStatementResponse{
Error: &dynamodb.BatchStatementError{
Code: aws.String(code),
Message: aws.String("testing error"),
},
})
break
}
expectedLabels, found := r.stubConfig.ExpectInsert[key]
assert.True(r.t, found, "unexpected insert for key %q", key)
delete(r.stubConfig.ExpectInsert, key)
assert.Equal(r.t, "test-owner", aws.StringValue(statement.Parameters[1].S))
for label, attribute := range statement.Parameters[2].M {
value := aws.StringValue(attribute.S)
expectedValue, found := expectedLabels[label]
assert.True(r.t, found, "insert for key %q has unexpected label %q", key, label)
delete(expectedLabels, label)
assert.Equal(r.t, expectedValue, value, "insert for key %q label %q value", key, label)
}
for label := range expectedLabels {
r.t.Errorf("insert for key %q did not get expected label %q", key, label)
}
responses = append(responses, &dynamodb.BatchStatementResponse{})
case "UPDATE \"test-table\" SET \"l\"=? WHERE \"k\"=?":
assert.False(r.t, r.changesApplied, "unexpected update after provider changes")
key := aws.StringValue(statement.Parameters[1].S)
if code, exists := r.stubConfig.ExpectUpdateError[key]; exists {
delete(r.stubConfig.ExpectInsertError, key)
responses = append(responses, &dynamodb.BatchStatementResponse{
Error: &dynamodb.BatchStatementError{
Code: aws.String(code),
Message: aws.String("testing error"),
},
})
break
}
expectedLabels, found := r.stubConfig.ExpectUpdate[key]
assert.True(r.t, found, "unexpected update for key %q", key)
delete(r.stubConfig.ExpectUpdate, key)
for label, attribute := range statement.Parameters[0].M {
value := aws.StringValue(attribute.S)
expectedValue, found := expectedLabels[label]
assert.True(r.t, found, "update for key %q has unexpected label %q", key, label)
delete(expectedLabels, label)
assert.Equal(r.t, expectedValue, value, "update for key %q label %q value", key, label)
}
for label := range expectedLabels {
r.t.Errorf("update for key %q did not get expected label %q", key, label)
}
responses = append(responses, &dynamodb.BatchStatementResponse{})
default:
r.t.Errorf("unexpected statement: %s", aws.StringValue(statement.Statement))
}
}
return &dynamodb.BatchExecuteStatementOutput{
Responses: responses,
}, nil
}

View File

@ -294,10 +294,6 @@ func (im *TXTRegistry) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoi
return im.provider.AdjustEndpoints(endpoints)
}
/**
TXT registry specific private methods
*/
/**
nameMapper is the interface for mapping between the endpoint for the source
and the endpoint for the TXT record.
@ -452,7 +448,6 @@ func (im *TXTRegistry) addToCache(ep *endpoint.Endpoint) {
func (im *TXTRegistry) removeFromCache(ep *endpoint.Endpoint) {
if im.recordsCache == nil || ep == nil {
// return early.
return
}

View File

@ -219,7 +219,7 @@ func testTXTRegistryRecordsPrefixed(t *testing.T) {
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
// Ensure prefix is case-insensitive
r, _ = NewTXTRegistry(p, "TxT.", "", "owner", time.Hour, "", []string{}, false, nil)
r, _ = NewTXTRegistry(p, "TxT.", "", "owner", time.Hour, "wc", []string{}, false, nil)
records, _ = r.Records(ctx)
assert.True(t, testutils.SameEndpointLabels(records, expectedRecords))

View File

@ -88,7 +88,7 @@ func (ns *nodeSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, erro
return nil, err
}
endpoints := map[endpointKey]*endpoint.Endpoint{}
endpoints := map[endpoint.EndpointKey]*endpoint.Endpoint{}
// create endpoints for all nodes
for _, node := range nodes {
@ -136,13 +136,13 @@ func (ns *nodeSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, erro
ep.Labels = endpoint.NewLabels()
for _, addr := range addrs {
log.Debugf("adding endpoint %s target %s", ep, addr)
key := endpointKey{
dnsName: ep.DNSName,
recordType: suitableType(addr),
key := endpoint.EndpointKey{
DNSName: ep.DNSName,
RecordType: suitableType(addr),
}
if _, ok := endpoints[key]; !ok {
epCopy := *ep
epCopy.RecordType = key.recordType
epCopy.RecordType = key.RecordType
endpoints[key] = &epCopy
}
endpoints[key].Targets = append(endpoints[key].Targets, addr)

View File

@ -82,7 +82,7 @@ func (ps *podSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
return nil, err
}
endpointMap := make(map[endpointKey][]string)
endpointMap := make(map[endpoint.EndpointKey][]string)
for _, pod := range pods {
if !pod.Spec.HostNetwork {
log.Debugf("skipping pod %s. hostNetwork=false", pod.Name)
@ -135,15 +135,15 @@ func (ps *podSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
}
endpoints := []*endpoint.Endpoint{}
for key, targets := range endpointMap {
endpoints = append(endpoints, endpoint.NewEndpoint(key.dnsName, key.recordType, targets...))
endpoints = append(endpoints, endpoint.NewEndpoint(key.DNSName, key.RecordType, targets...))
}
return endpoints, nil
}
func addToEndpointMap(endpointMap map[endpointKey][]string, domain string, recordType string, address string) {
key := endpointKey{
dnsName: domain,
recordType: recordType,
func addToEndpointMap(endpointMap map[endpoint.EndpointKey][]string, domain string, recordType string, address string) {
key := endpoint.EndpointKey{
DNSName: domain,
RecordType: recordType,
}
if _, ok := endpointMap[key]; !ok {
endpointMap[key] = []string{}

View File

@ -271,7 +271,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
targetsByHeadlessDomainAndType := make(map[endpointKey]endpoint.Targets)
targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
for _, subset := range endpointsObject.Subsets {
addresses := subset.Addresses
if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
@ -325,9 +325,9 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}
}
for _, target := range targets {
key := endpointKey{
dnsName: headlessDomain,
recordType: suitableType(target),
key := endpoint.EndpointKey{
DNSName: headlessDomain,
RecordType: suitableType(target),
}
targetsByHeadlessDomainAndType[key] = append(targetsByHeadlessDomainAndType[key], target)
}
@ -335,15 +335,15 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}
}
headlessKeys := []endpointKey{}
headlessKeys := []endpoint.EndpointKey{}
for headlessKey := range targetsByHeadlessDomainAndType {
headlessKeys = append(headlessKeys, headlessKey)
}
sort.Slice(headlessKeys, func(i, j int) bool {
if headlessKeys[i].dnsName != headlessKeys[j].dnsName {
return headlessKeys[i].dnsName < headlessKeys[j].dnsName
if headlessKeys[i].DNSName != headlessKeys[j].DNSName {
return headlessKeys[i].DNSName < headlessKeys[j].DNSName
}
return headlessKeys[i].recordType < headlessKeys[j].recordType
return headlessKeys[i].RecordType < headlessKeys[j].RecordType
})
for _, headlessKey := range headlessKeys {
allTargets := targetsByHeadlessDomainAndType[headlessKey]
@ -361,9 +361,9 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}
if ttl.IsConfigured() {
endpoints = append(endpoints, endpoint.NewEndpointWithTTL(headlessKey.dnsName, headlessKey.recordType, ttl, targets...))
endpoints = append(endpoints, endpoint.NewEndpointWithTTL(headlessKey.DNSName, headlessKey.RecordType, ttl, targets...))
} else {
endpoints = append(endpoints, endpoint.NewEndpoint(headlessKey.dnsName, headlessKey.recordType, targets...))
endpoints = append(endpoints, endpoint.NewEndpoint(headlessKey.DNSName, headlessKey.RecordType, targets...))
}
}

View File

@ -86,12 +86,6 @@ type Source interface {
AddEventHandler(context.Context, func())
}
// endpointKey is the type of a map key for separating endpoints or targets.
type endpointKey struct {
dnsName string
recordType string
}
func getTTLFromAnnotations(annotations map[string]string) (endpoint.TTL, error) {
ttlNotConfigured := endpoint.TTL(0)
ttlAnnotation, exists := annotations[ttlAnnotationKey]