diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 0350c63bb..d7af428cc 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -29,6 +29,8 @@ const ( RecordTypeCNAME = "CNAME" // RecordTypeTXT is a RecordType enum value RecordTypeTXT = "TXT" + // RecordTypeSRV is a RecordType enum value + RecordTypeSRV = "SRV" ) // TTL is a structure defining the TTL of a DNS record diff --git a/provider/recordfilter.go b/provider/recordfilter.go index ca757b6db..8b5c31178 100644 --- a/provider/recordfilter.go +++ b/provider/recordfilter.go @@ -17,10 +17,10 @@ limitations under the License. package provider // supportedRecordType returns true only for supported record types. -// Currently only A, CNAME and TXT record types are supported. +// Currently A, CNAME, SRV, and TXT record types are supported. func supportedRecordType(recordType string) bool { switch recordType { - case "A", "CNAME", "TXT": + case "A", "CNAME", "SRV", "TXT": return true default: return false diff --git a/source/service.go b/source/service.go index ce4710371..825797632 100644 --- a/source/service.go +++ b/source/service.go @@ -90,6 +90,12 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) { return nil, err } + // get the ip addresses of all the nodes and cache them for this run + nodeTargets, err := sc.extractNodeTargets() + if err != nil { + return nil, err + } + endpoints := []*endpoint.Endpoint{} for _, svc := range services.Items { @@ -101,7 +107,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) { continue } - svcEndpoints := sc.endpoints(&svc) + svcEndpoints := sc.endpoints(&svc, nodeTargets) // process legacy annotations if no endpoints were returned and compatibility mode is enabled. if len(svcEndpoints) == 0 && sc.compatibility != "" { @@ -110,7 +116,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) { // apply template if none of the above is found if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil { - sEndpoints, err := sc.endpointsFromTemplate(&svc) + sEndpoints, err := sc.endpointsFromTemplate(&svc, nodeTargets) if err != nil { return nil, err } @@ -169,7 +175,8 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri return endpoints } -func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) { + +func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endpoint.Targets) ([]*endpoint.Endpoint, error) { var endpoints []*endpoint.Endpoint // Process the whole template string @@ -181,19 +188,19 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End hostnameList := strings.Split(strings.Replace(buf.String(), " ", "", -1), ",") for _, hostname := range hostnameList { - endpoints = append(endpoints, sc.generateEndpoints(svc, hostname)...) + endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets)...) } return endpoints, nil } // endpointsFromService extracts the endpoints from a service object -func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint { +func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets) []*endpoint.Endpoint { var endpoints []*endpoint.Endpoint hostnameList := getHostnamesFromAnnotations(svc.Annotations) for _, hostname := range hostnameList { - endpoints = append(endpoints, sc.generateEndpoints(svc, hostname)...) + endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets)...) } return endpoints @@ -236,7 +243,7 @@ func (sc *serviceSource) setResourceLabel(service v1.Service, endpoints []*endpo } } -func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []*endpoint.Endpoint { +func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nodeTargets endpoint.Targets) []*endpoint.Endpoint { hostname = strings.TrimSuffix(hostname, ".") ttl, err := getTTLFromAnnotations(svc.Annotations) if err != nil { @@ -272,7 +279,10 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []* if svc.Spec.ClusterIP == v1.ClusterIPNone { endpoints = append(endpoints, sc.extractHeadlessEndpoints(svc, hostname, ttl)...) } - + case v1.ServiceTypeNodePort: + // add the nodeTargets and extract an SRV endpoint + targets = append(targets, nodeTargets...) + endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, nodeTargets, hostname, ttl)...) } for _, t := range targets { @@ -316,3 +326,68 @@ func extractLoadBalancerTargets(svc *v1.Service) endpoint.Targets { return targets } + +func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) { + var ( + internalIPs endpoint.Targets + externalIPs endpoint.Targets + ) + + nodes, err := sc.client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, node := range nodes.Items { + for _, address := range node.Status.Addresses { + switch address.Type { + case v1.NodeExternalIP: + externalIPs = append(externalIPs, address.Address) + case v1.NodeInternalIP: + internalIPs = append(internalIPs, address.Address) + } + } + } + + if len(externalIPs) > 0 { + return externalIPs, nil + } + + return internalIPs, nil +} + +func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets endpoint.Targets, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint { + var endpoints []*endpoint.Endpoint + + for _, port := range svc.Spec.Ports { + if port.NodePort > 0 { + // build a target with a priority of 0, weight of 0, and pointing the given port on the given host + target := fmt.Sprintf("0 50 %d %s", port.NodePort, hostname) + + // figure out the portname + portName := port.Name + if portName == "" { + portName = fmt.Sprintf("%d", port.NodePort) + } + + // figure out the protocol + protocol := strings.ToLower(string(port.Protocol)) + if protocol == "" { + protocol = "tcp" + } + + recordName := fmt.Sprintf("_%s._%s.%s", portName, protocol, hostname) + + var ep *endpoint.Endpoint + if ttl.IsConfigured() { + ep = endpoint.NewEndpointWithTTL(recordName, endpoint.RecordTypeSRV, ttl, target) + } else { + ep = endpoint.NewEndpoint(recordName, endpoint.RecordTypeSRV, target) + } + + endpoints = append(endpoints, ep) + } + } + + return endpoints +} diff --git a/source/service_test.go b/source/service_test.go index 47f68b4c7..8ce4db13a 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -1022,6 +1022,201 @@ func TestClusterIpServices(t *testing.T) { } } +// testNodePortServices tests that various services generate the correct endpoints. +func TestNodePortServices(t *testing.T) { + for _, tc := range []struct { + title string + targetNamespace string + annotationFilter string + svcNamespace string + svcName string + svcType v1.ServiceType + compatibility string + fqdnTemplate string + labels map[string]string + annotations map[string]string + lbs []string + expected []*endpoint.Endpoint + expectError bool + nodes []*v1.Node + }{ + { + "annotated NodePort services return an endpoint with IP addresses of the cluster's nodes", + "", + "", + "testing", + "foo", + v1.ServiceTypeNodePort, + "", + "", + map[string]string{}, + map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + nil, + []*endpoint.Endpoint{ + {DNSName: "_30192._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1", "54.10.11.2"}, RecordType: endpoint.RecordTypeA}, + }, + false, + []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }}, + }, + { + "non-annotated NodePort services with set fqdnTemplate return an endpoint with target IP", + "", + "", + "testing", + "foo", + v1.ServiceTypeNodePort, + "", + "{{.Name}}.bar.example.com", + map[string]string{}, + map[string]string{}, + nil, + []*endpoint.Endpoint{ + {DNSName: "_30192._tcp.foo.bar.example.com", Targets: endpoint.Targets{"0 50 30192 foo.bar.example.com"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.bar.example.com", Targets: endpoint.Targets{"54.10.11.1", "54.10.11.2"}, RecordType: endpoint.RecordTypeA}, + }, + false, + []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }}, + }, + { + "annotated NodePort services return an endpoint with IP addresses of the private cluster's nodes", + "", + "", + "testing", + "foo", + v1.ServiceTypeNodePort, + "", + "", + map[string]string{}, + map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + nil, + []*endpoint.Endpoint{ + {DNSName: "_30192._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"10.0.1.1", "10.0.1.2"}, RecordType: endpoint.RecordTypeA}, + }, + false, + []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }}, + }, + } { + t.Run(tc.title, func(t *testing.T) { + // Create a Kubernetes testing client + kubernetes := fake.NewSimpleClientset() + + // Create the nodes + for _, node := range tc.nodes { + if _, err := kubernetes.Core().Nodes().Create(node); err != nil { + t.Fatal(err) + } + } + + // Create a service to test against + service := &v1.Service{ + Spec: v1.ServiceSpec{ + Type: tc.svcType, + Ports: []v1.ServicePort{ + { + NodePort: 30192, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: tc.svcNamespace, + Name: tc.svcName, + Labels: tc.labels, + Annotations: tc.annotations, + }, + } + + _, err := kubernetes.CoreV1().Services(service.Namespace).Create(service) + require.NoError(t, err) + + // Create our object under test and get the endpoints. + client, _ := NewServiceSource( + kubernetes, + tc.targetNamespace, + tc.annotationFilter, + tc.fqdnTemplate, + false, + tc.compatibility, + true, + ) + require.NoError(t, err) + + endpoints, err := client.Endpoints() + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + // Validate returned endpoints against desired endpoints. + validateEndpoints(t, endpoints, tc.expected) + }) + } +} + // TestHeadlessServices tests that headless services generate the correct endpoints. func TestHeadlessServices(t *testing.T) { for _, tc := range []struct {