refactor source registration (#217)

* ref(sources): refactor source registration and lookup to be lazy.

* fix(fake): don't make changes to passed in config values

* rework without init, tests are missing

* make client provider public

* fix all tests

* change parameter list order, minor improvements

* clientprovider -> clientgenerator, switch naming for interface/struct
This commit is contained in:
Martin Linkhorst 2017-06-30 16:54:58 +02:00 committed by GitHub
parent 73d397961e
commit 81974cd8a0
11 changed files with 245 additions and 210 deletions

79
main.go
View File

@ -20,17 +20,13 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
log "github.com/Sirupsen/logrus"
"github.com/linki/instrumented_http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/clientcmd"
"github.com/kubernetes-incubator/external-dns/controller"
"github.com/kubernetes-incubator/external-dns/pkg/apis/externaldns"
@ -67,45 +63,23 @@ func main() {
go serveMetrics(cfg.MetricsAddress)
go handleSigterm(stopChan)
var client *kubernetes.Clientset
// create only those services we explicitly ask for in cfg.Sources
for _, sourceType := range cfg.Sources {
// we only need a k8s client if we're creating a non-fake source, and
// have not already instantiated a k8s client
if sourceType != "fake" && client == nil {
var err error
client, err = newClient(cfg)
if err != nil {
log.Fatal(err)
}
}
var src source.Source
var err error
switch sourceType {
case "fake":
src, err = source.NewFakeSource(cfg.FqdnTemplate)
case "service":
src, err = source.NewServiceSource(client, cfg.Namespace, cfg.FqdnTemplate, cfg.Compatibility)
case "ingress":
src, err = source.NewIngressSource(client, cfg.Namespace, cfg.FqdnTemplate)
default:
log.Fatalf("Don't know how to handle sourceType '%s'", sourceType)
}
if err != nil {
log.Fatal(err)
}
source.Register(sourceType, src)
// Create a source.Config from the flags passed by the user.
sourceCfg := &source.Config{
Namespace: cfg.Namespace,
FQDNTemplate: cfg.FQDNTemplate,
Compatibility: cfg.Compatibility,
}
sources, err := source.LookupMultiple(cfg.Sources)
// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
}, cfg.Sources, sourceCfg)
if err != nil {
log.Fatal(err)
}
// Combine multiple sources into a single, deduplicated source.
endpointsSource := source.NewDedupSource(source.NewMultiSource(sources))
domainFilter := provider.NewDomainFilter(cfg.DomainFilter)
@ -181,37 +155,6 @@ func handleSigterm(stopChan chan struct{}) {
close(stopChan)
}
func newClient(cfg *externaldns.Config) (*kubernetes.Clientset, error) {
if cfg.KubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
cfg.KubeConfig = clientcmd.RecommendedHomeFile
}
}
config, err := clientcmd.BuildConfigFromFlags(cfg.Master, cfg.KubeConfig)
if err != nil {
return nil, err
}
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return instrumented_http.NewTransport(rt, &instrumented_http.Callbacks{
PathProcessor: func(path string) string {
parts := strings.Split(path, "/")
return parts[len(parts)-1]
},
})
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Connected to cluster at %s", config.Host)
return client, nil
}
func serveMetrics(address string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)

View File

@ -32,7 +32,7 @@ type Config struct {
KubeConfig string
Sources []string
Namespace string
FqdnTemplate string
FQDNTemplate string
Compatibility string
Provider string
GoogleProject string
@ -56,7 +56,7 @@ var defaultConfig = &Config{
KubeConfig: "",
Sources: nil,
Namespace: "",
FqdnTemplate: "",
FQDNTemplate: "",
Compatibility: "",
Provider: "",
GoogleProject: "",
@ -93,7 +93,7 @@ func (cfg *Config) ParseFlags(args []string) error {
// Flags related to processing sources
app.Flag("source", "The resource types that are queried for endpoints; specify multiple times for multiple sources (required, options: service, ingress, fake)").Required().PlaceHolder("source").EnumsVar(&cfg.Sources, "service", "ingress", "fake")
app.Flag("namespace", "Limit sources of endpoints to a specific namespace (default: all namespaces)").Default(defaultConfig.Namespace).StringVar(&cfg.Namespace)
app.Flag("fqdn-template", "A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional)").Default(defaultConfig.FqdnTemplate).StringVar(&cfg.FqdnTemplate)
app.Flag("fqdn-template", "A templated string that's used to generate DNS names from sources that don't define a hostname themselves, or to add a hostname suffix when paired with the fake source (optional)").Default(defaultConfig.FQDNTemplate).StringVar(&cfg.FQDNTemplate)
app.Flag("compatibility", "Process annotation semantics from legacy implementations (optional, options: mate, molecule)").Default(defaultConfig.Compatibility).EnumVar(&cfg.Compatibility, "", "mate", "molecule")
// Flags related to providers

View File

@ -31,7 +31,7 @@ var (
KubeConfig: "",
Sources: []string{"service"},
Namespace: "",
FqdnTemplate: "",
FQDNTemplate: "",
Compatibility: "",
Provider: "google",
GoogleProject: "",
@ -55,7 +55,7 @@ var (
KubeConfig: "/some/path",
Sources: []string{"service", "ingress"},
Namespace: "namespace",
FqdnTemplate: "{{.Name}}.service.example.com",
FQDNTemplate: "{{.Name}}.service.example.com",
Compatibility: "mate",
Provider: "google",
GoogleProject: "project",

View File

@ -29,34 +29,32 @@ import (
"github.com/kubernetes-incubator/external-dns/endpoint"
)
// fakeSource is an implementation of Source for that provides dummy endpoints
// for testing/dry-running of dns providers without needing an attached
// kubernetes cluster.
// fakeSource is an implementation of Source that provides dummy endpoints for
// testing/dry-running of dns providers without needing an attached Kubernetes cluster.
type fakeSource struct {
dnsName string
}
const (
defaultDNSName = "example.com"
defaultFQDNTemplate = "example.com"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// NewFakeSource creates a new fakeSource with the given client and namespace scope.
func NewFakeSource(dnsName string) (Source, error) {
if dnsName == "" {
dnsName = defaultDNSName
// NewFakeSource creates a new fakeSource with the given config.
func NewFakeSource(fqdnTemplate string) (Source, error) {
if fqdnTemplate == "" {
fqdnTemplate = defaultFQDNTemplate
}
return &fakeSource{
dnsName: dnsName,
dnsName: fqdnTemplate,
}, nil
}
// Endpoints returns endpoint objects
// Endpoints returns endpoint objects.
func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := make([]*endpoint.Endpoint, 10)

View File

@ -37,26 +37,28 @@ import (
type ingressSource struct {
client kubernetes.Interface
namespace string
fqdntemplate *template.Template
fqdnTemplate *template.Template
}
// NewIngressSource creates a new ingressSource with the given client and namespace scope.
func NewIngressSource(client kubernetes.Interface, namespace string, fqdntemplate string) (Source, error) {
var tmpl *template.Template
var err error
if fqdntemplate != "" {
// NewIngressSource creates a new ingressSource with the given config.
func NewIngressSource(kubeClient kubernetes.Interface, namespace, fqdnTemplate string) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdntemplate)
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
}
return &ingressSource{
client: client,
client: kubeClient,
namespace: namespace,
fqdntemplate: tmpl,
fqdnTemplate: tmpl,
}, nil
}
@ -82,7 +84,7 @@ func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
ingEndpoints := endpointsFromIngress(&ing)
// apply template if host is missing on ingress
if len(ingEndpoints) == 0 && sc.fqdntemplate != nil {
if len(ingEndpoints) == 0 && sc.fqdnTemplate != nil {
ingEndpoints, err = sc.endpointsFromTemplate(&ing)
if err != nil {
return nil, err
@ -105,7 +107,7 @@ func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoin
var endpoints []*endpoint.Endpoint
var buf bytes.Buffer
err := sc.fqdntemplate.Execute(&buf, ing)
err := sc.fqdnTemplate.Execute(&buf, ing)
if err != nil {
return nil, fmt.Errorf("failed to apply template on ingress %s: %v", ing.String(), err)
}

View File

@ -41,13 +41,13 @@ func TestIngress(t *testing.T) {
func TestNewIngressSource(t *testing.T) {
for _, ti := range []struct {
title string
fqdntemplate string
fqdnTemplate string
expectError bool
}{
{
title: "invalid template",
expectError: true,
fqdntemplate: "{{.Name",
fqdnTemplate: "{{.Name",
},
{
title: "valid empty template",
@ -56,11 +56,15 @@ func TestNewIngressSource(t *testing.T) {
{
title: "valid template",
expectError: false,
fqdntemplate: "{{.Name}}-{{.Namespace}}.ext-dns.test.com",
fqdnTemplate: "{{.Name}}-{{.Namespace}}.ext-dns.test.com",
},
} {
t.Run(ti.title, func(t *testing.T) {
_, err := NewIngressSource(fake.NewSimpleClientset(), "", ti.fqdntemplate)
_, err := NewIngressSource(
fake.NewSimpleClientset(),
"",
ti.fqdnTemplate,
)
if ti.expectError {
assert.Error(t, err)
} else {
@ -167,7 +171,7 @@ func testIngressEndpoints(t *testing.T) {
targetNamespace string
ingressItems []fakeIngress
expected []*endpoint.Endpoint
fqdntemplate string
fqdnTemplate string
}{
{
title: "no ingress",
@ -315,7 +319,7 @@ func testIngressEndpoints(t *testing.T) {
Target: "elb.com",
},
},
fqdntemplate: "{{.Name}}.ext-dns.test.com",
fqdnTemplate: "{{.Name}}.ext-dns.test.com",
},
{
title: "another controller annotation skipped even with template",
@ -332,7 +336,7 @@ func testIngressEndpoints(t *testing.T) {
},
},
expected: []*endpoint.Endpoint{},
fqdntemplate: "{{.Name}}.ext-dns.test.com",
fqdnTemplate: "{{.Name}}.ext-dns.test.com",
},
} {
t.Run(ti.title, func(t *testing.T) {
@ -342,7 +346,11 @@ func testIngressEndpoints(t *testing.T) {
}
fakeClient := fake.NewSimpleClientset()
ingressSource, _ := NewIngressSource(fakeClient, ti.targetNamespace, ti.fqdntemplate)
ingressSource, _ := NewIngressSource(
fakeClient,
ti.targetNamespace,
ti.fqdnTemplate,
)
for _, ingress := range ingresses {
_, err := fakeClient.Extensions().Ingresses(ingress.Namespace).Create(ingress)
require.NoError(t, err)

View File

@ -41,27 +41,29 @@ type serviceSource struct {
namespace string
// process Services with legacy annotations
compatibility string
fqdntemplate *template.Template
fqdnTemplate *template.Template
}
// NewServiceSource creates a new serviceSource with the given client and namespace scope.
func NewServiceSource(client kubernetes.Interface, namespace, fqdntemplate string, compatibility string) (Source, error) {
var tmpl *template.Template
var err error
if fqdntemplate != "" {
// NewServiceSource creates a new serviceSource with the given config.
func NewServiceSource(kubeClient kubernetes.Interface, namespace, fqdnTemplate, compatibility string) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdntemplate)
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
}
return &serviceSource{
client: client,
client: kubeClient,
namespace: namespace,
compatibility: compatibility,
fqdntemplate: tmpl,
fqdnTemplate: tmpl,
}, nil
}
@ -91,7 +93,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
}
// apply template if none of the above is found
if len(svcEndpoints) == 0 && sc.fqdntemplate != nil {
if len(svcEndpoints) == 0 && sc.fqdnTemplate != nil {
svcEndpoints, err = sc.endpointsFromTemplate(&svc)
if err != nil {
return nil, err
@ -114,7 +116,7 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End
var endpoints []*endpoint.Endpoint
var buf bytes.Buffer
err := sc.fqdntemplate.Execute(&buf, svc)
err := sc.fqdnTemplate.Execute(&buf, svc)
if err != nil {
return nil, fmt.Errorf("failed to apply template on service %s: %v", svc.String(), err)
}

View File

@ -45,13 +45,13 @@ func testServiceSourceImplementsSource(t *testing.T) {
func testServiceSourceNewServiceSource(t *testing.T) {
for _, ti := range []struct {
title string
fqdntemplate string
fqdnTemplate string
expectError bool
}{
{
title: "invalid template",
expectError: true,
fqdntemplate: "{{.Name",
fqdnTemplate: "{{.Name",
},
{
title: "valid empty template",
@ -60,11 +60,16 @@ func testServiceSourceNewServiceSource(t *testing.T) {
{
title: "valid template",
expectError: false,
fqdntemplate: "{{.Name}}-{{.Namespace}}.ext-dns.test.com",
fqdnTemplate: "{{.Name}}-{{.Namespace}}.ext-dns.test.com",
},
} {
t.Run(ti.title, func(t *testing.T) {
_, err := NewServiceSource(fake.NewSimpleClientset(), "", ti.fqdntemplate, "")
_, err := NewServiceSource(
fake.NewSimpleClientset(),
"",
ti.fqdnTemplate,
"",
)
if ti.expectError {
assert.Error(t, err)
@ -83,7 +88,7 @@ func testServiceSourceEndpoints(t *testing.T) {
svcNamespace string
svcName string
compatibility string
fqdntemplate string
fqdnTemplate string
labels map[string]string
annotations map[string]string
lbs []string
@ -323,7 +328,7 @@ func testServiceSourceEndpoints(t *testing.T) {
false,
},
{
"not annotated services with set fqdntemplate return an endpoint with target IP",
"not annotated services with set fqdnTemplate return an endpoint with target IP",
"",
"testing",
"foo",
@ -401,7 +406,12 @@ func testServiceSourceEndpoints(t *testing.T) {
require.NoError(t, err)
// Create our object under test and get the endpoints.
client, err := NewServiceSource(kubernetes, tc.targetNamespace, tc.fqdntemplate, tc.compatibility)
client, _ := NewServiceSource(
kubernetes,
tc.targetNamespace,
tc.fqdnTemplate,
tc.compatibility,
)
require.NoError(t, err)
endpoints, err := client.Endpoints()

View File

@ -16,9 +16,7 @@ limitations under the License.
package source
import (
"github.com/kubernetes-incubator/external-dns/endpoint"
)
import "github.com/kubernetes-incubator/external-dns/endpoint"
const (
// The annotation used for figuring out which controller is responsible

View File

@ -16,31 +16,118 @@ limitations under the License.
package source
import "fmt"
import (
"errors"
"net/http"
"os"
"strings"
var store = map[string]Source{}
"sync"
// Register registers a Source under a given name.
func Register(name string, source Source) {
store[name] = source
log "github.com/Sirupsen/logrus"
"github.com/linki/instrumented_http"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// ErrSourceNotFound is returned when a requested source doesn't exist.
var ErrSourceNotFound = errors.New("source not found")
// Config holds shared configuration options for all Sources.
type Config struct {
Namespace string
FQDNTemplate string
Compatibility string
}
// Lookup returns a Source by the given name.
func Lookup(name string) Source {
return store[name]
// ClientGenerator provides clients
type ClientGenerator interface {
KubeClient() (kubernetes.Interface, error)
}
// LookupMultiple returns multiple Sources given multiple names.
func LookupMultiple(names []string) ([]Source, error) {
// SingletonClientGenerator stores provider clients and guarantees that only one instance of client
// will be generated
type SingletonClientGenerator struct {
KubeConfig string
KubeMaster string
client kubernetes.Interface
sync.Once
}
// KubeClient generates a kube client if it was not created before
func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) {
var err error
p.Once.Do(func() {
p.client, err = NewKubeClient(p.KubeConfig, p.KubeMaster)
})
return p.client, err
}
// ByNames returns multiple Sources given multiple names.
func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
sources := []Source{}
for _, name := range names {
source := Lookup(name)
if source == nil {
return nil, fmt.Errorf("%s source could not be identified", name)
source, err := BuildWithConfig(name, p, cfg)
if err != nil {
return nil, err
}
sources = append(sources, source)
}
return sources, nil
}
// BuildWithConfig allows to generate a Source implementation from the shared config
func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, error) {
switch source {
case "service":
client, err := p.KubeClient()
if err != nil {
return nil, err
}
return NewServiceSource(client, cfg.FQDNTemplate, cfg.Namespace, cfg.Compatibility)
case "ingress":
client, err := p.KubeClient()
if err != nil {
return nil, err
}
return NewIngressSource(client, cfg.FQDNTemplate, cfg.Namespace)
case "fake":
return NewFakeSource(cfg.FQDNTemplate)
}
return nil, ErrSourceNotFound
}
// NewKubeClient returns a new Kubernetes client object. It takes a Config and
// uses KubeMaster and KubeConfig attributes to connect to the cluster. If
// KubeConfig isn't provided it defaults to using the recommended default.
func NewKubeClient(kubeConfig, kubeMaster string) (*kubernetes.Clientset, error) {
if kubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
kubeConfig = clientcmd.RecommendedHomeFile
}
}
config, err := clientcmd.BuildConfigFromFlags(kubeMaster, kubeConfig)
if err != nil {
return nil, err
}
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return instrumented_http.NewTransport(rt, &instrumented_http.Callbacks{
PathProcessor: func(path string) string {
parts := strings.Split(path, "/")
return parts[len(parts)-1]
},
})
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Connected to cluster at %s", config.Host)
return client, nil
}

View File

@ -17,86 +17,73 @@ limitations under the License.
package source
import (
"errors"
"testing"
"github.com/kubernetes-incubator/external-dns/internal/testutils"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
func TestStore(t *testing.T) {
t.Run("RegisterAndLookup", testRegisterAndLookup)
t.Run("LookupMultiple", testLookupMultiple)
type MockClientGenerator struct {
mock.Mock
client kubernetes.Interface
}
// testRegisterAndLookup tests that a Source can be registered and looked up by name.
func testRegisterAndLookup(t *testing.T) {
for _, tc := range []struct {
title string
givenAndExpected map[string]Source
}{
{
"registered source is found by name",
map[string]Source{
"foo": &testutils.MockSource{},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
for k, v := range tc.givenAndExpected {
Register(k, v)
}
for k, v := range tc.givenAndExpected {
assert.Equal(t, v, Lookup(k))
}
})
func (m *MockClientGenerator) KubeClient() (kubernetes.Interface, error) {
args := m.Called()
if args.Error(1) == nil {
m.client = args.Get(0).(kubernetes.Interface)
return m.client, nil
}
return nil, args.Error(1)
}
// testLookupMultiple tests that Sources can be looked up by providing multiple names.
func testLookupMultiple(t *testing.T) {
for _, tc := range []struct {
title string
registered map[string]Source
names []string
expectError bool
}{
{
"multiple registered sources are found by names",
map[string]Source{
"foo": &testutils.MockSource{},
"bar": &testutils.MockSource{},
},
[]string{"foo", "bar"},
false,
},
{
"multiple registered sources, one source not registered",
map[string]Source{
"foo": &testutils.MockSource{},
"bar": &testutils.MockSource{},
},
[]string{"foo", "bar", "baz"},
true,
},
} {
t.Run(tc.title, func(t *testing.T) {
for k, v := range tc.registered {
Register(k, v)
}
lookup, err := LookupMultiple(tc.names)
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Len(t, lookup, len(tc.registered))
for _, source := range tc.registered {
assert.Contains(t, lookup, source)
}
}
})
}
type ByNamesTestSuite struct {
suite.Suite
}
func (suite *ByNamesTestSuite) TestAllInitialized() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fake.NewSimpleClientset(), nil)
sources, err := ByNames(mockClientGenerator, []string{"service", "ingress", "fake"}, &Config{})
suite.NoError(err, "should not generate errors")
suite.Len(sources, 3, "should generate all three sources")
}
func (suite *ByNamesTestSuite) TestOnlyFake() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fake.NewSimpleClientset(), nil)
sources, err := ByNames(mockClientGenerator, []string{"fake"}, &Config{})
suite.NoError(err, "should not generate errors")
suite.Len(sources, 1, "should generate all three sources")
suite.Nil(mockClientGenerator.client, "client should not be created")
}
func (suite *ByNamesTestSuite) TestSourceNotFound() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fake.NewSimpleClientset(), nil)
sources, err := ByNames(mockClientGenerator, []string{"foo"}, &Config{})
suite.Equal(err, ErrSourceNotFound, "should return sourcen not found")
suite.Len(sources, 0, "should not returns any source")
}
func (suite *ByNamesTestSuite) TestKubeClientFails() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo"))
_, err := ByNames(mockClientGenerator, []string{"service"}, &Config{})
suite.Error(err, "should return an error if client cannot be created")
_, err = ByNames(mockClientGenerator, []string{"ingress"}, &Config{})
suite.Error(err, "should return an error if client cannot be created")
}
func TestByNames(t *testing.T) {
suite.Run(t, new(ByNamesTestSuite))
}