diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index 95461a5aa..68f82378a 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -19,6 +19,7 @@ package endpoint
 import (
 	"fmt"
 	"net/netip"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -353,7 +354,21 @@ func (e *Endpoint) String() string {
 	return fmt.Sprintf("%s %d IN %s %s %s %s", e.DNSName, e.RecordTTL, e.RecordType, e.SetIdentifier, e.Targets, e.ProviderSpecific)
 }
 
-// Apply filter to slice of endpoints and return new filtered slice that includes
+// UniqueOrderedTargets removes duplicate targets from the Endpoint and sorts them in lexicographical order.
+func (e *Endpoint) UniqueOrderedTargets() {
+	result := make([]string, 0, len(e.Targets))
+	existing := make(map[string]bool)
+	for _, target := range e.Targets {
+		if _, ok := existing[target]; !ok {
+			result = append(result, target)
+			existing[target] = true
+		}
+	}
+	slices.Sort(result)
+	e.Targets = result
+}
+
+// FilterEndpointsByOwnerID applies a filter to a slice of endpoints and returns a new filtered slice that includes
 // only endpoints that match.
 func FilterEndpointsByOwnerID(ownerID string, eps []*Endpoint) []*Endpoint {
 	filtered := []*Endpoint{}
diff --git a/endpoint/endpoint_test.go b/endpoint/endpoint_test.go
index 0c2a3cbb9..d87aaab3c 100644
--- a/endpoint/endpoint_test.go
+++ b/endpoint/endpoint_test.go
@@ -925,3 +925,45 @@ func TestCheckEndpoint(t *testing.T) {
 		})
 	}
 }
+
+func TestEndpoint_UniqueOrderedTargets(t *testing.T) {
+	tests := []struct {
+		name     string
+		targets  []string
+		expected Targets
+	}{
+		{
+			name:     "no duplicates",
+			targets:  []string{"b.example.com", "a.example.com"},
+			expected: Targets{"a.example.com", "b.example.com"},
+		},
+		{
+			name:     "with duplicates",
+			targets:  []string{"a.example.com", "b.example.com", "a.example.com"},
+			expected: Targets{"a.example.com", "b.example.com"},
+		},
+		{
+			name:     "already sorted",
+			targets:  []string{"a.example.com", "b.example.com"},
+			expected: Targets{"a.example.com", "b.example.com"},
+		},
+		{
+			name:     "all duplicates",
+			targets:  []string{"a.example.com", "a.example.com", "a.example.com"},
+			expected: Targets{"a.example.com"},
+		},
+		{
+			name:     "empty",
+			targets:  []string{},
+			expected: Targets{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ep := &Endpoint{Targets: tt.targets}
+			ep.UniqueOrderedTargets()
+			assert.Equal(t, tt.expected, ep.Targets)
+		})
+	}
+}
diff --git a/source/service.go b/source/service.go
index 91cd113d8..2606c0d8f 100644
--- a/source/service.go
+++ b/source/service.go
@@ -251,6 +251,29 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
 		sort.Slice(endpoints, func(i, j int) bool {
 			return endpoints[i].Labels[endpoint.ResourceLabelKey] < endpoints[j].Labels[endpoint.ResourceLabelKey]
 		})
+		mergedEndpoints := make(map[endpoint.EndpointKey][]*endpoint.Endpoint)
+		for _, ep := range endpoints {
+			key := ep.Key()
+			if existing, ok := mergedEndpoints[key]; ok {
+				if existing[0].RecordType == endpoint.RecordTypeCNAME {
+					log.Debugf("CNAME %s with multiple targets found", ep.DNSName)
+					mergedEndpoints[key] = append(existing, ep)
+					continue
+				}
+				existing[0].Targets = append(existing[0].Targets, ep.Targets...)
+				existing[0].UniqueOrderedTargets()
+				mergedEndpoints[key] = existing
+			} else {
+				ep.UniqueOrderedTargets()
+				mergedEndpoints[key] = []*endpoint.Endpoint{ep}
+			}
+		}
+		processed := make([]*endpoint.Endpoint, 0, len(mergedEndpoints))
+		for _, ep := range mergedEndpoints {
+			processed = append(processed, ep...)
+		}
+		endpoints = processed
+
 		// Use stable sort to not disrupt the order of services
 		sort.SliceStable(endpoints, func(i, j int) bool {
 			if endpoints[i].DNSName != endpoints[j].DNSName {
@@ -258,31 +281,6 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
 			}
 			return endpoints[i].RecordType < endpoints[j].RecordType
 		})
-		mergedEndpoints := []*endpoint.Endpoint{}
-		mergedEndpoints = append(mergedEndpoints, endpoints[0])
-		for i := 1; i < len(endpoints); i++ {
-			lastMergedEndpoint := len(mergedEndpoints) - 1
-			if mergedEndpoints[lastMergedEndpoint].DNSName == endpoints[i].DNSName &&
-				mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
-				mergedEndpoints[lastMergedEndpoint].RecordType != endpoint.RecordTypeCNAME && // It is against RFC-1034 for CNAME records to have multiple targets, so skip merging
-				mergedEndpoints[lastMergedEndpoint].SetIdentifier == endpoints[i].SetIdentifier &&
-				mergedEndpoints[lastMergedEndpoint].RecordTTL == endpoints[i].RecordTTL {
-				mergedEndpoints[lastMergedEndpoint].Targets = append(mergedEndpoints[lastMergedEndpoint].Targets, endpoints[i].Targets[0])
-			} else {
-				mergedEndpoints = append(mergedEndpoints, endpoints[i])
-			}
-
-			if mergedEndpoints[lastMergedEndpoint].DNSName == endpoints[i].DNSName &&
-				mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
-				mergedEndpoints[lastMergedEndpoint].RecordType == endpoint.RecordTypeCNAME {
-				log.Debugf("CNAME %s with multiple targets found", endpoints[i].DNSName)
-			}
-		}
-		endpoints = mergedEndpoints
 	}
-
-	for _, ep := range endpoints {
-		sort.Sort(ep.Targets)
-	}
 
 	return endpoints, nil
diff --git a/source/service_test.go b/source/service_test.go
index 885af6959..406bdbc43 100644
--- a/source/service_test.go
+++ b/source/service_test.go
@@ -30,10 +30,12 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	discoveryv1 "k8s.io/api/discovery/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/kubernetes/fake"
 
 	"sigs.k8s.io/external-dns/endpoint"
@@ -3089,7 +3091,7 @@ func TestHeadlessServices(t *testing.T) {
 	t.Parallel()
 
 	// Create a Kubernetes testing client
-	kubernetes := fake.NewSimpleClientset()
+	kubernetes := fake.NewClientset()
 
 	service := &v1.Service{
 		Spec: v1.ServiceSpec{
@@ -3196,6 +3198,378 @@ func TestHeadlessServices(t *testing.T) {
 	}
 }
 
+func TestMultipleHeadlessServicesPointingToPodsOnTheSameNode(t *testing.T) {
+	kubernetes := fake.NewClientset()
+
+	headless := []*v1.Service{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "kafka",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app": "kafka",
+				},
+				Annotations: map[string]string{
+					annotations.HostnameKey: "example.org",
+				},
+			},
+			Spec: v1.ServiceSpec{
+				Type:                  v1.ServiceTypeClusterIP,
+				ClusterIP:             v1.ClusterIPNone,
+				ClusterIPs:            []string{v1.ClusterIPNone},
+				InternalTrafficPolicy: testutils.ToPtr(v1.ServiceInternalTrafficPolicyCluster),
+				IPFamilies:            []v1.IPFamily{v1.IPv4Protocol},
+				IPFamilyPolicy:        testutils.ToPtr(v1.IPFamilyPolicySingleStack),
+				Ports: []v1.ServicePort{
+					{
+						Name:       "web",
+						Port:       80,
+						Protocol:   v1.ProtocolTCP,
+						TargetPort: intstr.FromInt32(80),
+					},
+				},
+				Selector: map[string]string{
+					"app": "kafka",
+				},
+				SessionAffinity: v1.ServiceAffinityNone,
+			},
+			Status: v1.ServiceStatus{
+				LoadBalancer: v1.LoadBalancerStatus{},
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "kafka-2",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app": "kafka",
+				},
+				Annotations: map[string]string{
+					annotations.HostnameKey: "example.org",
+				},
+			},
+			Spec: v1.ServiceSpec{
+				Type:                  v1.ServiceTypeClusterIP,
+				ClusterIP:             v1.ClusterIPNone,
+				ClusterIPs:            []string{v1.ClusterIPNone},
+				InternalTrafficPolicy: testutils.ToPtr(v1.ServiceInternalTrafficPolicyCluster),
+				IPFamilies:            []v1.IPFamily{v1.IPv4Protocol},
+				IPFamilyPolicy:        testutils.ToPtr(v1.IPFamilyPolicySingleStack),
+				Ports: []v1.ServicePort{
+					{
+						Name:       "web",
+						Port:       80,
+						Protocol:   v1.ProtocolTCP,
+						TargetPort: intstr.FromInt32(80),
+					},
+				},
+				Selector: map[string]string{
+					"app": "kafka",
+				},
+				SessionAffinity: v1.ServiceAffinityNone,
+			},
+			Status: v1.ServiceStatus{
+				LoadBalancer: v1.LoadBalancerStatus{},
+			},
+		},
+	}
+
+	assert.NotNil(t, headless)
+
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "kafka-0",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app":                                 "kafka",
+					appsv1.PodIndexLabel:                  "0",
+					appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
+					appsv1.StatefulSetPodNameLabel:        "kafka-0",
+				},
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: "apps/v1",
+						Kind:       "StatefulSet",
+						Name:       "kafka",
+					},
+				},
+			},
+			Spec: v1.PodSpec{
+				Hostname:  "kafka-0",
+				Subdomain: "kafka",
+				NodeName:  "local-dev-worker",
+				Containers: []v1.Container{
+					{
+						Name: "nginx",
+						Ports: []v1.ContainerPort{
+							{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
+						},
+					},
+				},
+			},
+			Status: v1.PodStatus{
+				Phase:   v1.PodRunning,
+				PodIP:   "10.244.1.2",
+				PodIPs:  []v1.PodIP{{IP: "10.244.1.2"}},
+				HostIP:  "172.18.0.2",
+				HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "kafka-1",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app":                                 "kafka",
+					appsv1.PodIndexLabel:                  "1",
+					appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
+					appsv1.StatefulSetPodNameLabel:        "kafka-1",
+				},
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: "apps/v1",
+						Kind:       "StatefulSet",
+						Name:       "kafka",
+					},
+				},
+			},
+			Spec: v1.PodSpec{
+				Hostname:  "kafka-1",
+				Subdomain: "kafka",
+				NodeName:  "local-dev-worker",
+				Containers: []v1.Container{
+					{
+						Name: "nginx",
+						Ports: []v1.ContainerPort{
+							{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
+						},
+					},
+				},
+			},
+			Status: v1.PodStatus{
+				Phase:   v1.PodRunning,
+				PodIP:   "10.244.1.3",
+				PodIPs:  []v1.PodIP{{IP: "10.244.1.3"}},
+				HostIP:  "172.18.0.2",
+				HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "kafka-2",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app":                                 "kafka",
+					appsv1.PodIndexLabel:                  "2",
+					appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
+					appsv1.StatefulSetPodNameLabel:        "kafka-2",
+				},
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: "apps/v1",
+						Kind:       "StatefulSet",
+						Name:       "kafka",
+					},
+				},
+			},
+			Spec: v1.PodSpec{
+				Hostname:  "kafka-2",
+				Subdomain: "kafka",
+				NodeName:  "local-dev-worker",
+				Containers: []v1.Container{
+					{
+						Name: "nginx",
+						Ports: []v1.ContainerPort{
+							{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
+						},
+					},
+				},
+			},
+			Status: v1.PodStatus{
+				Phase:   v1.PodRunning,
+				PodIP:   "10.244.1.4",
+				PodIPs:  []v1.PodIP{{IP: "10.244.1.4"}},
+				HostIP:  "172.18.0.2",
+				HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
+			},
+		},
+	}
+	assert.Len(t, pods, 3)
+
+	endpoints := []*discoveryv1.EndpointSlice{
+		{
+			ObjectMeta: metav1.ObjectMeta{
Name: "kafka-xhrc9", + Namespace: "default", + Labels: map[string]string{ + "app": "kafka", + discoveryv1.LabelServiceName: "kafka", + discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io", + v1.IsHeadlessService: "", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.244.1.2"}, + Hostname: testutils.ToPtr("kafka-0"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-0", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + { + Addresses: []string{"10.244.1.3"}, + Hostname: testutils.ToPtr("kafka-1"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-1", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + { + Addresses: []string{"10.244.1.4"}, + Hostname: testutils.ToPtr("kafka-2"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-2", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka-2-svwsg", + Namespace: "default", + Labels: map[string]string{ + "app": "kafka", + discoveryv1.LabelServiceName: "kafka-2", + discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io", + v1.IsHeadlessService: "", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.244.1.2"}, + Hostname: testutils.ToPtr("kafka-0"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-0", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + { + Addresses: []string{"10.244.1.3"}, + Hostname: testutils.ToPtr("kafka-1"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-1", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + { + Addresses: []string{"10.244.1.4"}, + Hostname: testutils.ToPtr("kafka-2"), + NodeName: testutils.ToPtr("local-dev-worker"), + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "kafka-2", + Namespace: "default", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: testutils.ToPtr(true), + Serving: testutils.ToPtr(true), + Terminating: testutils.ToPtr(false), + }, + }, + }, + }, + } + + for _, svc := range headless { + _, err := kubernetes.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + } + + for _, pod := range pods { + _, err := kubernetes.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + } + + for _, ep := range endpoints { + _, err := kubernetes.DiscoveryV1().EndpointSlices(ep.Namespace).Create(context.Background(), ep, metav1.CreateOptions{}) + 
+		require.NoError(t, err)
+	}
+
+	src, err := NewServiceSource(
+		t.Context(),
+		kubernetes,
+		v1.NamespaceAll,
+		"",
+		"",
+		false,
+		"",
+		false,
+		false,
+		false,
+		[]string{},
+		false,
+		labels.Everything(),
+		false,
+		false,
+		false,
+	)
+	require.NoError(t, err)
+	assert.NotNil(t, src)
+
+	got, err := src.Endpoints(context.Background())
+	require.NoError(t, err)
+
+	want := []*endpoint.Endpoint{
+		// TODO: root domain records should not be created. Address them in a follow-up PR.
+		{DNSName: "example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.2", "10.244.1.3", "10.244.1.4"}},
+		{DNSName: "kafka-0.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.2"}},
+		{DNSName: "kafka-1.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.3"}},
+		{DNSName: "kafka-2.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.4"}},
+	}
+
+	validateEndpoints(t, got, want)
+}
+
 // TestHeadlessServices tests that headless services generate the correct endpoints.
 func TestHeadlessServicesHostIP(t *testing.T) {
 	t.Parallel()
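
Reviewer note: a minimal standalone sketch of the behavior the merge path above guarantees. It restates the Endpoint.UniqueOrderedTargets logic as a plain function so it runs outside the external-dns tree (the function and variable names here are illustrative, not part of the change): when two headless services such as "kafka" and "kafka-2" select the same pods, the shared example.org A record accumulates each pod IP twice, and deduplication plus lexicographic sorting collapses the targets into one stable set.

package main

import (
	"fmt"
	"slices"
)

// uniqueOrderedTargets mirrors Endpoint.UniqueOrderedTargets from the diff:
// drop duplicate targets, then sort the remainder lexicographically.
func uniqueOrderedTargets(targets []string) []string {
	result := make([]string, 0, len(targets))
	existing := make(map[string]bool)
	for _, target := range targets {
		if !existing[target] {
			result = append(result, target)
			existing[target] = true
		}
	}
	slices.Sort(result)
	return result
}

func main() {
	// Both services contribute the same three pod IPs before merging.
	merged := []string{
		"10.244.1.3", "10.244.1.2", "10.244.1.4", // targets from service "kafka"
		"10.244.1.3", "10.244.1.2", "10.244.1.4", // targets from service "kafka-2"
	}
	fmt.Println(uniqueOrderedTargets(merged))
	// Output: [10.244.1.2 10.244.1.3 10.244.1.4]
}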