feat(source): support --event flags with sources pod and node (#5642)

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2025-07-16 07:50:23 +01:00 committed by GitHub
parent 1956c7e2df
commit 6e1651a21c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 80 additions and 23 deletions

View File

@ -27,7 +27,6 @@ import (
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations"
@ -69,13 +68,7 @@ func NewNodeSource(
nodeInformer := informerFactory.Core().V1().Nodes()
// Add default resource event handler to properly initialize informer.
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("node added")
},
},
)
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
informerFactory.Start(ctx.Done())
@ -172,7 +165,8 @@ func (ns *nodeSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error)
return endpointsSlice, nil
}
func (ns *nodeSource) AddEventHandler(_ context.Context, _ func()) {
func (ns *nodeSource) AddEventHandler(_ context.Context, handler func()) {
_, _ = ns.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
// nodeAddress returns the node's externalIP and if that's not found, the node's internalIP

View File

@ -26,7 +26,10 @@ import (
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/mock"
"k8s.io/client-go/kubernetes"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/internal/testutils"
@ -619,6 +622,39 @@ func TestResourceLabelIsSetForEachNodeEndpoint(t *testing.T) {
}
}
func TestNodeSource_AddEventHandler(t *testing.T) {
fakeInformer := new(fakeNodeInformer)
inf := testInformer{}
fakeInformer.On("Informer").Return(&inf)
nSource := &nodeSource{
nodeInformer: fakeInformer,
}
handlerCalled := false
handler := func() { handlerCalled = true }
nSource.AddEventHandler(t.Context(), handler)
fakeInformer.AssertNumberOfCalls(t, "Informer", 1)
assert.False(t, handlerCalled)
assert.Equal(t, 1, inf.times)
}
type fakeNodeInformer struct {
mock.Mock
informer cache.SharedIndexInformer
}
func (f *fakeNodeInformer) Informer() cache.SharedIndexInformer {
args := f.Called()
return args.Get(0).(cache.SharedIndexInformer)
}
func (f *fakeNodeInformer) Lister() corev1lister.NodeLister {
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
}
type nodeListBuilder struct {
nodes []v1.Node
}

View File

@ -29,7 +29,6 @@ import (
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/annotations"
@ -76,18 +75,8 @@ func NewPodSource(
return nil, fmt.Errorf("failed to add indexers to pod informer: %w", err)
}
_, _ = podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
_, _ = nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
_, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
informerFactory.Start(ctx.Done())
@ -114,7 +103,8 @@ func NewPodSource(
}, nil
}
func (*podSource) AddEventHandler(_ context.Context, _ func()) {
func (ps *podSource) AddEventHandler(_ context.Context, handler func()) {
_, _ = ps.podInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
func (ps *podSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {

View File

@ -23,9 +23,13 @@ import (
"testing"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
@ -909,6 +913,39 @@ func TestPodSourceLogs(t *testing.T) {
}
}
func TestPodSource_AddEventHandler(t *testing.T) {
fakeInformer := new(fakePodInformer)
inf := testInformer{}
fakeInformer.On("Informer").Return(&inf)
pSource := &podSource{
podInformer: fakeInformer,
}
handlerCalled := false
handler := func() { handlerCalled = true }
pSource.AddEventHandler(t.Context(), handler)
fakeInformer.AssertNumberOfCalls(t, "Informer", 1)
assert.False(t, handlerCalled)
assert.Equal(t, 1, inf.times)
}
type fakePodInformer struct {
mock.Mock
informer cache.SharedIndexInformer
}
func (f *fakePodInformer) Informer() cache.SharedIndexInformer {
args := f.Called()
return args.Get(0).(cache.SharedIndexInformer)
}
func (f *fakePodInformer) Lister() corev1lister.PodLister {
return corev1lister.NewPodLister(f.Informer().GetIndexer())
}
func nodesFixturesIPv6() []*corev1.Node {
return []*corev1.Node{
{