feat(events): publish k8s events

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
ivan katliarchuk 2025-07-21 09:20:27 +01:00
parent ed6fd2fb03
commit bf31828542
No known key found for this signature in database
GPG Key ID: 601CDBBBB76E47BE
7 changed files with 70 additions and 55 deletions

View File

@ -121,17 +121,7 @@ func Execute() {
os.Exit(0)
}
eventsController, err := events.NewEventController(events.NewConfig(
events.WithKubeConfig(cfg.KubeConfig, cfg.APIServerURL, cfg.RequestTimeout),
events.WithEmitEvents(cfg.EmitEvents),
events.WithDryRun(cfg.DryRun),
))
if err != nil {
log.Fatal(err)
}
eventsController.Run(ctx)
ctrl, err := buildController(cfg, endpointsSource, prvdr, domainFilter, eventsController)
ctrl, err := buildController(ctx, cfg, endpointsSource, prvdr, domainFilter)
if err != nil {
log.Fatal(err)
}
@ -357,11 +347,11 @@ func buildProvider(
}
func buildController(
ctx context.Context,
cfg *externaldns.Config,
src source.Source,
p provider.Provider,
filter *endpoint.DomainFilter,
eventCtrl events.EventEmitter,
) (*Controller, error) {
policy, ok := plan.Policies[cfg.Policy]
if !ok {
@ -371,6 +361,19 @@ func buildController(
if err != nil {
return nil, err
}
eventsCfg := events.NewConfig(
events.WithKubeConfig(cfg.KubeConfig, cfg.APIServerURL, cfg.RequestTimeout),
events.WithEmitEvents(cfg.EmitEvents),
events.WithDryRun(cfg.DryRun))
var eventsCtrl *events.Controller
if eventsCfg.IsEnabled() {
eventsCtrl, err = events.NewEventController(eventsCfg)
if err != nil {
log.Fatal(err)
}
eventsCtrl.Run(ctx)
}
return &Controller{
Source: src,
Registry: reg,
@ -380,7 +383,7 @@ func buildController(
ManagedRecordTypes: cfg.ManagedDNSRecordTypes,
ExcludeRecordTypes: cfg.ExcludeDNSRecordTypes,
MinEventSyncInterval: cfg.MinEventSyncInterval,
EventController: eventCtrl,
EventController: eventsCtrl,
}, nil
}

View File

@ -27,7 +27,6 @@ import (
"time"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
@ -72,7 +71,10 @@ users:
err = os.WriteFile(mockKubeCfgPath, []byte(fmt.Sprintf(kubeCfgTemplate, svr.URL)), os.FileMode(0755))
require.NoError(t, err)
cfg := NewConfig(WithKubeConfig(mockKubeCfgPath, svr.URL, 0))
cfg := NewConfig(
WithKubeConfig(mockKubeCfgPath, svr.URL, 0),
WithEmitEvents([]string{string(RecordReady)}),
)
ctrl, err := NewEventController(cfg)
require.NoError(t, err)
require.NotNil(t, ctrl)
@ -86,19 +88,9 @@ func TestController_Run_NoEmitEvents(t *testing.T) {
emitEvents: sets.New[Reason](),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO: move internal/testutils/log.go to internal/testutils/log/log.go package to prevent circular dependency issues
logger, hook := test.NewNullLogger()
log.AddHook(hook)
log.SetOutput(logger.Out)
log.SetLevel(log.DebugLevel)
ctrl.Run(ctx)
require.Contains(t, hook.LastEntry().Message, "--emit-events is not defined")
hook.Reset()
require.NotPanics(t, func() {
ctrl.Run(t.Context())
})
}
func TestController_Run_EmitEvents(t *testing.T) {

View File

@ -61,7 +61,7 @@ func NewEventController(cfg *Config) (*Controller, error) {
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{Name: controllerName},
)
// TODO: to externalize this as simlar to source.GetRestConfig
// TODO: to externalize this as similar to source.GetRestConfig
rConfig, err := GetRestConfig(cfg.kubeConfig, cfg.apiServerURL)
if err != nil {
return nil, err
@ -89,7 +89,6 @@ func NewEventController(cfg *Config) (*Controller, error) {
func (ec *Controller) Run(ctx context.Context) {
if len(ec.emitEvents) == 0 {
log.Debug("--emit-events is not defined, will not emit any events")
return
}
go ec.run(ctx)

View File

@ -24,7 +24,7 @@ import (
"time"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -41,8 +41,8 @@ const (
RecordDeleted Reason = "RecordDeleted"
RecordError Reason = "RecordError"
EventTypeNormal EventType = EventType(corev1.EventTypeNormal)
EventTypeWarning EventType = EventType(corev1.EventTypeWarning)
EventTypeNormal EventType = EventType(apiv1.EventTypeNormal)
EventTypeWarning EventType = EventType(apiv1.EventTypeWarning)
)
var (
@ -155,7 +155,7 @@ func (e *Event) event() *eventsv1.Event {
Type: string(e.eType),
}
if e.ref.UID != "" {
ref := e.ref.v1ObjectRef()
ref := e.ref.objectRef()
event.Related = ref
event.Regarding = *ref
}
@ -220,8 +220,12 @@ func NewConfig(opts ...ConfigOption) *Config {
return c
}
func (r *ObjectReference) v1ObjectRef() *corev1.ObjectReference {
return &corev1.ObjectReference{
func (c *Config) IsEnabled() bool {
return len(c.emitEvents) > 0
}
func (r *ObjectReference) objectRef() *apiv1.ObjectReference {
return &apiv1.ObjectReference{
Kind: r.Kind,
Namespace: r.Namespace,
Name: r.Name,

View File

@ -22,7 +22,7 @@ import (
"time"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
@ -92,34 +92,34 @@ func TestEvent_NewEvents(t *testing.T) {
},
{
name: "event without uuid",
event: NewEvent(NewObjectReference(&v1.Pod{
event: NewEvent(NewObjectReference(&apiv1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
APIVersion: "apiv1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "fake-pod",
Namespace: v1.NamespaceDefault,
Namespace: apiv1.NamespaceDefault,
},
}, "fake"), "", ActionCreate, RecordReady),
asserts: func(e *eventsv1.Event) {
require.NotNil(t, e)
require.Contains(t, e.Name, "fake-pod.")
require.Equal(t, v1.NamespaceDefault, e.Namespace)
require.Equal(t, apiv1.NamespaceDefault, e.Namespace)
require.Nil(t, e.Related)
require.Equal(t, v1.ObjectReference{}, e.Regarding)
require.Equal(t, apiv1.ObjectReference{}, e.Regarding)
},
},
{
name: "event with uuid",
event: NewEvent(NewObjectReference(&v1.Pod{
event: NewEvent(NewObjectReference(&apiv1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
APIVersion: "apiv1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "fake-pod",
Namespace: v1.NamespaceDefault,
Namespace: apiv1.NamespaceDefault,
UID: "9de3fc19-8aeb-4e76-865d-ada955403103",
},
}, "fake"), "", ActionCreate, RecordReady),
@ -152,9 +152,9 @@ func TestEvent_Transpose(t *testing.T) {
require.Equal(t, string(ActionCreate), event.Action)
require.Equal(t, string(RecordReady), event.Reason)
require.Equal(t, "test message", event.Note)
require.Equal(t, "Normal", event.Type)
require.Contains(t, event.ReportingInstance, "")
// require.Equal(t, controllerName, event.ReportingController)
require.Equal(t, apiv1.EventTypeNormal, event.Type)
require.Equal(t, controllerName, event.ReportingController)
require.Contains(t, event.ReportingInstance, controllerName+"/source/")
longMsg := strings.Repeat("a", 2000)
ev.message = longMsg
@ -170,26 +170,43 @@ func TestWithEmitEvents(t *testing.T) {
name string
input []string
expected sets.Set[Reason]
assert func(c *Config)
}{
{
name: "valid events",
input: []string{string(RecordReady), string(RecordError)},
expected: sets.New[Reason](RecordReady, RecordError),
assert: func(c *Config) {
require.Equal(t, sets.New[Reason](RecordReady, RecordError), c.emitEvents)
require.True(t, c.IsEnabled())
},
},
{
name: "invalid event",
input: []string{"InvalidEvent"},
expected: sets.New[Reason](),
assert: func(c *Config) {
require.Equal(t, sets.New[Reason](), c.emitEvents)
require.False(t, c.IsEnabled())
},
},
{
name: "mixed valid and invalid",
input: []string{string(RecordReady), "InvalidEvent"},
expected: sets.New[Reason](RecordReady),
assert: func(c *Config) {
require.Equal(t, sets.New[Reason](RecordReady), c.emitEvents)
require.True(t, c.IsEnabled())
},
},
{
name: "empty input",
input: []string{},
expected: nil,
assert: func(c *Config) {
require.NotNil(t, c)
require.False(t, c.IsEnabled())
},
},
}
@ -198,7 +215,7 @@ func TestWithEmitEvents(t *testing.T) {
cfg := &Config{}
opt := WithEmitEvents(tt.input)
opt(cfg)
require.Equal(t, tt.expected, cfg.emitEvents)
tt.assert(cfg)
})
}
}

View File

@ -102,7 +102,7 @@ type GoogleProvider struct {
provider.BaseProvider
// The Google project to work in
project string
// Enabled dry-run will print any modifying actions rather than execute them.
// IsEnabled dry-run will print any modifying actions rather than execute them.
dryRun bool
// Max batch size to submit to Google Cloud DNS per transaction.
batchChangeSize int

View File

@ -830,27 +830,27 @@ func (suite *NewPDNSProviderTestSuite) TestPDNSProviderCreateTLS() {
ClientCertKeyFilePath: "../../internal/testresources/client-cert-key.pem",
}), "Disabled TLS Config with additional flags should raise no error")
suite.NoError(newProvider(TLSConfig{}), "Enabled TLS Config without --tls-ca should raise no error")
suite.NoError(newProvider(TLSConfig{}), "IsEnabled TLS Config without --tls-ca should raise no error")
suite.NoError(newProvider(TLSConfig{
CAFilePath: "../../internal/testresources/ca.pem",
}), "Enabled TLS Config with --tls-ca should raise no error")
}), "IsEnabled TLS Config with --tls-ca should raise no error")
suite.Error(newProvider(TLSConfig{
CAFilePath: "../../internal/testresources/ca.pem",
ClientCertFilePath: "../../internal/testresources/client-cert.pem",
}), "Enabled TLS Config with --tls-client-cert only should raise an error")
}), "IsEnabled TLS Config with --tls-client-cert only should raise an error")
suite.Error(newProvider(TLSConfig{
CAFilePath: "../../internal/testresources/ca.pem",
ClientCertKeyFilePath: "../../internal/testresources/client-cert-key.pem",
}), "Enabled TLS Config with --tls-client-cert-key only should raise an error")
}), "IsEnabled TLS Config with --tls-client-cert-key only should raise an error")
suite.NoError(newProvider(TLSConfig{
CAFilePath: "../../internal/testresources/ca.pem",
ClientCertFilePath: "../../internal/testresources/client-cert.pem",
ClientCertKeyFilePath: "../../internal/testresources/client-cert-key.pem",
}), "Enabled TLS Config with all flags should raise no error")
}), "IsEnabled TLS Config with all flags should raise no error")
}
func (suite *NewPDNSProviderTestSuite) TestPDNSRRSetToEndpoints() {