adding a flag to optionally publish hostIP instead of podIP for headless services (#597)

* Added HostPort feature docs

* Fixed some typos

* Fixing hostIP,podIP change

Fixing hostIP,podIP change

Merge artifacts in docs

Naming typo

removing unnecessary files

fix(source): fix misleading log message

Naming typo

removing unnecessary files

* fix(source): fix misleading log message
This commit is contained in:
Arttii 2018-07-26 18:16:32 +02:00 committed by Martin Linkhorst
parent b9f7dd8f8f
commit 874502ebf8
7 changed files with 238 additions and 11 deletions

View File

@ -189,4 +189,3 @@ kafka-1.ksvc.example.org
kafka-2.ksvc.example.org
```

View File

@ -73,6 +73,7 @@ func main() {
CombineFQDNAndAnnotation: cfg.CombineFQDNAndAnnotation,
Compatibility: cfg.Compatibility,
PublishInternal: cfg.PublishInternal,
PublishHostIP: cfg.PublishHostIP,
ConnectorServer: cfg.ConnectorSourceServer,
}

View File

@ -45,6 +45,7 @@ type Config struct {
CombineFQDNAndAnnotation bool
Compatibility string
PublishInternal bool
PublishHostIP bool
ConnectorSourceServer string
Provider string
GoogleProject string
@ -101,6 +102,7 @@ var defaultConfig = &Config{
CombineFQDNAndAnnotation: false,
Compatibility: "",
PublishInternal: false,
PublishHostIP: false,
ConnectorSourceServer: "localhost:8080",
Provider: "",
GoogleProject: "",
@ -190,6 +192,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("combine-fqdn-annotation", "Combine FQDN template and Annotations instead of overwriting").BoolVar(&cfg.CombineFQDNAndAnnotation)
app.Flag("compatibility", "Process annotation semantics from legacy implementations (optional, options: mate, molecule)").Default(defaultConfig.Compatibility).EnumVar(&cfg.Compatibility, "", "mate", "molecule")
app.Flag("publish-internal-services", "Allow external-dns to publish DNS records for ClusterIP services (optional)").BoolVar(&cfg.PublishInternal)
app.Flag("publish-host-ip", "Allow external-dns to publish host-ip for headless services (optional)").BoolVar(&cfg.PublishHostIP)
app.Flag("connector-source-server", "The server to connect for connector source, valid only when using connector source").Default(defaultConfig.ConnectorSourceServer).StringVar(&cfg.ConnectorSourceServer)
// Flags related to providers

Binary file not shown.

View File

@ -51,10 +51,11 @@ type serviceSource struct {
fqdnTemplate *template.Template
combineFQDNAnnotation bool
publishInternal bool
publishHostIP bool
}
// NewServiceSource creates a new serviceSource with the given config.
func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool) (Source, error) {
func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool) (Source, error) {
var (
tmpl *template.Template
err error
@ -76,6 +77,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
}, nil
}
@ -160,17 +162,32 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
headlessDomain = v.Spec.Hostname + "." + headlessDomain
}
log.Debugf("Generating matching endpoint %s with PodIP %s", headlessDomain, v.Status.PodIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
if ttl.IsConfigured() {
endpoints = append(endpoints, endpoint.NewEndpointWithTTL(headlessDomain, endpoint.RecordTypeA, ttl, v.Status.PodIP))
if sc.publishHostIP == true {
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, v.Status.HostIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
if ttl.IsConfigured() {
endpoints = append(endpoints, endpoint.NewEndpointWithTTL(headlessDomain, endpoint.RecordTypeA, ttl, v.Status.HostIP))
} else {
endpoints = append(endpoints, endpoint.NewEndpoint(headlessDomain, endpoint.RecordTypeA, v.Status.HostIP))
}
} else {
endpoints = append(endpoints, endpoint.NewEndpoint(headlessDomain, endpoint.RecordTypeA, v.Status.PodIP))
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
}
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
log.Debugf("Generating matching endpoint %s with PodIP %s", headlessDomain, v.Status.PodIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
if ttl.IsConfigured() {
endpoints = append(endpoints, endpoint.NewEndpointWithTTL(headlessDomain, endpoint.RecordTypeA, ttl, v.Status.PodIP))
} else {
endpoints = append(endpoints, endpoint.NewEndpoint(headlessDomain, endpoint.RecordTypeA, v.Status.PodIP))
}
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
}
}
}
return endpoints

View File

@ -49,6 +49,7 @@ func (suite *ServiceSuite) SetupTest() {
false,
"",
false,
false,
)
suite.fooWithTargets = &v1.Service{
Spec: v1.ServiceSpec{
@ -132,6 +133,7 @@ func testServiceSourceNewServiceSource(t *testing.T) {
false,
"",
false,
false,
)
if ti.expectError {
@ -873,6 +875,7 @@ func testServiceSourceEndpoints(t *testing.T) {
tc.combineFQDNAndAnnotation,
tc.compatibility,
false,
false,
)
require.NoError(t, err)
@ -1006,6 +1009,7 @@ func TestClusterIpServices(t *testing.T) {
false,
tc.compatibility,
true,
false,
)
require.NoError(t, err)
@ -1201,6 +1205,7 @@ func TestNodePortServices(t *testing.T) {
false,
tc.compatibility,
true,
false,
)
require.NoError(t, err)
@ -1400,6 +1405,207 @@ func TestHeadlessServices(t *testing.T) {
false,
tc.compatibility,
true,
false,
)
require.NoError(t, err)
endpoints, err := client.Endpoints()
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
})
}
}
// TestHeadlessServices tests that headless services generate the correct endpoints.
func TestHeadlessServicesHostIP(t *testing.T) {
for _, tc := range []struct {
title string
targetNamespace string
svcNamespace string
svcName string
svcType v1.ServiceType
compatibility string
fqdnTemplate string
labels map[string]string
annotations map[string]string
clusterIP string
hostIP string
selector map[string]string
lbs []string
podnames []string
hostnames []string
phases []v1.PodPhase
expected []*endpoint.Endpoint
expectError bool
}{
{
"annotated Headless services return endpoints for each selected Pod",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
{
"annotated Headless services return endpoints with TTL for each selected Pod",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
ttlAnnotationKey: "1",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)},
},
false,
},
{
"annotated Headless services return endpoints for each selected Pod, which are in running state",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodFailed},
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
{
"annotated Headless services return endpoints for pods missing hostname",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"", ""},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
} {
t.Run(tc.title, func(t *testing.T) {
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.ServiceStatus{},
}
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(t, err)
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: tc.hostnames[i],
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: podname,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.PodStatus{
HostIP: tc.hostIP,
Phase: tc.phases[i],
},
}
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
require.NoError(t, err)
}
// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
kubernetes,
tc.targetNamespace,
"",
tc.fqdnTemplate,
false,
tc.compatibility,
true,
true,
)
require.NoError(t, err)
@ -1440,7 +1646,7 @@ func BenchmarkServiceEndpoints(b *testing.B) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(b, err)
client, err := NewServiceSource(kubernetes, v1.NamespaceAll, "", "", false, "", false)
client, err := NewServiceSource(kubernetes, v1.NamespaceAll, "", "", false, "", false, false)
require.NoError(b, err)
for i := 0; i < b.N; i++ {

View File

@ -41,6 +41,7 @@ type Config struct {
CombineFQDNAndAnnotation bool
Compatibility string
PublishInternal bool
PublishHostIP bool
ConnectorServer string
}
@ -89,7 +90,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
if err != nil {
return nil, err
}
return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal)
return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP)
case "ingress":
client, err := p.KubeClient()
if err != nil {