chore(source): reorganise sources and wrappers (#5598)

* chore(source): reorganise sources and wrappers

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(source): reorganise sources and wrappers

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(source): reorganise sources and wrappers

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2025-07-03 10:55:26 +01:00 committed by GitHub
parent 2e50ddb72a
commit dfb64ae813
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 162 additions and 37 deletions

View File

@ -33,6 +33,8 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/klog/v2"
"sigs.k8s.io/external-dns/source/wrappers"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
"sigs.k8s.io/external-dns/pkg/apis/externaldns/validation"
@ -423,11 +425,11 @@ func buildSource(ctx context.Context, cfg *externaldns.Config) (source.Source, e
return nil, err
}
// Combine multiple sources into a single, deduplicated source.
combinedSource := source.NewDedupSource(source.NewMultiSource(sources, sourceCfg.DefaultTargets, sourceCfg.ForceDefaultTargets))
combinedSource := wrappers.NewDedupSource(wrappers.NewMultiSource(sources, sourceCfg.DefaultTargets, sourceCfg.ForceDefaultTargets))
// Filter targets
targetFilter := endpoint.NewTargetNetFilterWithExclusions(cfg.TargetNetFilter, cfg.ExcludeTargetNets)
combinedSource = source.NewNAT64Source(combinedSource, cfg.NAT64Networks)
combinedSource = source.NewTargetFilterSource(combinedSource, targetFilter)
combinedSource = wrappers.NewNAT64Source(combinedSource, cfg.NAT64Networks)
combinedSource = wrappers.NewTargetFilterSource(combinedSource, targetFilter)
return combinedSource, nil
}

View File

@ -81,6 +81,17 @@ func endpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin
return endpoints
}
func EndpointsForHostname(
hostname string,
targets endpoint.Targets,
ttl endpoint.TTL,
providerSpecific endpoint.ProviderSpecific,
setIdentifier string,
resource string,
) []*endpoint.Endpoint {
return endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier, resource)
}
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
@ -22,16 +22,18 @@ import (
log "github.com/sirupsen/logrus"
"sigs.k8s.io/external-dns/source"
"sigs.k8s.io/external-dns/endpoint"
)
// dedupSource is a Source that removes duplicate endpoints from its wrapped source.
type dedupSource struct {
source Source
source source.Source
}
// NewDedupSource creates a new dedupSource wrapping the provided Source.
func NewDedupSource(source Source) Source {
func NewDedupSource(source source.Source) source.Source {
return &dedupSource{source: source}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
@ -22,10 +22,11 @@ import (
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
"sigs.k8s.io/external-dns/source"
)
// Validates that dedupSource is a Source
var _ Source = &dedupSource{}
var _ source.Source = &dedupSource{}
func TestDedup(t *testing.T) {
t.Run("Endpoints", testDedupEndpoints)

View File

@ -14,20 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
"strings"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source"
log "github.com/sirupsen/logrus"
)
// multiSource is a Source that merges the endpoints of its nested Sources.
type multiSource struct {
children []Source
children []source.Source
defaultTargets []string
forceDefaultTargets bool
}
@ -48,20 +49,20 @@ func (ms *multiSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, err
continue
}
for i := range endpoints {
hasSourceTargets := len(endpoints[i].Targets) > 0
for _, ep := range endpoints {
hasSourceTargets := len(ep.Targets) > 0
if ms.forceDefaultTargets || !hasSourceTargets {
eps := endpointsForHostname(endpoints[i].DNSName, ms.defaultTargets, endpoints[i].RecordTTL, endpoints[i].ProviderSpecific, endpoints[i].SetIdentifier, "")
for _, ep := range eps {
ep.Labels = endpoints[i].Labels
eps := source.EndpointsForHostname(ep.DNSName, ms.defaultTargets, ep.RecordTTL, ep.ProviderSpecific, ep.SetIdentifier, "")
for _, e := range eps {
e.Labels = ep.Labels
}
result = append(result, eps...)
continue
}
log.Warnf("Source provided targets for %q (%s), ignoring default targets [%s] due to new behavior. Use --force-default-targets to revert to old behavior.", endpoints[i].DNSName, endpoints[i].RecordType, strings.Join(ms.defaultTargets, ", "))
result = append(result, endpoints[i])
log.Warnf("Source provided targets for %q (%s), ignoring default targets [%s] due to new behavior. Use --force-default-targets to revert to old behavior.", ep.DNSName, ep.RecordType, strings.Join(ms.defaultTargets, ", "))
result = append(result, ep)
}
}
@ -75,6 +76,6 @@ func (ms *multiSource) AddEventHandler(ctx context.Context, handler func()) {
}
// NewMultiSource creates a new multiSource.
func NewMultiSource(children []Source, defaultTargets []string, forceDefaultTargets bool) Source {
func NewMultiSource(children []source.Source, defaultTargets []string, forceDefaultTargets bool) source.Source {
return &multiSource{children: children, defaultTargets: defaultTargets, forceDefaultTargets: forceDefaultTargets}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/source"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
@ -39,7 +40,7 @@ func TestMultiSource(t *testing.T) {
// testMultiSourceImplementsSource tests that multiSource is a valid Source.
func testMultiSourceImplementsSource(t *testing.T) {
assert.Implements(t, (*Source)(nil), new(multiSource))
assert.Implements(t, (*source.Source)(nil), new(multiSource))
}
// testMultiSourceEndpoints tests merged endpoints from children are returned.
@ -78,7 +79,7 @@ func testMultiSourceEndpoints(t *testing.T) {
t.Parallel()
// Prepare the nested mock sources.
sources := make([]Source, 0, len(tc.nestedEndpoints))
sources := make([]source.Source, 0, len(tc.nestedEndpoints))
// Populate the nested mock sources.
for _, endpoints := range tc.nestedEndpoints {
@ -116,7 +117,7 @@ func testMultiSourceEndpointsWithError(t *testing.T) {
src.On("Endpoints").Return(nil, errSomeError)
// Create our object under test and get the endpoints.
source := NewMultiSource([]Source{src}, nil, false)
source := NewMultiSource([]source.Source{src}, nil, false)
// Get endpoints from our source.
_, err := source.Endpoints(context.Background())
@ -155,7 +156,7 @@ func testMultiSourceEndpointsDefaultTargets(t *testing.T) {
src.On("Endpoints").Return(sourceEndpoints, nil)
// Test with forceDefaultTargets=false (default behavior)
source := NewMultiSource([]Source{src}, defaultTargets, false)
source := NewMultiSource([]source.Source{src}, defaultTargets, false)
endpoints, err := source.Endpoints(context.Background())
require.NoError(t, err)
@ -185,7 +186,7 @@ func testMultiSourceEndpointsDefaultTargets(t *testing.T) {
src.On("Endpoints").Return(sourceEndpoints, nil)
// Test with forceDefaultTargets=false (default behavior)
source := NewMultiSource([]Source{src}, defaultTargets, false)
source := NewMultiSource([]source.Source{src}, defaultTargets, false)
endpoints, err := source.Endpoints(context.Background())
require.NoError(t, err)
@ -223,7 +224,7 @@ func testMultiSourceEndpointsDefaultTargets(t *testing.T) {
src.On("Endpoints").Return(sourceEndpoints, nil)
// Test with forceDefaultTargets=true (legacy behavior)
source := NewMultiSource([]Source{src}, defaultTargets, true)
source := NewMultiSource([]source.Source{src}, defaultTargets, true)
endpoints, err := source.Endpoints(context.Background())
require.NoError(t, err)
@ -258,7 +259,7 @@ func testMultiSourceEndpointsDefaultTargets(t *testing.T) {
src.On("Endpoints").Return(sourceEndpoints, nil)
// Test with forceDefaultTargets=true
source := NewMultiSource([]Source{src}, defaultTargets, true)
source := NewMultiSource([]source.Source{src}, defaultTargets, true)
endpoints, err := source.Endpoints(context.Background())
require.NoError(t, err)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
@ -22,16 +22,17 @@ import (
"net/netip"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source"
)
// nat64Source is a Source that adds A endpoints for AAAA records including an NAT64 address.
type nat64Source struct {
source Source
source source.Source
nat64Prefixes []string
}
// NewNAT64Source creates a new nat64Source wrapping the provided Source.
func NewNAT64Source(source Source, nat64Prefixes []string) Source {
func NewNAT64Source(source source.Source, nat64Prefixes []string) source.Source {
return &nat64Source{source: source, nat64Prefixes: nat64Prefixes}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
@ -22,10 +22,11 @@ import (
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
"sigs.k8s.io/external-dns/source"
)
// Validates that dedupSource is a Source
var _ Source = &nat64Source{}
var _ source.Source = &nat64Source{}
func TestNAT64Source(t *testing.T) {
t.Run("Endpoints", testNat64Source)

View File

@ -0,0 +1,102 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wrappers
import (
"reflect"
"sort"
"testing"
"sigs.k8s.io/external-dns/endpoint"
)
func sortEndpoints(endpoints []*endpoint.Endpoint) {
for _, ep := range endpoints {
sort.Strings([]string(ep.Targets))
}
sort.Slice(endpoints, func(i, k int) bool {
// Sort by DNSName, RecordType, and Targets
ei, ek := endpoints[i], endpoints[k]
if ei.DNSName != ek.DNSName {
return ei.DNSName < ek.DNSName
}
if ei.RecordType != ek.RecordType {
return ei.RecordType < ek.RecordType
}
// Targets are sorted ahead of time.
for j, ti := range ei.Targets {
if j >= len(ek.Targets) {
return true
}
if tk := ek.Targets[j]; ti != tk {
return ti < tk
}
}
return false
})
}
func validateEndpoints(t *testing.T, endpoints, expected []*endpoint.Endpoint) {
t.Helper()
if len(endpoints) != len(expected) {
t.Fatalf("expected %d endpoints, got %d", len(expected), len(endpoints))
}
// Make sure endpoints are sorted - validateEndpoint() depends on it.
sortEndpoints(endpoints)
sortEndpoints(expected)
for i := range endpoints {
validateEndpoint(t, endpoints[i], expected[i])
}
}
func validateEndpoint(t *testing.T, endpoint, expected *endpoint.Endpoint) {
t.Helper()
if endpoint.DNSName != expected.DNSName {
t.Errorf("DNSName expected %q, got %q", expected.DNSName, endpoint.DNSName)
}
if !endpoint.Targets.Same(expected.Targets) {
t.Errorf("Targets expected %q, got %q", expected.Targets, endpoint.Targets)
}
if endpoint.RecordTTL != expected.RecordTTL {
t.Errorf("RecordTTL expected %v, got %v", expected.RecordTTL, endpoint.RecordTTL)
}
// if a non-empty record type is expected, check that it matches.
if endpoint.RecordType != expected.RecordType {
t.Errorf("RecordType expected %q, got %q", expected.RecordType, endpoint.RecordType)
}
// if non-empty labels are expected, check that they match.
if expected.Labels != nil && !reflect.DeepEqual(endpoint.Labels, expected.Labels) {
t.Errorf("Labels expected %s, got %s", expected.Labels, endpoint.Labels)
}
if (len(expected.ProviderSpecific) != 0 || len(endpoint.ProviderSpecific) != 0) &&
!reflect.DeepEqual(endpoint.ProviderSpecific, expected.ProviderSpecific) {
t.Errorf("ProviderSpecific expected %s, got %s", expected.ProviderSpecific, endpoint.ProviderSpecific)
}
if endpoint.SetIdentifier != expected.SetIdentifier {
t.Errorf("SetIdentifier expected %q, got %q", expected.SetIdentifier, endpoint.SetIdentifier)
}
}

View File

@ -14,24 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"context"
log "github.com/sirupsen/logrus"
source2 "sigs.k8s.io/external-dns/source"
"sigs.k8s.io/external-dns/endpoint"
)
// targetFilterSource is a Source that removes endpoints matching the target filter from its wrapped source.
type targetFilterSource struct {
source Source
source source2.Source
targetFilter endpoint.TargetFilterInterface
}
// NewTargetFilterSource creates a new targetFilterSource wrapping the provided Source.
func NewTargetFilterSource(source Source, targetFilter endpoint.TargetFilterInterface) Source {
func NewTargetFilterSource(source source2.Source, targetFilter endpoint.TargetFilterInterface) source2.Source {
return &targetFilterSource{source: source, targetFilter: targetFilter}
}

View File

@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package source
package wrappers
import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"sigs.k8s.io/external-dns/source"
"sigs.k8s.io/external-dns/endpoint"
)
@ -55,7 +56,7 @@ func (e *echoSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
}
// NewEchoSource creates a new echoSource.
func NewEchoSource(endpoints []*endpoint.Endpoint) Source {
func NewEchoSource(endpoints []*endpoint.Endpoint) source.Source {
return &echoSource{endpoints: endpoints}
}
@ -90,7 +91,7 @@ func TestTargetFilterSource(t *testing.T) {
// TestTargetFilterSourceImplementsSource tests that targetFilterSource is a valid Source.
func TestTargetFilterSourceImplementsSource(t *testing.T) {
var _ Source = &targetFilterSource{}
var _ source.Source = &targetFilterSource{}
}
func TestTargetFilterSourceEndpoints(t *testing.T) {