added support for using nodes as source

This commit is contained in:
Reinier Schoof 2019-10-01 09:27:06 +02:00
parent 8db7e77d78
commit a491d8f6a2
4 changed files with 347 additions and 1 deletions

View File

@ -270,7 +270,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("contour-load-balancer", "The fully-qualified name of the Contour load balancer service. (default: heptio-contour/contour)").Default("heptio-contour/contour").StringVar(&cfg.ContourLoadBalancerService)
// Flags related to processing sources
app.Flag("source", "The resource types that are queried for endpoints; specify multiple times for multiple sources (required, options: service, ingress, fake, connector, istio-gateway, cloudfoundry, contour-ingressroute, crd, empty").Required().PlaceHolder("source").EnumsVar(&cfg.Sources, "service", "ingress", "istio-gateway", "cloudfoundry", "contour-ingressroute", "fake", "connector", "crd", "empty")
app.Flag("source", "The resource types that are queried for endpoints; specify multiple times for multiple sources (required, options: service, ingress, node, fake, connector, istio-gateway, cloudfoundry, contour-ingressroute, crd, empty").Required().PlaceHolder("source").EnumsVar(&cfg.Sources, "service", "ingress", "node", "istio-gateway", "cloudfoundry", "contour-ingressroute", "fake", "connector", "crd", "empty")
app.Flag("namespace", "Limit sources of endpoints to a specific namespace (default: all namespaces)").Default(defaultConfig.Namespace).StringVar(&cfg.Namespace)
app.Flag("annotation-filter", "Filter sources managed by external-dns via annotation using label selector semantics (default: all sources)").Default(defaultConfig.AnnotationFilter).StringVar(&cfg.AnnotationFilter)
app.Flag("fqdn-template", "A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional). Accepts comma separated list for multiple global FQDN.").Default(defaultConfig.FQDNTemplate).StringVar(&cfg.FQDNTemplate)

151
source/node.go Normal file
View File

@ -0,0 +1,151 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"bytes"
"fmt"
"strings"
"text/template"
"time"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/kubernetes-incubator/external-dns/endpoint"
)
type nodeSource struct {
client kubernetes.Interface
fqdnTemplate *template.Template
nodeInformer coreinformers.NodeInformer
}
func NewNodeSource(kubeClient kubernetes.Interface, fqdnTemplate string) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
}
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
// Add default resource event handler to properly initialize informer.
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("node added")
},
},
)
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return nodeInformer.Informer().HasSynced() == true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
return &nodeSource{
client: kubeClient,
fqdnTemplate: tmpl,
nodeInformer: nodeInformer,
}, nil
}
// Endpoints returns endpoint objects for each service that should be processed.
func (ns *nodeSource) Endpoints() ([]*endpoint.Endpoint, error) {
nodes, err := ns.nodeInformer.Lister().List(labels.Everything())
if err != nil {
return nil, err
}
endpoints := []*endpoint.Endpoint{}
// create endpoints for all nodes
for _, node := range nodes {
log.Debugf("creating endpoint for node %s", node.Name)
// create new endpoint with the information we already have
ep := &endpoint.Endpoint{
RecordType: "A", // hardcoded DNS record type
}
if ns.fqdnTemplate != nil {
// Process the whole template string
var buf bytes.Buffer
err := ns.fqdnTemplate.Execute(&buf, node)
if err != nil {
return nil, fmt.Errorf("failed to apply template on node %s: %v", node.Name, err)
}
ep.DNSName = buf.String()
log.Debugf("applied template for %s, converting to %s", node.Name, ep.DNSName)
} else {
ep.DNSName = node.Name
log.Debugf("not applying template for %s", node.Name)
}
addr, err := ns.nodeAddress(node)
if err != nil {
log.Error(err)
continue
}
ep.Targets = endpoint.Targets([]string{addr})
log.Debugf("adding endpoint %s", ep)
endpoints = append(endpoints, ep)
}
return endpoints, nil
}
// nodeAddress returns node's externalIP and if that's not found, node's internalIP
// basically what k8s.io/kubernetes/pkg/util/node.GetPreferredNodeAddress does
func (ns *nodeSource) nodeAddress(node *v1.Node) (string, error) {
for _, t := range []v1.NodeAddressType{v1.NodeExternalIP, v1.NodeInternalIP} {
for _, addr := range node.Status.Addresses {
if addr.Type == t {
return addr.Address, nil
}
}
}
return "", fmt.Errorf("could not find node address for %s", node.Name)
}

189
source/node_test.go Normal file
View File

@ -0,0 +1,189 @@
package source
import (
"testing"
"github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)
func TestNodeSource(t *testing.T) {
//suite.Run(t, new(ServiceSuite))
//t.Run("Interface", testServiceSourceImplementsSource)
//t.Run("NewNodeSource", testNodeSourceNewNodeSource)
t.Run("Endpoints", testNodeSourceEndpoints)
}
// testNodeSourceNewNodeSource tests that NewNodeService doesn't return an error.
func testNodeSourceNewNodeSource(t *testing.T) {
for _, ti := range []struct {
title string
fqdnTemplate string
expectError bool
}{
{
title: "invalid template",
expectError: true,
fqdnTemplate: "{{.Name",
},
{
title: "valid empty template",
expectError: false,
},
{
title: "valid template",
expectError: false,
fqdnTemplate: "{{.Name}}-{{.Namespace}}.ext-dns.test.com",
},
} {
t.Run(ti.title, func(t *testing.T) {
_, err := NewNodeSource(
fake.NewSimpleClientset(),
ti.fqdnTemplate,
)
if ti.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
// testNodeSourceEndpoints tests that various node generate the correct endpoints.
func testNodeSourceEndpoints(t *testing.T) {
for _, tc := range []struct {
title string
fqdnTemplate string
nodeName string
nodeAddresses []v1.NodeAddress
labels map[string]string
annotations map[string]string
expected []*endpoint.Endpoint
expectError bool
}{
{
"node with short hostname returns one endpoint",
"",
"node1",
[]v1.NodeAddress{{v1.NodeExternalIP, "1.2.3.4"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1", Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
{
"node with fqdn returns one endpoint",
"",
"node1.example.org",
[]v1.NodeAddress{{v1.NodeExternalIP, "1.2.3.4"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1.example.org", Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
{
"node with fqdn template returns endpoint with expanded hostname",
"{{.Name}}.example.org",
"node1",
[]v1.NodeAddress{{v1.NodeExternalIP, "1.2.3.4"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1.example.org", Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
{
"node with fqdn and fqdn template returns one endpoint",
"{{.Name}}.example.org",
"node1.example.org",
[]v1.NodeAddress{{v1.NodeExternalIP, "1.2.3.4"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1.example.org.example.org", Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
{
"node with both external and internal IP returns an endpoint with external IP",
"",
"node1",
[]v1.NodeAddress{{v1.NodeExternalIP, "1.2.3.4"}, {v1.NodeInternalIP, "2.3.4.5"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1", Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
{
"node with only internal IP returns an endpoint with internal IP",
"",
"node1",
[]v1.NodeAddress{{v1.NodeInternalIP, "2.3.4.5"}},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{
{RecordType: "A", DNSName: "node1", Targets: endpoint.Targets{"2.3.4.5"}},
},
false,
},
{
"node with neither external nor internal IP returns no endpoints",
"",
"node1",
[]v1.NodeAddress{},
map[string]string{},
map[string]string{},
[]*endpoint.Endpoint{},
false,
},
} {
t.Run(tc.title, func(t *testing.T) {
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: tc.nodeName,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.NodeStatus{
Addresses: tc.nodeAddresses,
},
}
_, err := kubernetes.CoreV1().Nodes().Create(node)
require.NoError(t, err)
// Create our object under test and get the endpoints.
client, err := NewNodeSource(
kubernetes,
tc.fqdnTemplate,
)
require.NoError(t, err)
endpoints, err := client.Endpoints()
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
})
}
}

View File

@ -153,6 +153,12 @@ func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
// BuildWithConfig allows to generate a Source implementation from the shared config
func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, error) {
switch source {
case "node":
client, err := p.KubeClient()
if err != nil {
return nil, err
}
return NewNodeSource(client, cfg.FQDNTemplate)
case "service":
client, err := p.KubeClient()
if err != nil {