From c668865e65ff119ab3ba9191414aa5f33dde91fb Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Tue, 21 Aug 2018 22:39:38 +0530 Subject: [PATCH] Update CoreDNS provider to use etcd v3 client --- provider/coredns.go | 144 ++++++++++++++++++++------------------- provider/coredns_test.go | 8 ++- 2 files changed, 81 insertions(+), 71 deletions(-) diff --git a/provider/coredns.go b/provider/coredns.go index 7454e1691..38403ae61 100644 --- a/provider/coredns.go +++ b/provider/coredns.go @@ -17,7 +17,7 @@ limitations under the License. package provider import ( - "container/list" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -26,14 +26,12 @@ import ( "io/ioutil" "math/rand" "net" - "net/http" "os" "strings" "time" - etcd "github.com/coreos/etcd/client" + etcdcv3 "github.com/coreos/etcd/clientv3" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" "github.com/kubernetes-incubator/external-dns/endpoint" "github.com/kubernetes-incubator/external-dns/plan" @@ -43,8 +41,17 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// skyDNSClient is an interface to work with SkyDNS service records in etcd -type skyDNSClient interface { +const ( + priority = 10 // default priority when nothing is set + etcdTimeout = 5 * time.Second + + coreDNSPrefix = "/skydns/" + + randomPrefixLabel = "prefix" +) + +// coreDNSClient is an interface to work with CoreDNS service records in etcd +type coreDNSClient interface { GetServices(prefix string) ([]*Service, error) SaveService(value *Service) error DeleteService(key string) error @@ -53,10 +60,10 @@ type skyDNSClient interface { type coreDNSProvider struct { dryRun bool domainFilter DomainFilter - client skyDNSClient + client coreDNSClient } -// Service represents SkyDNS/CoreDNS etcd record +// Service represents CoreDNS etcd record type Service struct { Host string `json:"host,omitempty"` Port int `json:"port,omitempty"` @@ -83,52 +90,58 @@ type Service struct { } type etcdClient struct { - api etcd.KeysAPI + client *etcdcv3.Client + ctx context.Context } -var _ skyDNSClient = etcdClient{} +var _ coreDNSClient = etcdClient{} // GetService return all Service records stored in etcd stored anywhere under the given key (recursively) func (c etcdClient) GetServices(prefix string) ([]*Service, error) { - var result []*Service - opts := &etcd.GetOptions{Recursive: true} - data, err := c.api.Get(context.Background(), prefix, opts) + ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout) + defer cancel() + + path := prefix + r, err := c.client.Get(ctx, path, etcdcv3.WithPrefix()) if err != nil { - if etcd.IsKeyNotFound(err) { - return nil, nil - } return nil, err } - queue := list.New() - queue.PushFront(data.Node) - for queueNode := queue.Front(); queueNode != nil; queueNode = queueNode.Next() { - node := queueNode.Value.(*etcd.Node) - if node.Dir { - for _, childNode := range node.Nodes { - queue.PushBack(childNode) - } + var svcs []*Service + bx := make(map[Service]bool) + for _, n := range r.Kvs { + svc := new(Service) + if err := json.Unmarshal(n.Value, svc); err != nil { + return nil, fmt.Errorf("%s: %s", n.Key, err.Error()) + } + b := Service{Host: svc.Host, Port: svc.Port, Priority: svc.Priority, Weight: svc.Weight, Text: svc.Text, Key: string(n.Key)} + if _, ok := bx[b]; ok { + // skip the service if already added to service list. + // the same service might be found in multiple etcd nodes. continue } - service := &Service{} - err = json.Unmarshal([]byte(node.Value), service) - if err != nil { - log.Error("Cannot parse JSON value ", node.Value) - continue + bx[b] = true + + svc.Key = string(n.Key) + if svc.Priority == 0 { + svc.Priority = priority } - service.Key = node.Key - result = append(result, service) + svcs = append(svcs, svc) } - return result, nil + + return svcs, nil } // SaveService persists service data into etcd func (c etcdClient) SaveService(service *Service) error { + ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout) + defer cancel() + value, err := json.Marshal(&service) if err != nil { return err } - _, err = c.api.Set(context.Background(), service.Key, string(value), nil) + _, err = c.client.Put(ctx, service.Key, string(value)) if err != nil { return err } @@ -137,9 +150,11 @@ func (c etcdClient) SaveService(service *Service) error { // DeleteService deletes service record from etcd func (c etcdClient) DeleteService(key string) error { - _, err := c.api.Delete(context.Background(), key, nil) - return err + ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout) + defer cancel() + _, err := c.client.Delete(ctx, key) + return err } // loads TLS artifacts and builds tls.Clonfig object @@ -186,21 +201,8 @@ func loadRoots(caPath string) (*x509.CertPool, error) { return roots, nil } -// constructs http.Transport object for https protocol -func newHTTPSTransport(cc *tls.Config) *http.Transport { - return &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: cc, - } -} - // builds etcd client config depending on connection scheme and TLS parameters -func getETCDConfig() (*etcd.Config, error) { +func getETCDConfig() (*etcdcv3.Config, error) { etcdURLsStr := os.Getenv("ETCD_URLS") if etcdURLsStr == "" { etcdURLsStr = "http://localhost:2379" @@ -208,7 +210,7 @@ func getETCDConfig() (*etcd.Config, error) { etcdURLs := strings.Split(etcdURLsStr, ",") firstURL := strings.ToLower(etcdURLs[0]) if strings.HasPrefix(firstURL, "http://") { - return &etcd.Config{Endpoints: etcdURLs}, nil + return &etcdcv3.Config{Endpoints: etcdURLs}, nil } else if strings.HasPrefix(firstURL, "https://") { caFile := os.Getenv("ETCD_CA_FILE") certFile := os.Getenv("ETCD_CERT_FILE") @@ -220,9 +222,9 @@ func getETCDConfig() (*etcd.Config, error) { if err != nil { return nil, err } - return &etcd.Config{ + return &etcdcv3.Config{ Endpoints: etcdURLs, - Transport: newHTTPSTransport(tlsConfig), + TLS: tlsConfig, }, nil } else { return nil, errors.New("etcd URLs must start with either http:// or https://") @@ -230,16 +232,16 @@ func getETCDConfig() (*etcd.Config, error) { } //newETCDClient is an etcd client constructor -func newETCDClient() (skyDNSClient, error) { +func newETCDClient() (coreDNSClient, error) { cfg, err := getETCDConfig() if err != nil { return nil, err } - c, err := etcd.New(*cfg) + c, err := etcdcv3.New(*cfg) if err != nil { return nil, err } - return etcdClient{etcd.NewKeysAPI(c)}, nil + return etcdClient{c, context.Background()}, nil } // NewCoreDNSProvider is a CoreDNS provider constructor @@ -255,16 +257,16 @@ func NewCoreDNSProvider(domainFilter DomainFilter, dryRun bool) (Provider, error }, nil } -// Records returns all DNS records found in SkyDNS/CoreDNS etcd backend. Depending on the record fields +// Records returns all DNS records found in CoreDNS etcd backend. Depending on the record fields // it may be mapped to one or two records of type A, CNAME, TXT, A+TXT, CNAME+TXT func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) { var result []*endpoint.Endpoint - services, err := p.client.GetServices("/skydns") + services, err := p.client.GetServices(coreDNSPrefix) if err != nil { return nil, err } for _, service := range services { - domains := strings.Split(strings.TrimPrefix(service.Key, "/skydns/"), "/") + domains := strings.Split(strings.TrimPrefix(service.Key, coreDNSPrefix), "/") reverse(domains) dnsName := strings.Join(domains[service.TargetStrip:], ".") if !p.domainFilter.Match(dnsName) { @@ -272,13 +274,14 @@ func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) { } prefix := strings.Join(domains[:service.TargetStrip], ".") if service.Host != "" { - ep := endpoint.NewEndpoint( + ep := endpoint.NewEndpointWithTTL( dnsName, guessRecordType(service.Host), + endpoint.TTL(service.TTL), service.Host, ) ep.Labels["originalText"] = service.Text - ep.Labels["prefix"] = prefix + ep.Labels[randomPrefixLabel] = prefix result = append(result, ep) } if service.Text != "" { @@ -287,20 +290,21 @@ func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) { endpoint.RecordTypeTXT, service.Text, ) - ep.Labels["prefix"] = prefix + ep.Labels[randomPrefixLabel] = prefix result = append(result, ep) } } return result, nil } -// ApplyChanges stores changes back to etcd converting them to SkyDNS format and aggregating A/CNAME and TXT records +// ApplyChanges stores changes back to etcd converting them to CoreDNS format and aggregating A/CNAME and TXT records func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { grouped := map[string][]*endpoint.Endpoint{} for _, ep := range changes.Create { grouped[ep.DNSName] = append(grouped[ep.DNSName], ep) } - for _, ep := range changes.UpdateNew { + for i, ep := range changes.UpdateNew { + ep.Labels[randomPrefixLabel] = changes.UpdateOld[i].Labels[randomPrefixLabel] grouped[ep.DNSName] = append(grouped[ep.DNSName], ep) } for dnsName, group := range grouped { @@ -313,7 +317,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { if ep.RecordType == endpoint.RecordTypeTXT { continue } - prefix := ep.Labels["prefix"] + prefix := ep.Labels[randomPrefixLabel] if prefix == "" { prefix = fmt.Sprintf("%08x", rand.Int31()) } @@ -322,6 +326,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { Text: ep.Labels["originalText"], Key: etcdKeyFor(prefix + "." + dnsName), TargetStrip: strings.Count(prefix, ".") + 1, + TTL: uint32(ep.RecordTTL), } services = append(services, service) } @@ -331,13 +336,14 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { continue } if index >= len(services) { - prefix := ep.Labels["prefix"] + prefix := ep.Labels[randomPrefixLabel] if prefix == "" { prefix = fmt.Sprintf("%08x", rand.Int31()) } services = append(services, Service{ Key: etcdKeyFor(prefix + "." + dnsName), TargetStrip: strings.Count(prefix, ".") + 1, + TTL: uint32(ep.RecordTTL), }) } services[index].Text = ep.Targets[0] @@ -349,7 +355,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { } for _, service := range services { - log.Infof("Add/set key %s to Host=%s, Text=%s", service.Key, service.Host, service.Text) + log.Infof("Add/set key %s to Host=%s, Text=%s, TTL=%d", service.Key, service.Host, service.Text, service.TTL) if !p.dryRun { err := p.client.SaveService(&service) if err != nil { @@ -361,8 +367,8 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error { for _, ep := range changes.Delete { dnsName := ep.DNSName - if ep.Labels["prefix"] != "" { - dnsName = ep.Labels["prefix"] + "." + dnsName + if ep.Labels[randomPrefixLabel] != "" { + dnsName = ep.Labels[randomPrefixLabel] + "." + dnsName } key := etcdKeyFor(dnsName) log.Infof("Delete key %s", key) @@ -387,7 +393,7 @@ func guessRecordType(target string) string { func etcdKeyFor(dnsName string) string { domains := strings.Split(dnsName, ".") reverse(domains) - return "/skydns/" + strings.Join(domains, "/") + return coreDNSPrefix + strings.Join(domains, "/") } func reverse(slice []string) { diff --git a/provider/coredns_test.go b/provider/coredns_test.go index 9c4b90ce1..147711743 100644 --- a/provider/coredns_test.go +++ b/provider/coredns_test.go @@ -235,8 +235,6 @@ func TestCoreDNSApplyChanges(t *testing.T) { } validateServices(client.services, expectedServices1, t, 1) - updatedEp := endpoint.NewEndpoint("domain1.local", endpoint.RecordTypeA, "6.6.6.6") - updatedEp.Labels["originalText"] = "string1" changes2 := &plan.Changes{ Create: []*endpoint.Endpoint{ endpoint.NewEndpoint("domain3.local", endpoint.RecordTypeA, "7.7.7.7"), @@ -245,6 +243,12 @@ func TestCoreDNSApplyChanges(t *testing.T) { endpoint.NewEndpoint("domain1.local", "A", "6.6.6.6"), }, } + records, _ := coredns.Records() + for _, ep := range records { + if ep.DNSName == "domain1.local" { + changes2.UpdateOld = append(changes2.UpdateOld, ep) + } + } applyServiceChanges(coredns, changes2) expectedServices2 := map[string]*Service{