external-dns/provider/awssd/aws_sd.go

672 lines
21 KiB
Go

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package awssd
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"regexp"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
sdtypes "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
log "github.com/sirupsen/logrus"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/plan"
"sigs.k8s.io/external-dns/provider"
)
const (
	// defaultTTL is the record TTL used when an endpoint does not configure one.
	defaultTTL = 300
	// maxResults is the page size requested from ListServices.
	// https://github.com/aws/aws-sdk-go-v2/blob/cf8509382340d6afdc93612550d56d685181bbb3/service/servicediscovery/api_op_ListServices.go#L42
	maxResults = 100
)

// Accepted values of the namespace type configuration (see newSdNamespaceFilter).
const (
	sdNamespaceTypePublic  = "public"
	sdNamespaceTypePrivate = "private"
)

// Cloud Map instance attribute keys written by RegisterInstance and read back by Records.
const (
	sdInstanceAttrIPV4  = "AWS_INSTANCE_IPV4"
	sdInstanceAttrIPV6  = "AWS_INSTANCE_IPV6"
	sdInstanceAttrCname = "AWS_INSTANCE_CNAME"
	sdInstanceAttrAlias = "AWS_ALIAS_DNS_NAME"
)

// Hostname patterns that identify AWS load balancers; CNAME targets matching either
// one are registered as alias instances.
var (
	// ELB hostname format: load-balancer.us-east-1.elb.amazonaws.com
	sdElbHostnameRegex = regexp.MustCompile(`.+\.[^.]+\.elb\.amazonaws\.com$`)
	// NLB hostname format: load-balancer.elb.us-east-1.amazonaws.com
	sdNlbHostnameRegex = regexp.MustCompile(`.+\.elb\.[^.]+\.amazonaws\.com$`)
)
// AWSSDClient is the subset of the AWS Cloud Map API that this provider calls.
// Extend it as new operations are needed; every signature must match the SDK client exactly.
// Reference: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/servicediscovery
type AWSSDClient interface {
	// Service lifecycle.
	CreateService(ctx context.Context, params *sd.CreateServiceInput, optFns ...func(*sd.Options)) (*sd.CreateServiceOutput, error)
	DeleteService(ctx context.Context, params *sd.DeleteServiceInput, optFns ...func(*sd.Options)) (*sd.DeleteServiceOutput, error)
	ListServices(ctx context.Context, params *sd.ListServicesInput, optFns ...func(*sd.Options)) (*sd.ListServicesOutput, error)
	UpdateService(ctx context.Context, params *sd.UpdateServiceInput, optFns ...func(*sd.Options)) (*sd.UpdateServiceOutput, error)

	// Instance registration and discovery.
	DeregisterInstance(ctx context.Context, params *sd.DeregisterInstanceInput, optFns ...func(*sd.Options)) (*sd.DeregisterInstanceOutput, error)
	DiscoverInstances(ctx context.Context, params *sd.DiscoverInstancesInput, optFns ...func(*sd.Options)) (*sd.DiscoverInstancesOutput, error)
	RegisterInstance(ctx context.Context, params *sd.RegisterInstanceInput, optFns ...func(*sd.Options)) (*sd.RegisterInstanceOutput, error)

	// Namespaces.
	ListNamespaces(ctx context.Context, params *sd.ListNamespacesInput, optFns ...func(*sd.Options)) (*sd.ListNamespacesOutput, error)
}
// AWSSDProvider is an implementation of Provider for AWS Cloud Map.
//
// It maps Cloud Map services (one per DNS name) and their registered instances
// (one per target) to external-dns endpoints, using client for every remote call.
type AWSSDProvider struct {
	provider.BaseProvider
	// client performs all Cloud Map API calls.
	client AWSSDClient
	// dryRun suppresses the mutating calls: service create/update/delete and
	// instance register/deregister are skipped (or mocked) when true.
	dryRun bool
	// only consider namespaces whose name matches this filter (checked in ListNamespaces)
	namespaceFilter *endpoint.DomainFilter
	// filter namespace by type (private or public); sent as the ListNamespaces filter,
	// empty means no type restriction
	namespaceTypeFilter []sdtypes.NamespaceFilter
	// enables cleanup of services that have no instances left (see DeleteService)
	cleanEmptyService bool
	// owner ID a service description must carry before the service may be removed
	ownerID string
	// tags to be added to every service created by this provider
	tags []sdtypes.Tag
}
// NewAWSSDProvider builds a provider for AWS Cloud Map.
//
// domainFilter restricts the namespaces by name, namespaceType ("public", "private" or
// anything else for no restriction) restricts them by type, cleanEmptyService together
// with ownerID controls removal of services without instances, and tags are attached to
// every service the provider creates. The returned error is currently always nil.
func NewAWSSDProvider(domainFilter *endpoint.DomainFilter, namespaceType string, dryRun, cleanEmptyService bool, ownerID string, tags map[string]string, client AWSSDClient) (*AWSSDProvider, error) {
	return &AWSSDProvider{
		client:              client,
		dryRun:              dryRun,
		namespaceFilter:     domainFilter,
		namespaceTypeFilter: newSdNamespaceFilter(namespaceType),
		cleanEmptyService:   cleanEmptyService,
		ownerID:             ownerID,
		tags:                awsTags(tags),
	}, nil
}
// newSdNamespaceFilter translates the namespace type configuration into ListNamespaces filters.
//
// "public" yields a single TYPE filter for DNS_PUBLIC namespaces and "private" one for
// DNS_PRIVATE namespaces. Any other value (including empty) yields an empty, non-nil
// filter list, i.e. no type restriction is sent to the API.
// ref: https://docs.aws.amazon.com/cloud-map/latest/api/API_ListNamespaces.html
func newSdNamespaceFilter(namespaceTypeConfig string) []sdtypes.NamespaceFilter {
	var wanted sdtypes.NamespaceType
	switch namespaceTypeConfig {
	case sdNamespaceTypePublic:
		wanted = sdtypes.NamespaceTypeDnsPublic
	case sdNamespaceTypePrivate:
		wanted = sdtypes.NamespaceTypeDnsPrivate
	default:
		return []sdtypes.NamespaceFilter{}
	}

	return []sdtypes.NamespaceFilter{{
		Name:   sdtypes.NamespaceFilterNameType,
		Values: []string{string(wanted)},
	}}
}
// awsTags converts a plain key/value map into the Cloud Map tag representation.
// The order of the resulting slice follows Go map iteration and is therefore unspecified.
func awsTags(tags map[string]string) []sdtypes.Tag {
	converted := make([]sdtypes.Tag, len(tags))
	idx := 0
	for key, value := range tags {
		converted[idx] = sdtypes.Tag{Key: aws.String(key), Value: aws.String(value)}
		idx++
	}
	return converted
}
// Records returns one endpoint per Cloud Map service that still has instances.
//
// For every matching namespace it lists the services and discovers their instances.
// Services without instances are handed to DeleteService (failures are only logged),
// services without a description (owner information) are skipped with a warning, and
// every other service is converted by instancesToEndpoint. Any listing or discovery
// error aborts the whole call.
func (p *AWSSDProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error) {
	namespaces, err := p.ListNamespaces(ctx)
	if err != nil {
		return nil, err
	}

	result := make([]*endpoint.Endpoint, 0)
	for _, namespace := range namespaces {
		servicesByName, err := p.ListServicesByNamespaceID(ctx, namespace.Id)
		if err != nil {
			return nil, err
		}

		for _, service := range servicesByName {
			discovered, err := p.client.DiscoverInstances(ctx, &sd.DiscoverInstancesInput{
				NamespaceName: namespace.Name,
				ServiceName:   service.Name,
			})
			if err != nil {
				return nil, err
			}

			switch {
			case len(discovered.Instances) == 0:
				// Nothing is registered any more: attempt cleanup, but never fail Records over it.
				if delErr := p.DeleteService(ctx, service); delErr != nil {
					log.Errorf("Failed to delete service %q, error: %s", *service.Name, delErr)
				}
			case service.Description == nil:
				log.Warnf("Skipping service %q as owner id not configured", *service.Name)
			default:
				result = append(result, p.instancesToEndpoint(namespace, service, discovered.Instances))
			}
		}
	}
	return result, nil
}
// instancesToEndpoint converts the instances of one Cloud Map service into a single endpoint.
//
// The DNS name is "<service>.<namespace>", the service description becomes the
// AWSSDDescriptionLabel, and the TTL comes from the service's first DNS record. Each
// instance contributes one target, chosen by attribute in this order: CNAME (only when
// the service's record type is CNAME), alias, IPv4, IPv6. Instances with none of these
// attributes are logged and ignored. srv.Name, ns.Name and srv.Description must be non-nil.
func (p *AWSSDProvider) instancesToEndpoint(ns *sdtypes.NamespaceSummary, srv *sdtypes.Service, instances []sdtypes.HttpInstanceSummary) *endpoint.Endpoint {
	// DNS name of the record is a concatenation of service and namespace
	recordName := *srv.Name + "." + *ns.Name

	labels := endpoint.NewLabels()
	labels[endpoint.AWSSDDescriptionLabel] = *srv.Description

	newEndpoint := &endpoint.Endpoint{
		DNSName: recordName,
		Targets: make(endpoint.Targets, 0, len(instances)),
		Labels:  labels,
	}

	// Read the DNS record once, guarding against services without a DNS configuration
	// (or without a TTL) instead of panicking on a nil pointer or an empty slice.
	var dnsRecordType sdtypes.RecordType
	if srv.DnsConfig != nil && len(srv.DnsConfig.DnsRecords) > 0 {
		record := srv.DnsConfig.DnsRecords[0]
		dnsRecordType = record.Type
		if record.TTL != nil {
			newEndpoint.RecordTTL = endpoint.TTL(*record.TTL)
		}
	}

	for _, inst := range instances {
		switch {
		case inst.Attributes[sdInstanceAttrCname] != "" && dnsRecordType == sdtypes.RecordTypeCname:
			newEndpoint.RecordType = endpoint.RecordTypeCNAME
			newEndpoint.Targets = append(newEndpoint.Targets, inst.Attributes[sdInstanceAttrCname])
		case inst.Attributes[sdInstanceAttrAlias] != "":
			// ALIAS instances are exposed as CNAME endpoints.
			newEndpoint.RecordType = endpoint.RecordTypeCNAME
			newEndpoint.Targets = append(newEndpoint.Targets, inst.Attributes[sdInstanceAttrAlias])
		case inst.Attributes[sdInstanceAttrIPV4] != "":
			newEndpoint.RecordType = endpoint.RecordTypeA
			newEndpoint.Targets = append(newEndpoint.Targets, inst.Attributes[sdInstanceAttrIPV4])
		case inst.Attributes[sdInstanceAttrIPV6] != "":
			newEndpoint.RecordType = endpoint.RecordTypeAAAA
			newEndpoint.Targets = append(newEndpoint.Targets, inst.Attributes[sdInstanceAttrIPV6])
		default:
			// Log the service name itself, not the address of the *string holding it.
			log.Warnf("Invalid instance \"%v\" found in service \"%v\"", inst, *srv.Name)
		}
	}
	return newEndpoint
}
// ApplyChanges applies the planned changes to Cloud Map.
//
// Cloud Map has no in-place update for instances, so updates are first converted by
// updatesToCreates into extra deletes (removed targets) and creates (current targets).
// All deletes are submitted before all creates. The work lists are built in fresh slices
// so changes.Create and changes.Delete are left untouched.
func (p *AWSSDProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
	// return early if there is nothing to change
	if len(changes.Create) == 0 && len(changes.Delete) == 0 && len(changes.Update) == 0 {
		log.Info("All records are already up to date")
		return nil
	}

	// convert updates to delete and create operation if applicable (updates not supported)
	updateCreates, updateDeletes := p.updatesToCreates(changes)

	// Appending directly to changes.Delete/changes.Create would modify the caller's plan and
	// could write into spare capacity of the caller's backing arrays.
	deletes := make([]*endpoint.Endpoint, 0, len(changes.Delete)+len(updateDeletes))
	deletes = append(deletes, changes.Delete...)
	deletes = append(deletes, updateDeletes...)

	creates := make([]*endpoint.Endpoint, 0, len(changes.Create)+len(updateCreates))
	creates = append(creates, changes.Create...)
	creates = append(creates, updateCreates...)

	namespaces, err := p.ListNamespaces(ctx)
	if err != nil {
		return err
	}

	if err := p.submitDeletes(ctx, namespaces, deletes); err != nil {
		return err
	}
	return p.submitCreates(ctx, namespaces, creates)
}
// updatesToCreates splits the plan's updates into creates and deletes.
//
// Each UpdateOld endpoint is paired with the UpdateNew endpoint of the same DNS name.
// The new endpoint is always returned as a create, which (re-)registers every current
// target. When the target sets differ, a copy of the old endpoint that carries only
// the targets no longer present is returned as a delete. Old endpoints without a new
// counterpart are logged and skipped. The endpoints inside changes are not modified.
func (p *AWSSDProvider) updatesToCreates(changes *plan.Changes) ([]*endpoint.Endpoint, []*endpoint.Endpoint) {
	updateNew := changes.UpdateNew()
	updateNewMap := make(map[string]*endpoint.Endpoint, len(updateNew))
	for _, e := range updateNew {
		updateNewMap[e.DNSName] = e
	}

	var creates, deletes []*endpoint.Endpoint
	for _, old := range changes.UpdateOld() {
		current := updateNewMap[old.DNSName]
		if current == nil {
			// Previously this dereferenced a nil endpoint and panicked.
			log.Warnf("Skipping update of %q: no matching new endpoint found", old.DNSName)
			continue
		}

		if !old.Targets.Same(current.Targets) {
			currentTargetsMap := make(map[string]struct{}, len(current.Targets))
			for _, newTarget := range current.Targets {
				currentTargetsMap[newTarget] = struct{}{}
			}

			// If targets changed, only deregister removed targets (i.e. in `UpdateOld` but not in `UpdateNew`)
			targetsToRemove := make(endpoint.Targets, 0)
			for _, oldTarget := range old.Targets {
				if _, found := currentTargetsMap[oldTarget]; !found {
					targetsToRemove = append(targetsToRemove, oldTarget)
				}
			}

			// Only emit a delete when something is actually removed, and work on a copy so
			// the caller's endpoint keeps its original targets.
			if len(targetsToRemove) > 0 {
				stale := *old
				stale.Targets = targetsToRemove
				deletes = append(deletes, &stale)
			}
		}

		// always register (or re-register) instance with the current data
		creates = append(creates, current)
	}
	return creates, deletes
}
// submitCreates registers the targets of every create change in its namespace.
//
// Changes are grouped by namespace ID. A missing service is created first and cached
// under the service name it was looked up by, so later changes for the same name reuse
// it. An existing service whose first DNS record has a different TTL than the change's
// configured TTL is updated. Every target is then registered as an instance. The first
// error aborts the whole submission.
func (p *AWSSDProvider) submitCreates(ctx context.Context, namespaces []*sdtypes.NamespaceSummary, changes []*endpoint.Endpoint) error {
	changesByNamespaceID := p.changesByNamespaceID(namespaces, changes)

	for nsID, changeList := range changesByNamespaceID {
		services, err := p.ListServicesByNamespaceID(ctx, aws.String(nsID))
		if err != nil {
			return err
		}

		for _, ch := range changeList {
			_, srvName := p.parseHostname(ch.DNSName)

			srv := services[srvName]
			if srv == nil {
				// when service is missing create a new one
				srv, err = p.CreateService(ctx, aws.String(nsID), aws.String(srvName), ch)
				if err != nil {
					return err
				}
				// Cache under the lookup key: in dry-run mode the returned service is
				// named "dry-run-service", not srvName.
				services[srvName] = srv
			} else if ch.RecordTTL.IsConfigured() &&
				srv.DnsConfig != nil && len(srv.DnsConfig.DnsRecords) > 0 &&
				aws.ToInt64(srv.DnsConfig.DnsRecords[0].TTL) != int64(ch.RecordTTL) {
				// update service when TTL differ; services without a DNS record are not
				// touched instead of causing a nil-pointer panic
				if err = p.UpdateService(ctx, srv, ch); err != nil {
					return err
				}
			}

			if err = p.RegisterInstance(ctx, srv, ch); err != nil {
				return err
			}
		}
	}
	return nil
}
// submitDeletes deregisters the targets of every delete change from its service.
//
// Changes are grouped by namespace ID. A change whose service cannot be found in its
// namespace is reported as an error, and the first error aborts the whole submission.
func (p *AWSSDProvider) submitDeletes(ctx context.Context, namespaces []*sdtypes.NamespaceSummary, changes []*endpoint.Endpoint) error {
	for nsID, endpoints := range p.changesByNamespaceID(namespaces, changes) {
		servicesByName, err := p.ListServicesByNamespaceID(ctx, aws.String(nsID))
		if err != nil {
			return err
		}

		for _, ep := range endpoints {
			_, serviceName := p.parseHostname(ep.DNSName)
			service, found := servicesByName[serviceName]
			if !found || service == nil {
				return fmt.Errorf("service \"%s\" is missing when trying to delete \"%v\"", serviceName, ep.DNSName)
			}
			if deregErr := p.DeregisterInstance(ctx, service, ep); deregErr != nil {
				return deregErr
			}
		}
	}
	return nil
}
// ListNamespaces returns all namespaces matching defined namespace filter.
//
// The type filter (p.namespaceTypeFilter) is sent to the API. The name filter
// (p.namespaceFilter) is applied locally to every namespace of every page. Any
// paging error aborts the call.
func (p *AWSSDProvider) ListNamespaces(ctx context.Context) ([]*sdtypes.NamespaceSummary, error) {
	namespaces := make([]*sdtypes.NamespaceSummary, 0)
	paginator := sd.NewListNamespacesPaginator(p.client, &sd.ListNamespacesInput{
		Filters: p.namespaceTypeFilter,
	})

	for paginator.HasMorePages() {
		resp, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, err
		}

		// Point at the slice element rather than at the range variable: before Go 1.22 the
		// range variable is shared by all iterations, so &ns would make every entry alias
		// the last namespace of the page.
		for i := range resp.Namespaces {
			ns := &resp.Namespaces[i]
			if !p.namespaceFilter.Match(*ns.Name) {
				continue
			}
			namespaces = append(namespaces, ns)
		}
	}
	return namespaces, nil
}
// ListServicesByNamespaceID returns the services of one namespace, keyed by service name.
//
// It pages through ListServices (maxResults per page) filtered by namespaceID. Each
// summary is copied into a sdtypes.Service whose NamespaceId is set to namespaceID.
// If several services share a name, the one listed last wins. Any paging error
// aborts the call.
func (p *AWSSDProvider) ListServicesByNamespaceID(ctx context.Context, namespaceID *string) (map[string]*sdtypes.Service, error) {
	input := &sd.ListServicesInput{
		Filters: []sdtypes.ServiceFilter{{
			Name:   sdtypes.ServiceFilterNameNamespaceId,
			Values: []string{*namespaceID},
		}},
		MaxResults: aws.Int32(maxResults),
	}

	summaries := make([]sdtypes.ServiceSummary, 0)
	for pager := sd.NewListServicesPaginator(p.client, input); pager.HasMorePages(); {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		summaries = append(summaries, page.Services...)
	}

	byName := make(map[string]*sdtypes.Service)
	for i := range summaries {
		s := summaries[i]
		byName[*s.Name] = &sdtypes.Service{
			Arn:                     s.Arn,
			CreateDate:              s.CreateDate,
			Description:             s.Description,
			DnsConfig:               s.DnsConfig,
			HealthCheckConfig:       s.HealthCheckConfig,
			HealthCheckCustomConfig: s.HealthCheckCustomConfig,
			Id:                      s.Id,
			InstanceCount:           s.InstanceCount,
			Name:                    s.Name,
			NamespaceId:             namespaceID,
			Type:                    s.Type,
		}
	}
	return byName, nil
}
// CreateService creates a Cloud Map service for ep in the given namespace and returns it.
//
// The service gets one DNS record whose type comes from serviceTypeFromEndpoint and whose
// TTL is ep's TTL (defaultTTL when unset). The routing policy comes from
// routingPolicyFromEndpoint, the description from ep's AWSSDDescriptionLabel, and the
// provider's tags are attached. In dry-run mode no API call is made and a placeholder
// service named "dry-run-service" is returned.
func (p *AWSSDProvider) CreateService(ctx context.Context, namespaceID *string, srvName *string, ep *endpoint.Endpoint) (*sdtypes.Service, error) {
	log.Infof("Creating a new service \"%s\" in \"%s\" namespace", *srvName, *namespaceID)

	recordType := p.serviceTypeFromEndpoint(ep)
	policy := p.routingPolicyFromEndpoint(ep)

	recordTTL := int64(defaultTTL)
	if ep.RecordTTL.IsConfigured() {
		recordTTL = int64(ep.RecordTTL)
	}

	if p.dryRun {
		placeholder := &sdtypes.Service{
			Id:   aws.String("dry-run-service"),
			Name: aws.String("dry-run-service"),
		}
		return placeholder, nil
	}

	input := &sd.CreateServiceInput{
		Name:        srvName,
		Description: aws.String(ep.Labels[endpoint.AWSSDDescriptionLabel]),
		DnsConfig: &sdtypes.DnsConfig{
			RoutingPolicy: policy,
			DnsRecords:    []sdtypes.DnsRecord{{Type: recordType, TTL: aws.Int64(recordTTL)}},
		},
		NamespaceId: namespaceID,
		Tags:        p.tags,
	}
	created, err := p.client.CreateService(ctx, input)
	if err != nil {
		return nil, err
	}
	return created.Service, nil
}
// UpdateService rewrites the description and DNS record (type and TTL) of service from ep.
//
// The TTL is ep's TTL, or defaultTTL when unset. In dry-run mode no API call is made.
func (p *AWSSDProvider) UpdateService(ctx context.Context, service *sdtypes.Service, ep *endpoint.Endpoint) error {
	log.Infof("Updating service \"%s\"", *service.Name)

	recordType := p.serviceTypeFromEndpoint(ep)
	recordTTL := int64(defaultTTL)
	if ep.RecordTTL.IsConfigured() {
		recordTTL = int64(ep.RecordTTL)
	}

	if p.dryRun {
		return nil
	}

	change := &sdtypes.ServiceChange{
		Description: aws.String(ep.Labels[endpoint.AWSSDDescriptionLabel]),
		DnsConfig: &sdtypes.DnsConfigChange{
			DnsRecords: []sdtypes.DnsRecord{{Type: recordType, TTL: aws.Int64(recordTTL)}},
		},
	}
	_, err := p.client.UpdateService(ctx, &sd.UpdateServiceInput{Id: service.Id, Service: change})
	return err
}
// DeleteService deletes empty Service from AWS API if its owner id match.
//
// Nothing happens in dry-run mode or when cleanEmptyService is disabled. The expected
// owner marker is the plain label serialization of this provider's ownerID. A service is
// deleted only when its description equals that marker or continues after it with a ","
// label separator, so owner "foo" can never claim services owned by "foobar". Services
// without a description are skipped.
func (p *AWSSDProvider) DeleteService(ctx context.Context, service *sdtypes.Service) error {
	log.Debugf("Check if service \"%s\" owner id match and it can be deleted", *service.Name)
	if p.dryRun || !p.cleanEmptyService {
		return nil
	}

	// convert ownerID string to the service description format
	label := endpoint.NewLabels()
	label[endpoint.OwnerLabelKey] = p.ownerID
	label[endpoint.AWSSDDescriptionLabel] = label.SerializePlain(false)
	wanted := label[endpoint.AWSSDDescriptionLabel]

	if service.Description == nil {
		log.Debugf("Skipping service removal %q because owner id (service.Description) not set, when should be %q", *service.Name, wanted)
		return nil
	}

	description := *service.Description
	// A bare HasPrefix would also match longer owner IDs sharing the prefix (and, with an
	// empty ownerID, every owner); require the owner value to end at a label boundary.
	if description == wanted || strings.HasPrefix(description, wanted+",") {
		log.Infof("Deleting service \"%s\"", *service.Name)
		_, err := p.client.DeleteService(ctx, &sd.DeleteServiceInput{
			Id: aws.String(*service.Id),
		})
		return err
	}

	log.Debugf("Skipping service removal %q because owner id does not match, found: %q, required: %q", *service.Name, description, wanted)
	return nil
}
// RegisterInstance registers one instance per target of ep in the given service.
//
// The instance attribute depends on the record type:
//   - CNAME: an alias attribute for AWS load balancer hostnames, a CNAME attribute otherwise
//   - A: an IPv4 attribute
//   - AAAA: an IPv6 attribute
//
// Any other record type is rejected with an error. Instance IDs come from
// targetToInstanceID. In dry-run mode targets are only logged and validated.
func (p *AWSSDProvider) RegisterInstance(ctx context.Context, service *sdtypes.Service, ep *endpoint.Endpoint) error {
	for _, target := range ep.Targets {
		log.Infof("Registering a new instance \"%s\" for service \"%s\" (%s)", target, *service.Name, *service.Id)

		var attrKey string
		switch ep.RecordType {
		case endpoint.RecordTypeCNAME:
			attrKey = sdInstanceAttrCname
			if p.isAWSLoadBalancer(target) {
				attrKey = sdInstanceAttrAlias
			}
		case endpoint.RecordTypeA:
			attrKey = sdInstanceAttrIPV4
		case endpoint.RecordTypeAAAA:
			attrKey = sdInstanceAttrIPV6
		default:
			return fmt.Errorf("invalid endpoint type (%v)", ep)
		}

		if p.dryRun {
			continue
		}

		input := &sd.RegisterInstanceInput{
			ServiceId:  service.Id,
			Attributes: map[string]string{attrKey: target},
			InstanceId: aws.String(p.targetToInstanceID(target)),
		}
		if _, err := p.client.RegisterInstance(ctx, input); err != nil {
			return err
		}
	}
	return nil
}
// DeregisterInstance removes the instance of every target of ep from the given service.
//
// Instance IDs come from targetToInstanceID. In dry-run mode the targets are only logged.
// The first API error aborts the loop.
func (p *AWSSDProvider) DeregisterInstance(ctx context.Context, service *sdtypes.Service, ep *endpoint.Endpoint) error {
	for _, target := range ep.Targets {
		log.Infof("De-registering an instance \"%s\" for service \"%s\" (%s)", target, *service.Name, *service.Id)
		if p.dryRun {
			continue
		}

		input := &sd.DeregisterInstanceInput{
			InstanceId: aws.String(p.targetToInstanceID(target)),
			ServiceId:  service.Id,
		}
		if _, err := p.client.DeregisterInstance(ctx, input); err != nil {
			return err
		}
	}
	return nil
}
// targetToInstanceID derives a Cloud Map instance ID from a target.
//
// The ID is the lower-cased target. Instance IDs are limited to 64 characters, so a
// target longer than that (measured before lower-casing) is replaced by the hex-encoded
// SHA-256 of its lower-cased form, which is exactly 64 characters long.
func (p *AWSSDProvider) targetToInstanceID(target string) string {
	id := strings.ToLower(target)
	if len(target) <= 64 {
		return id
	}
	digest := sha256.Sum256([]byte(id))
	return hex.EncodeToString(digest[:])
}
// changesByNamespaceID groups changes by the ID of each namespace they belong to.
//
// A change belongs to every namespace whose name equals the change's DNS name without
// its trailing dot and first label. Changes matching no namespace are logged and skipped.
// Only namespaces that received at least one change appear in the result.
func (p *AWSSDProvider) changesByNamespaceID(namespaces []*sdtypes.NamespaceSummary, changes []*endpoint.Endpoint) map[string][]*endpoint.Endpoint {
	grouped := make(map[string][]*endpoint.Endpoint)
	for _, change := range changes {
		nsName, _ := p.parseHostname(strings.TrimSuffix(change.DNSName, "."))

		owners := matchingNamespaces(nsName, namespaces)
		if len(owners) == 0 {
			log.Warnf("Skipping record %s because no namespace matching record DNS Name was detected ", change.String())
			continue
		}
		for _, ns := range owners {
			grouped[*ns.Id] = append(grouped[*ns.Id], change)
		}
	}
	return grouped
}
// matchingNamespaces returns every namespace whose name equals hostname exactly.
// The result is never nil; it is empty when nothing matches.
func matchingNamespaces(hostname string, namespaces []*sdtypes.NamespaceSummary) []*sdtypes.NamespaceSummary {
	found := make([]*sdtypes.NamespaceSummary, 0)
	for i := range namespaces {
		if candidate := namespaces[i]; *candidate.Name == hostname {
			found = append(found, candidate)
		}
	}
	return found
}
// parseHostname splits hostname at its first dot into namespace (domain) and service.
// It returns the namespace first and the service second. A hostname without a dot is
// treated as a bare service name with an empty namespace.
func (p *AWSSDProvider) parseHostname(hostname string) (string, string) {
	service, namespace, _ := strings.Cut(hostname, ".")
	return namespace, service
}
// routingPolicyFromEndpoint picks the service routing policy for an endpoint:
// MULTIVALUE for A and AAAA records, WEIGHTED for everything else.
func (p *AWSSDProvider) routingPolicyFromEndpoint(ep *endpoint.Endpoint) sdtypes.RoutingPolicy {
	switch ep.RecordType {
	case endpoint.RecordTypeA, endpoint.RecordTypeAAAA:
		return sdtypes.RoutingPolicyMultivalue
	default:
		return sdtypes.RoutingPolicyWeighted
	}
}
// serviceTypeFromEndpoint determines the Cloud Map DNS record type (A, AAAA, CNAME) for an endpoint.
//
// A CNAME endpoint whose first target is an AWS load balancer is an ALIAS, which Cloud Map
// serves with an A record. Other CNAME endpoints, including ones with no targets (which
// previously caused an index-out-of-range panic), use CNAME. AAAA maps to AAAA, and every
// other record type maps to A.
func (p *AWSSDProvider) serviceTypeFromEndpoint(ep *endpoint.Endpoint) sdtypes.RecordType {
	switch ep.RecordType {
	case endpoint.RecordTypeCNAME:
		// The type is derived from the first target only; an endpoint is not expected to mix
		// load-balancer and non-load-balancer targets.
		if len(ep.Targets) > 0 && p.isAWSLoadBalancer(ep.Targets[0]) {
			// ALIAS target uses DNS record of type A
			return sdtypes.RecordTypeA
		}
		return sdtypes.RecordTypeCname
	case endpoint.RecordTypeAAAA:
		return sdtypes.RecordTypeAaaa
	default:
		return sdtypes.RecordTypeA
	}
}
// isAWSLoadBalancer reports whether hostname has the ELB or NLB hostname layout
// (see sdElbHostnameRegex and sdNlbHostnameRegex).
func (p *AWSSDProvider) isAWSLoadBalancer(hostname string) bool {
	return sdElbHostnameRegex.MatchString(hostname) || sdNlbHostnameRegex.MatchString(hostname)
}