Merge pull request #1005 from devkid/feature/headless-services-publishnotreadyaddresses

Headless service: retrieve endpoints via Endpoints resource; evaluate spec.publishNotReadyAddresses
This commit is contained in:
Kubernetes Prow Robot 2020-03-04 05:37:46 -08:00 committed by GitHub
commit a563022e8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 318 additions and 215 deletions

View File

@ -95,10 +95,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -147,10 +147,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -108,10 +108,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -172,10 +172,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -194,10 +194,7 @@ metadata:
name: externaldns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]
@ -268,10 +265,7 @@ metadata:
name: externaldns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -221,10 +221,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]
@ -298,10 +295,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -75,10 +75,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -48,10 +48,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -128,10 +128,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -93,7 +93,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]

View File

@ -69,10 +69,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -60,10 +60,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -72,10 +72,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -116,10 +116,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -56,10 +56,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -109,10 +109,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -54,10 +54,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -65,10 +65,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -222,10 +222,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -85,10 +85,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -51,10 +51,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -76,7 +76,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -80,10 +80,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -76,10 +76,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -144,6 +144,8 @@ rules:
- ""
resources:
- services
- endpoints
- pods
verbs:
- get
- watch

View File

@ -67,10 +67,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

View File

@ -97,10 +97,7 @@ metadata:
name: external-dns
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: [""]
resources: ["pods"]
resources: ["services","endpoints","pods"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions"]
resources: ["ingresses"]

39
main.go
View File

@ -71,25 +71,26 @@ func main() {
// Create a source.Config from the flags passed by the user.
sourceCfg := &source.Config{
Namespace: cfg.Namespace,
AnnotationFilter: cfg.AnnotationFilter,
FQDNTemplate: cfg.FQDNTemplate,
CombineFQDNAndAnnotation: cfg.CombineFQDNAndAnnotation,
IgnoreHostnameAnnotation: cfg.IgnoreHostnameAnnotation,
Compatibility: cfg.Compatibility,
PublishInternal: cfg.PublishInternal,
PublishHostIP: cfg.PublishHostIP,
ConnectorServer: cfg.ConnectorSourceServer,
CRDSourceAPIVersion: cfg.CRDSourceAPIVersion,
CRDSourceKind: cfg.CRDSourceKind,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
ServiceTypeFilter: cfg.ServiceTypeFilter,
IstioIngressGatewayServices: cfg.IstioIngressGatewayServices,
CFAPIEndpoint: cfg.CFAPIEndpoint,
CFUsername: cfg.CFUsername,
CFPassword: cfg.CFPassword,
ContourLoadBalancerService: cfg.ContourLoadBalancerService,
Namespace: cfg.Namespace,
AnnotationFilter: cfg.AnnotationFilter,
FQDNTemplate: cfg.FQDNTemplate,
CombineFQDNAndAnnotation: cfg.CombineFQDNAndAnnotation,
IgnoreHostnameAnnotation: cfg.IgnoreHostnameAnnotation,
Compatibility: cfg.Compatibility,
PublishInternal: cfg.PublishInternal,
PublishHostIP: cfg.PublishHostIP,
AlwaysPublishNotReadyAddresses: cfg.AlwaysPublishNotReadyAddresses,
ConnectorServer: cfg.ConnectorSourceServer,
CRDSourceAPIVersion: cfg.CRDSourceAPIVersion,
CRDSourceKind: cfg.CRDSourceKind,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
ServiceTypeFilter: cfg.ServiceTypeFilter,
IstioIngressGatewayServices: cfg.IstioIngressGatewayServices,
CFAPIEndpoint: cfg.CFAPIEndpoint,
CFUsername: cfg.CFUsername,
CFPassword: cfg.CFPassword,
ContourLoadBalancerService: cfg.ContourLoadBalancerService,
}
// Lookup all the selected sources by names and pass them the desired configuration.

View File

@ -51,6 +51,7 @@ type Config struct {
Compatibility string
PublishInternal bool
PublishHostIP bool
AlwaysPublishNotReadyAddresses bool
ConnectorSourceServer string
Provider string
GoogleProject string
@ -295,6 +296,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("compatibility", "Process annotation semantics from legacy implementations (optional, options: mate, molecule)").Default(defaultConfig.Compatibility).EnumVar(&cfg.Compatibility, "", "mate", "molecule")
app.Flag("publish-internal-services", "Allow external-dns to publish DNS records for ClusterIP services (optional)").BoolVar(&cfg.PublishInternal)
app.Flag("publish-host-ip", "Allow external-dns to publish host-ip for headless services (optional)").BoolVar(&cfg.PublishHostIP)
app.Flag("always-publish-not-ready-addresses", "Always publish also not ready addresses for headless services (optional)").BoolVar(&cfg.AlwaysPublishNotReadyAddresses)
app.Flag("connector-source-server", "The server to connect for connector source, valid only when using connector source").Default(defaultConfig.ConnectorSourceServer).StringVar(&cfg.ConnectorSourceServer)
app.Flag("crd-source-apiversion", "API version of the CRD for crd source, e.g. `externaldns.k8s.io/v1alpha1`, valid only when using crd source").Default(defaultConfig.CRDSourceAPIVersion).StringVar(&cfg.CRDSourceAPIVersion)
app.Flag("crd-source-kind", "Kind of the CRD for the crd source in API group and version specified by crd-source-apiversion").Default(defaultConfig.CRDSourceKind).StringVar(&cfg.CRDSourceKind)

View File

@ -51,22 +51,25 @@ type serviceSource struct {
client kubernetes.Interface
namespace string
annotationFilter string
// process Services with legacy annotations
compatibility string
fqdnTemplate *template.Template
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
publishInternal bool
publishHostIP bool
serviceInformer coreinformers.ServiceInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
runner *async.BoundedFrequencyRunner
compatibility string
fqdnTemplate *template.Template
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
publishInternal bool
publishHostIP bool
alwaysPublishNotReadyAddresses bool
serviceInformer coreinformers.ServiceInformer
endpointsInformer coreinformers.EndpointsInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
runner *async.BoundedFrequencyRunner
}
// NewServiceSource creates a new serviceSource with the given config.
func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool) (Source, error) {
func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool) (Source, error) {
var (
tmpl *template.Template
err error
@ -84,6 +87,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()
@ -94,6 +98,12 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
},
},
)
endpointsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -126,19 +136,21 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
return &serviceSource{
client: kubeClient,
namespace: namespace,
annotationFilter: annotationFilter,
compatibility: compatibility,
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
serviceInformer: serviceInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: serviceTypes,
client: kubeClient,
namespace: namespace,
annotationFilter: annotationFilter,
compatibility: compatibility,
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
serviceInformer: serviceInformer,
endpointsInformer: endpointsInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: serviceTypes,
}, nil
}
@ -207,6 +219,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}
// extractHeadlessEndpoints extracts endpoints from a headless service using the "Endpoints" Kubernetes API resource
func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
@ -219,6 +232,12 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return nil
}
endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
if err != nil {
log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
return endpoints
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
@ -226,32 +245,47 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}
targetsByHeadlessDomain := make(map[string][]string)
for _, v := range pods {
headlessDomains := []string{hostname}
if v.Spec.Hostname != "" {
headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", v.Spec.Hostname, hostname))
for _, subset := range endpointsObject.Subsets {
addresses := subset.Addresses
if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
addresses = append(addresses, subset.NotReadyAddresses...)
}
for _, headlessDomain := range headlessDomains {
if sc.publishHostIP {
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, v.Status.HostIP)
// To reduce traffic on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], v.Status.HostIP)
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
}
} else {
log.Debugf("Generating matching endpoint %s with PodIP %s", headlessDomain, v.Status.PodIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], v.Status.PodIP)
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
for _, address := range addresses {
// find pod for this address
if address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", address)
continue
}
var pod *v1.Pod
for _, v := range pods {
if v.Name == address.TargetRef.Name {
pod = v
break
}
}
}
if pod == nil {
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
continue
}
headlessDomains := []string{hostname}
if pod.Spec.Hostname != "" {
headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", pod.Spec.Hostname, hostname))
}
for _, headlessDomain := range headlessDomains {
var ep string
if sc.publishHostIP {
ep = pod.Status.HostIP
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, ep)
} else {
ep = address.IP
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, ep)
}
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], ep)
}
}
}
headlessDomains := []string{}

View File

@ -51,6 +51,7 @@ func (suite *ServiceSuite) SetupTest() {
"",
false,
false,
false,
[]string{},
false,
)
@ -143,6 +144,7 @@ func testServiceSourceNewServiceSource(t *testing.T) {
"",
false,
false,
false,
ti.serviceTypesFilter,
false,
)
@ -1107,6 +1109,7 @@ func testServiceSourceEndpoints(t *testing.T) {
tc.compatibility,
false,
false,
false,
tc.serviceTypesFilter,
tc.ignoreHostnameAnnotation,
)
@ -1277,6 +1280,7 @@ func TestClusterIpServices(t *testing.T) {
tc.compatibility,
true,
false,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
@ -1608,6 +1612,7 @@ func TestNodePortServices(t *testing.T) {
tc.compatibility,
true,
false,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
@ -1645,7 +1650,8 @@ func TestHeadlessServices(t *testing.T) {
lbs []string
podnames []string
hostnames []string
phases []v1.PodPhase
podsReady []bool
publishNotReadyAddresses bool
expected []*endpoint.Endpoint
expectError bool
}{
@ -1670,7 +1676,8 @@ func TestHeadlessServices(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}},
@ -1699,7 +1706,8 @@ func TestHeadlessServices(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{},
false,
},
@ -1725,7 +1733,8 @@ func TestHeadlessServices(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}, RecordTTL: endpoint.TTL(1)},
@ -1754,13 +1763,44 @@ func TestHeadlessServices(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodFailed},
[]bool{true, false},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
{
"annotated Headless services return endpoints for all Pod if publishNotReadyAddresses is set",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, false},
true,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
false,
},
{
"annotated Headless services return endpoints for pods missing hostname",
"",
@ -1782,7 +1822,8 @@ func TestHeadlessServices(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"", ""},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
@ -1822,9 +1863,10 @@ func TestHeadlessServices(t *testing.T) {
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
PublishNotReadyAddresses: tc.publishNotReadyAddresses,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
@ -1837,6 +1879,7 @@ func TestHeadlessServices(t *testing.T) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(t, err)
var addresses, notReadyAddresses []v1.EndpointAddress
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
@ -1851,13 +1894,41 @@ func TestHeadlessServices(t *testing.T) {
},
Status: v1.PodStatus{
PodIP: tc.podIPs[i],
Phase: tc.phases[i],
},
}
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
require.NoError(t, err)
address := v1.EndpointAddress{
IP: tc.podIPs[i],
TargetRef: &v1.ObjectReference{
APIVersion: "",
Kind: "Pod",
Name: podname,
},
}
if tc.podsReady[i] {
addresses = append(addresses, address)
} else {
notReadyAddresses = append(notReadyAddresses, address)
}
}
endpointsObject := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
NotReadyAddresses: notReadyAddresses,
},
},
}
_, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(endpointsObject)
require.NoError(t, err)
// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
@ -1869,6 +1940,7 @@ func TestHeadlessServices(t *testing.T) {
tc.compatibility,
true,
false,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
@ -1906,7 +1978,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
lbs []string
podnames []string
hostnames []string
phases []v1.PodPhase
podsReady []bool
publishNotReadyAddresses bool
expected []*endpoint.Endpoint
expectError bool
}{
@ -1931,7 +2004,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}},
@ -1960,7 +2034,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{},
false,
},
@ -1986,7 +2061,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}, RecordTTL: endpoint.TTL(1)},
@ -2015,13 +2091,44 @@ func TestHeadlessServicesHostIP(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodFailed},
[]bool{true, false},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
{
"annotated Headless services return endpoints for all Pod if publishNotReadyAddresses is set",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, false},
true,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
false,
},
{
"annotated Headless services return endpoints for pods missing hostname",
"",
@ -2043,7 +2150,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"", ""},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
@ -2056,9 +2164,10 @@ func TestHeadlessServicesHostIP(t *testing.T) {
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
PublishNotReadyAddresses: tc.publishNotReadyAddresses,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
@ -2071,6 +2180,8 @@ func TestHeadlessServicesHostIP(t *testing.T) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(t, err)
var addresses []v1.EndpointAddress
var notReadyAddresses []v1.EndpointAddress
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
@ -2085,13 +2196,41 @@ func TestHeadlessServicesHostIP(t *testing.T) {
},
Status: v1.PodStatus{
HostIP: tc.hostIPs[i],
Phase: tc.phases[i],
},
}
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
require.NoError(t, err)
address := v1.EndpointAddress{
IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{
APIVersion: "",
Kind: "Pod",
Name: podname,
},
}
if tc.podsReady[i] {
addresses = append(addresses, address)
} else {
notReadyAddresses = append(notReadyAddresses, address)
}
}
endpointsObject := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
NotReadyAddresses: notReadyAddresses,
},
},
}
_, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(endpointsObject)
require.NoError(t, err)
// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
@ -2103,6 +2242,7 @@ func TestHeadlessServicesHostIP(t *testing.T) {
tc.compatibility,
true,
true,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
@ -2207,6 +2347,7 @@ func TestExternalServices(t *testing.T) {
tc.compatibility,
true,
false,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
@ -2249,7 +2390,7 @@ func BenchmarkServiceEndpoints(b *testing.B) {
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(b, err)
client, err := NewServiceSource(kubernetes, v1.NamespaceAll, "", "", false, "", false, false, []string{}, false)
client, err := NewServiceSource(kubernetes, v1.NamespaceAll, "", "", false, "", false, false, false, []string{}, false)
require.NoError(b, err)
for i := 0; i < b.N; i++ {

View File

@ -40,25 +40,26 @@ var ErrSourceNotFound = errors.New("source not found")
// Config holds shared configuration options for all Sources.
type Config struct {
Namespace string
AnnotationFilter string
FQDNTemplate string
CombineFQDNAndAnnotation bool
IgnoreHostnameAnnotation bool
Compatibility string
PublishInternal bool
PublishHostIP bool
ConnectorServer string
CRDSourceAPIVersion string
CRDSourceKind string
KubeConfig string
KubeMaster string
ServiceTypeFilter []string
IstioIngressGatewayServices []string
CFAPIEndpoint string
CFUsername string
CFPassword string
ContourLoadBalancerService string
Namespace string
AnnotationFilter string
FQDNTemplate string
CombineFQDNAndAnnotation bool
IgnoreHostnameAnnotation bool
Compatibility string
PublishInternal bool
PublishHostIP bool
AlwaysPublishNotReadyAddresses bool
ConnectorServer string
CRDSourceAPIVersion string
CRDSourceKind string
KubeConfig string
KubeMaster string
ServiceTypeFilter []string
IstioIngressGatewayServices []string
CFAPIEndpoint string
CFUsername string
CFPassword string
ContourLoadBalancerService string
}
// ClientGenerator provides clients
@ -164,7 +165,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
if err != nil {
return nil, err
}
return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation)
return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation)
case "ingress":
client, err := p.KubeClient()
if err != nil {