fix(source/service): make sure only unique targets pushed to registry (#5614)

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2025-07-08 07:05:27 +01:00 committed by GitHub
parent 252a5e016c
commit 9045e45bc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 457 additions and 27 deletions

View File

@ -19,6 +19,7 @@ package endpoint
import (
"fmt"
"net/netip"
"slices"
"sort"
"strconv"
"strings"
@ -353,7 +354,21 @@ func (e *Endpoint) String() string {
return fmt.Sprintf("%s %d IN %s %s %s %s", e.DNSName, e.RecordTTL, e.RecordType, e.SetIdentifier, e.Targets, e.ProviderSpecific)
}
// Apply filter to slice of endpoints and return new filtered slice that includes
// UniqueOrderedTargets removes duplicate targets from the Endpoint and sorts them in lexicographical order.
func (e *Endpoint) UniqueOrderedTargets() {
result := make([]string, 0, len(e.Targets))
existing := make(map[string]bool)
for _, target := range e.Targets {
if _, ok := existing[target]; !ok {
result = append(result, target)
existing[target] = true
}
}
slices.Sort(result)
e.Targets = result
}
// FilterEndpointsByOwnerID Apply filter to slice of endpoints and return new filtered slice that includes
// only endpoints that match.
func FilterEndpointsByOwnerID(ownerID string, eps []*Endpoint) []*Endpoint {
filtered := []*Endpoint{}

View File

@ -925,3 +925,46 @@ func TestCheckEndpoint(t *testing.T) {
})
}
}
func TestEndpoint_UniqueOrderedTargets(t *testing.T) {
tests := []struct {
name string
targets []string
expected Targets
want bool
}{
{
name: "no duplicates",
targets: []string{"b.example.com", "a.example.com"},
expected: Targets{"a.example.com", "b.example.com"},
},
{
name: "with duplicates",
targets: []string{"a.example.com", "b.example.com", "a.example.com"},
expected: Targets{"a.example.com", "b.example.com"},
},
{
name: "already sorted",
targets: []string{"a.example.com", "b.example.com"},
expected: Targets{"a.example.com", "b.example.com"},
},
{
name: "all duplicates",
targets: []string{"a.example.com", "a.example.com", "a.example.com"},
expected: Targets{"a.example.com"},
},
{
name: "empty",
targets: []string{},
expected: Targets{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ep := &Endpoint{Targets: tt.targets}
ep.UniqueOrderedTargets()
assert.Equal(t, tt.expected, ep.Targets)
})
}
}

View File

@ -251,6 +251,29 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
sort.Slice(endpoints, func(i, j int) bool {
return endpoints[i].Labels[endpoint.ResourceLabelKey] < endpoints[j].Labels[endpoint.ResourceLabelKey]
})
mergedEndpoints := make(map[endpoint.EndpointKey][]*endpoint.Endpoint)
for _, ep := range endpoints {
key := ep.Key()
if existing, ok := mergedEndpoints[key]; ok {
if existing[0].RecordType == endpoint.RecordTypeCNAME {
log.Debugf("CNAME %s with multiple targets found", ep.DNSName)
mergedEndpoints[key] = append(existing, ep)
continue
}
existing[0].Targets = append(existing[0].Targets, ep.Targets...)
existing[0].UniqueOrderedTargets()
mergedEndpoints[key] = existing
} else {
ep.UniqueOrderedTargets()
mergedEndpoints[key] = []*endpoint.Endpoint{ep}
}
}
processed := make([]*endpoint.Endpoint, 0, len(mergedEndpoints))
for _, ep := range mergedEndpoints {
processed = append(processed, ep...)
}
endpoints = processed
// Use stable sort to not disrupt the order of services
sort.SliceStable(endpoints, func(i, j int) bool {
if endpoints[i].DNSName != endpoints[j].DNSName {
@ -258,31 +281,6 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
}
return endpoints[i].RecordType < endpoints[j].RecordType
})
mergedEndpoints := []*endpoint.Endpoint{}
mergedEndpoints = append(mergedEndpoints, endpoints[0])
for i := 1; i < len(endpoints); i++ {
lastMergedEndpoint := len(mergedEndpoints) - 1
if mergedEndpoints[lastMergedEndpoint].DNSName == endpoints[i].DNSName &&
mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
mergedEndpoints[lastMergedEndpoint].RecordType != endpoint.RecordTypeCNAME && // It is against RFC-1034 for CNAME records to have multiple targets, so skip merging
mergedEndpoints[lastMergedEndpoint].SetIdentifier == endpoints[i].SetIdentifier &&
mergedEndpoints[lastMergedEndpoint].RecordTTL == endpoints[i].RecordTTL {
mergedEndpoints[lastMergedEndpoint].Targets = append(mergedEndpoints[lastMergedEndpoint].Targets, endpoints[i].Targets[0])
} else {
mergedEndpoints = append(mergedEndpoints, endpoints[i])
}
if mergedEndpoints[lastMergedEndpoint].DNSName == endpoints[i].DNSName &&
mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
mergedEndpoints[lastMergedEndpoint].RecordType == endpoint.RecordTypeCNAME {
log.Debugf("CNAME %s with multiple targets found", endpoints[i].DNSName)
}
}
endpoints = mergedEndpoints
}
for _, ep := range endpoints {
sort.Sort(ep.Targets)
}
return endpoints, nil

View File

@ -30,10 +30,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/endpoint"
@ -3089,7 +3091,7 @@ func TestHeadlessServices(t *testing.T) {
t.Parallel()
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()
kubernetes := fake.NewClientset()
service := &v1.Service{
Spec: v1.ServiceSpec{
@ -3196,6 +3198,378 @@ func TestHeadlessServices(t *testing.T) {
}
}
func TestMultipleHeadlessServicesPointingToPodsOnTheSameNode(t *testing.T) {
kubernetes := fake.NewClientset()
headless := []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
},
Annotations: map[string]string{
annotations.HostnameKey: "example.org",
},
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
ClusterIPs: []string{v1.ClusterIPNone},
InternalTrafficPolicy: testutils.ToPtr(v1.ServiceInternalTrafficPolicyCluster),
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
IPFamilyPolicy: testutils.ToPtr(v1.IPFamilyPolicySingleStack),
Ports: []v1.ServicePort{
{
Name: "web",
Port: 80,
Protocol: v1.ProtocolTCP,
TargetPort: intstr.FromInt32(80),
},
},
Selector: map[string]string{
"app": "kafka",
},
SessionAffinity: v1.ServiceAffinityNone,
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-2",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
},
Annotations: map[string]string{
annotations.HostnameKey: "example.org",
},
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
ClusterIPs: []string{v1.ClusterIPNone},
InternalTrafficPolicy: testutils.ToPtr(v1.ServiceInternalTrafficPolicyCluster),
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
IPFamilyPolicy: testutils.ToPtr(v1.IPFamilyPolicySingleStack),
Ports: []v1.ServicePort{
{
Name: "web",
Port: 80,
Protocol: v1.ProtocolTCP,
TargetPort: intstr.FromInt32(80),
},
},
Selector: map[string]string{
"app": "kafka",
},
SessionAffinity: v1.ServiceAffinityNone,
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{},
},
},
}
assert.NotNil(t, headless)
pods := []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-0",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
appsv1.PodIndexLabel: "0",
appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
appsv1.StatefulSetPodNameLabel: "kafka-0",
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: "kafka",
},
},
},
Spec: v1.PodSpec{
Hostname: "kafka-0",
Subdomain: "kafka",
NodeName: "local-dev-worker",
Containers: []v1.Container{
{
Name: "nginx",
Ports: []v1.ContainerPort{
{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.244.1.2",
PodIPs: []v1.PodIP{{IP: "10.244.1.2"}},
HostIP: "172.18.0.2",
HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-1",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
appsv1.PodIndexLabel: "1",
appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
appsv1.StatefulSetPodNameLabel: "kafka-1",
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: "kafka",
},
},
},
Spec: v1.PodSpec{
Hostname: "kafka-1",
Subdomain: "kafka",
NodeName: "local-dev-worker",
Containers: []v1.Container{
{
Name: "nginx",
Ports: []v1.ContainerPort{
{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.244.1.3",
PodIPs: []v1.PodIP{{IP: "10.244.1.3"}},
HostIP: "172.18.0.2",
HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-2",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
appsv1.PodIndexLabel: "2",
appsv1.ControllerRevisionHashLabelKey: "kafka-b8d79cdb6",
appsv1.StatefulSetPodNameLabel: "kafka-2",
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: "kafka",
},
},
},
Spec: v1.PodSpec{
Hostname: "kafka-2",
Subdomain: "kafka",
NodeName: "local-dev-worker",
Containers: []v1.Container{
{
Name: "nginx",
Ports: []v1.ContainerPort{
{Name: "web", ContainerPort: 80, Protocol: v1.ProtocolTCP},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "10.244.1.4",
PodIPs: []v1.PodIP{{IP: "10.244.1.4"}},
HostIP: "172.18.0.2",
HostIPs: []v1.HostIP{{IP: "172.18.0.2"}},
},
},
}
assert.Len(t, pods, 3)
endpoints := []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-xhrc9",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
discoveryv1.LabelServiceName: "kafka",
discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io",
v1.IsHeadlessService: "",
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"10.244.1.2"},
Hostname: testutils.ToPtr("kafka-0"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-0",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
{
Addresses: []string{"10.244.1.3"},
Hostname: testutils.ToPtr("kafka-1"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-1",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
{
Addresses: []string{"10.244.1.4"},
Hostname: testutils.ToPtr("kafka-2"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-2",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-2-svwsg",
Namespace: "default",
Labels: map[string]string{
"app": "kafka",
discoveryv1.LabelServiceName: "kafka-2",
discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io",
v1.IsHeadlessService: "",
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"10.244.1.2"},
Hostname: testutils.ToPtr("kafka-0"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-0",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
{
Addresses: []string{"10.244.1.3"},
Hostname: testutils.ToPtr("kafka-1"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-1",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
{
Addresses: []string{"10.244.1.4"},
Hostname: testutils.ToPtr("kafka-2"),
NodeName: testutils.ToPtr("local-dev-worker"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "kafka-2",
Namespace: "default",
},
Conditions: discoveryv1.EndpointConditions{
Ready: testutils.ToPtr(true),
Serving: testutils.ToPtr(true),
Terminating: testutils.ToPtr(false),
},
},
},
},
}
for _, svc := range headless {
_, err := kubernetes.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)
}
for _, pod := range pods {
_, err := kubernetes.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)
}
for _, ep := range endpoints {
_, err := kubernetes.DiscoveryV1().EndpointSlices(ep.Namespace).Create(context.Background(), ep, metav1.CreateOptions{})
require.NoError(t, err)
}
src, err := NewServiceSource(
t.Context(),
kubernetes,
v1.NamespaceAll,
"",
"",
false,
"",
false,
false,
false,
[]string{},
false,
labels.Everything(),
false,
false,
false,
)
require.NoError(t, err)
assert.NotNil(t, src)
got, err := src.Endpoints(context.Background())
require.NoError(t, err)
want := []*endpoint.Endpoint{
// TODO: root domain records should not be created. Address them in a follow-up PR.
{DNSName: "example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.2", "10.244.1.3", "10.244.1.4"}},
{DNSName: "kafka-0.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.2"}},
{DNSName: "kafka-1.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.3"}},
{DNSName: "kafka-2.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"10.244.1.4"}},
}
validateEndpoints(t, got, want)
}
// TestHeadlessServices tests that headless services generate the correct endpoints.
func TestHeadlessServicesHostIP(t *testing.T) {
t.Parallel()