Update CoreDNS provider to use etcd v3 client

This commit is contained in:
shashidharatd 2018-08-21 22:39:38 +05:30
parent 21b7c3a613
commit c668865e65
2 changed files with 81 additions and 71 deletions

View File

@ -17,7 +17,7 @@ limitations under the License.
package provider package provider
import ( import (
"container/list" "context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
@ -26,14 +26,12 @@ import (
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net" "net"
"net/http"
"os" "os"
"strings" "strings"
"time" "time"
etcd "github.com/coreos/etcd/client" etcdcv3 "github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"github.com/kubernetes-incubator/external-dns/endpoint" "github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/kubernetes-incubator/external-dns/plan" "github.com/kubernetes-incubator/external-dns/plan"
@ -43,8 +41,17 @@ func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
// skyDNSClient is an interface to work with SkyDNS service records in etcd const (
type skyDNSClient interface { priority = 10 // default priority when nothing is set
etcdTimeout = 5 * time.Second
coreDNSPrefix = "/skydns/"
randomPrefixLabel = "prefix"
)
// coreDNSClient is an interface to work with CoreDNS service records in etcd
type coreDNSClient interface {
GetServices(prefix string) ([]*Service, error) GetServices(prefix string) ([]*Service, error)
SaveService(value *Service) error SaveService(value *Service) error
DeleteService(key string) error DeleteService(key string) error
@ -53,10 +60,10 @@ type skyDNSClient interface {
type coreDNSProvider struct { type coreDNSProvider struct {
dryRun bool dryRun bool
domainFilter DomainFilter domainFilter DomainFilter
client skyDNSClient client coreDNSClient
} }
// Service represents SkyDNS/CoreDNS etcd record // Service represents CoreDNS etcd record
type Service struct { type Service struct {
Host string `json:"host,omitempty"` Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"` Port int `json:"port,omitempty"`
@ -83,52 +90,58 @@ type Service struct {
} }
type etcdClient struct { type etcdClient struct {
api etcd.KeysAPI client *etcdcv3.Client
ctx context.Context
} }
var _ skyDNSClient = etcdClient{} var _ coreDNSClient = etcdClient{}
// GetService return all Service records stored in etcd stored anywhere under the given key (recursively) // GetService return all Service records stored in etcd stored anywhere under the given key (recursively)
func (c etcdClient) GetServices(prefix string) ([]*Service, error) { func (c etcdClient) GetServices(prefix string) ([]*Service, error) {
var result []*Service ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
opts := &etcd.GetOptions{Recursive: true} defer cancel()
data, err := c.api.Get(context.Background(), prefix, opts)
path := prefix
r, err := c.client.Get(ctx, path, etcdcv3.WithPrefix())
if err != nil { if err != nil {
if etcd.IsKeyNotFound(err) {
return nil, nil
}
return nil, err return nil, err
} }
queue := list.New() var svcs []*Service
queue.PushFront(data.Node) bx := make(map[Service]bool)
for queueNode := queue.Front(); queueNode != nil; queueNode = queueNode.Next() { for _, n := range r.Kvs {
node := queueNode.Value.(*etcd.Node) svc := new(Service)
if node.Dir { if err := json.Unmarshal(n.Value, svc); err != nil {
for _, childNode := range node.Nodes { return nil, fmt.Errorf("%s: %s", n.Key, err.Error())
queue.PushBack(childNode) }
} b := Service{Host: svc.Host, Port: svc.Port, Priority: svc.Priority, Weight: svc.Weight, Text: svc.Text, Key: string(n.Key)}
if _, ok := bx[b]; ok {
// skip the service if already added to service list.
// the same service might be found in multiple etcd nodes.
continue continue
} }
service := &Service{} bx[b] = true
err = json.Unmarshal([]byte(node.Value), service)
if err != nil { svc.Key = string(n.Key)
log.Error("Cannot parse JSON value ", node.Value) if svc.Priority == 0 {
continue svc.Priority = priority
} }
service.Key = node.Key svcs = append(svcs, svc)
result = append(result, service)
} }
return result, nil
return svcs, nil
} }
// SaveService persists service data into etcd // SaveService persists service data into etcd
func (c etcdClient) SaveService(service *Service) error { func (c etcdClient) SaveService(service *Service) error {
ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
defer cancel()
value, err := json.Marshal(&service) value, err := json.Marshal(&service)
if err != nil { if err != nil {
return err return err
} }
_, err = c.api.Set(context.Background(), service.Key, string(value), nil) _, err = c.client.Put(ctx, service.Key, string(value))
if err != nil { if err != nil {
return err return err
} }
@ -137,9 +150,11 @@ func (c etcdClient) SaveService(service *Service) error {
// DeleteService deletes service record from etcd // DeleteService deletes service record from etcd
func (c etcdClient) DeleteService(key string) error { func (c etcdClient) DeleteService(key string) error {
_, err := c.api.Delete(context.Background(), key, nil) ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
return err defer cancel()
_, err := c.client.Delete(ctx, key)
return err
} }
// loads TLS artifacts and builds tls.Clonfig object // loads TLS artifacts and builds tls.Clonfig object
@ -186,21 +201,8 @@ func loadRoots(caPath string) (*x509.CertPool, error) {
return roots, nil return roots, nil
} }
// constructs http.Transport object for https protocol
func newHTTPSTransport(cc *tls.Config) *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cc,
}
}
// builds etcd client config depending on connection scheme and TLS parameters // builds etcd client config depending on connection scheme and TLS parameters
func getETCDConfig() (*etcd.Config, error) { func getETCDConfig() (*etcdcv3.Config, error) {
etcdURLsStr := os.Getenv("ETCD_URLS") etcdURLsStr := os.Getenv("ETCD_URLS")
if etcdURLsStr == "" { if etcdURLsStr == "" {
etcdURLsStr = "http://localhost:2379" etcdURLsStr = "http://localhost:2379"
@ -208,7 +210,7 @@ func getETCDConfig() (*etcd.Config, error) {
etcdURLs := strings.Split(etcdURLsStr, ",") etcdURLs := strings.Split(etcdURLsStr, ",")
firstURL := strings.ToLower(etcdURLs[0]) firstURL := strings.ToLower(etcdURLs[0])
if strings.HasPrefix(firstURL, "http://") { if strings.HasPrefix(firstURL, "http://") {
return &etcd.Config{Endpoints: etcdURLs}, nil return &etcdcv3.Config{Endpoints: etcdURLs}, nil
} else if strings.HasPrefix(firstURL, "https://") { } else if strings.HasPrefix(firstURL, "https://") {
caFile := os.Getenv("ETCD_CA_FILE") caFile := os.Getenv("ETCD_CA_FILE")
certFile := os.Getenv("ETCD_CERT_FILE") certFile := os.Getenv("ETCD_CERT_FILE")
@ -220,9 +222,9 @@ func getETCDConfig() (*etcd.Config, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &etcd.Config{ return &etcdcv3.Config{
Endpoints: etcdURLs, Endpoints: etcdURLs,
Transport: newHTTPSTransport(tlsConfig), TLS: tlsConfig,
}, nil }, nil
} else { } else {
return nil, errors.New("etcd URLs must start with either http:// or https://") return nil, errors.New("etcd URLs must start with either http:// or https://")
@ -230,16 +232,16 @@ func getETCDConfig() (*etcd.Config, error) {
} }
//newETCDClient is an etcd client constructor //newETCDClient is an etcd client constructor
func newETCDClient() (skyDNSClient, error) { func newETCDClient() (coreDNSClient, error) {
cfg, err := getETCDConfig() cfg, err := getETCDConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
c, err := etcd.New(*cfg) c, err := etcdcv3.New(*cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return etcdClient{etcd.NewKeysAPI(c)}, nil return etcdClient{c, context.Background()}, nil
} }
// NewCoreDNSProvider is a CoreDNS provider constructor // NewCoreDNSProvider is a CoreDNS provider constructor
@ -255,16 +257,16 @@ func NewCoreDNSProvider(domainFilter DomainFilter, dryRun bool) (Provider, error
}, nil }, nil
} }
// Records returns all DNS records found in SkyDNS/CoreDNS etcd backend. Depending on the record fields // Records returns all DNS records found in CoreDNS etcd backend. Depending on the record fields
// it may be mapped to one or two records of type A, CNAME, TXT, A+TXT, CNAME+TXT // it may be mapped to one or two records of type A, CNAME, TXT, A+TXT, CNAME+TXT
func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) { func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) {
var result []*endpoint.Endpoint var result []*endpoint.Endpoint
services, err := p.client.GetServices("/skydns") services, err := p.client.GetServices(coreDNSPrefix)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, service := range services { for _, service := range services {
domains := strings.Split(strings.TrimPrefix(service.Key, "/skydns/"), "/") domains := strings.Split(strings.TrimPrefix(service.Key, coreDNSPrefix), "/")
reverse(domains) reverse(domains)
dnsName := strings.Join(domains[service.TargetStrip:], ".") dnsName := strings.Join(domains[service.TargetStrip:], ".")
if !p.domainFilter.Match(dnsName) { if !p.domainFilter.Match(dnsName) {
@ -272,13 +274,14 @@ func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) {
} }
prefix := strings.Join(domains[:service.TargetStrip], ".") prefix := strings.Join(domains[:service.TargetStrip], ".")
if service.Host != "" { if service.Host != "" {
ep := endpoint.NewEndpoint( ep := endpoint.NewEndpointWithTTL(
dnsName, dnsName,
guessRecordType(service.Host), guessRecordType(service.Host),
endpoint.TTL(service.TTL),
service.Host, service.Host,
) )
ep.Labels["originalText"] = service.Text ep.Labels["originalText"] = service.Text
ep.Labels["prefix"] = prefix ep.Labels[randomPrefixLabel] = prefix
result = append(result, ep) result = append(result, ep)
} }
if service.Text != "" { if service.Text != "" {
@ -287,20 +290,21 @@ func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) {
endpoint.RecordTypeTXT, endpoint.RecordTypeTXT,
service.Text, service.Text,
) )
ep.Labels["prefix"] = prefix ep.Labels[randomPrefixLabel] = prefix
result = append(result, ep) result = append(result, ep)
} }
} }
return result, nil return result, nil
} }
// ApplyChanges stores changes back to etcd converting them to SkyDNS format and aggregating A/CNAME and TXT records // ApplyChanges stores changes back to etcd converting them to CoreDNS format and aggregating A/CNAME and TXT records
func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
grouped := map[string][]*endpoint.Endpoint{} grouped := map[string][]*endpoint.Endpoint{}
for _, ep := range changes.Create { for _, ep := range changes.Create {
grouped[ep.DNSName] = append(grouped[ep.DNSName], ep) grouped[ep.DNSName] = append(grouped[ep.DNSName], ep)
} }
for _, ep := range changes.UpdateNew { for i, ep := range changes.UpdateNew {
ep.Labels[randomPrefixLabel] = changes.UpdateOld[i].Labels[randomPrefixLabel]
grouped[ep.DNSName] = append(grouped[ep.DNSName], ep) grouped[ep.DNSName] = append(grouped[ep.DNSName], ep)
} }
for dnsName, group := range grouped { for dnsName, group := range grouped {
@ -313,7 +317,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
if ep.RecordType == endpoint.RecordTypeTXT { if ep.RecordType == endpoint.RecordTypeTXT {
continue continue
} }
prefix := ep.Labels["prefix"] prefix := ep.Labels[randomPrefixLabel]
if prefix == "" { if prefix == "" {
prefix = fmt.Sprintf("%08x", rand.Int31()) prefix = fmt.Sprintf("%08x", rand.Int31())
} }
@ -322,6 +326,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
Text: ep.Labels["originalText"], Text: ep.Labels["originalText"],
Key: etcdKeyFor(prefix + "." + dnsName), Key: etcdKeyFor(prefix + "." + dnsName),
TargetStrip: strings.Count(prefix, ".") + 1, TargetStrip: strings.Count(prefix, ".") + 1,
TTL: uint32(ep.RecordTTL),
} }
services = append(services, service) services = append(services, service)
} }
@ -331,13 +336,14 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
continue continue
} }
if index >= len(services) { if index >= len(services) {
prefix := ep.Labels["prefix"] prefix := ep.Labels[randomPrefixLabel]
if prefix == "" { if prefix == "" {
prefix = fmt.Sprintf("%08x", rand.Int31()) prefix = fmt.Sprintf("%08x", rand.Int31())
} }
services = append(services, Service{ services = append(services, Service{
Key: etcdKeyFor(prefix + "." + dnsName), Key: etcdKeyFor(prefix + "." + dnsName),
TargetStrip: strings.Count(prefix, ".") + 1, TargetStrip: strings.Count(prefix, ".") + 1,
TTL: uint32(ep.RecordTTL),
}) })
} }
services[index].Text = ep.Targets[0] services[index].Text = ep.Targets[0]
@ -349,7 +355,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
} }
for _, service := range services { for _, service := range services {
log.Infof("Add/set key %s to Host=%s, Text=%s", service.Key, service.Host, service.Text) log.Infof("Add/set key %s to Host=%s, Text=%s, TTL=%d", service.Key, service.Host, service.Text, service.TTL)
if !p.dryRun { if !p.dryRun {
err := p.client.SaveService(&service) err := p.client.SaveService(&service)
if err != nil { if err != nil {
@ -361,8 +367,8 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
for _, ep := range changes.Delete { for _, ep := range changes.Delete {
dnsName := ep.DNSName dnsName := ep.DNSName
if ep.Labels["prefix"] != "" { if ep.Labels[randomPrefixLabel] != "" {
dnsName = ep.Labels["prefix"] + "." + dnsName dnsName = ep.Labels[randomPrefixLabel] + "." + dnsName
} }
key := etcdKeyFor(dnsName) key := etcdKeyFor(dnsName)
log.Infof("Delete key %s", key) log.Infof("Delete key %s", key)
@ -387,7 +393,7 @@ func guessRecordType(target string) string {
func etcdKeyFor(dnsName string) string { func etcdKeyFor(dnsName string) string {
domains := strings.Split(dnsName, ".") domains := strings.Split(dnsName, ".")
reverse(domains) reverse(domains)
return "/skydns/" + strings.Join(domains, "/") return coreDNSPrefix + strings.Join(domains, "/")
} }
func reverse(slice []string) { func reverse(slice []string) {

View File

@ -235,8 +235,6 @@ func TestCoreDNSApplyChanges(t *testing.T) {
} }
validateServices(client.services, expectedServices1, t, 1) validateServices(client.services, expectedServices1, t, 1)
updatedEp := endpoint.NewEndpoint("domain1.local", endpoint.RecordTypeA, "6.6.6.6")
updatedEp.Labels["originalText"] = "string1"
changes2 := &plan.Changes{ changes2 := &plan.Changes{
Create: []*endpoint.Endpoint{ Create: []*endpoint.Endpoint{
endpoint.NewEndpoint("domain3.local", endpoint.RecordTypeA, "7.7.7.7"), endpoint.NewEndpoint("domain3.local", endpoint.RecordTypeA, "7.7.7.7"),
@ -245,6 +243,12 @@ func TestCoreDNSApplyChanges(t *testing.T) {
endpoint.NewEndpoint("domain1.local", "A", "6.6.6.6"), endpoint.NewEndpoint("domain1.local", "A", "6.6.6.6"),
}, },
} }
records, _ := coredns.Records()
for _, ep := range records {
if ep.DNSName == "domain1.local" {
changes2.UpdateOld = append(changes2.UpdateOld, ep)
}
}
applyServiceChanges(coredns, changes2) applyServiceChanges(coredns, changes2)
expectedServices2 := map[string]*Service{ expectedServices2 := map[string]*Service{