mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2025-08-07 01:56:57 +02:00
Improve MinEventInterval compliance with docs
**Description** In the command line arguments, we see `min-event-sync-interval` as "The minimum interval between two consecutive synchronizations triggered from kubernetes events" In the code, it actually acts a different way. It imposes a certain dealy between syncs. While this is compliant with the "minimum delay between 2 consecutive synchronizations", it has side-effects in case of large delays. In particular, when trying to fine-tune external-dns to match the provider rate-limits. In this case, it may be interesting to restrict the rate of reconciling actions happening by having a high `min-event-sync-interval`, while keeping a low latency for initial events. This would allow to maximise the bulk effect of high change rate while keeping fast enough reaction for isolated changes. **Checklist** - [X] Unit tests updated - [X] End user documentation updated > End user documentation matches the updated behaviour with more > accuracy Change-Id: Ibcea707974a095a2d5861a3974b4c79e5a15b00e
This commit is contained in:
parent
3b35ea6ee7
commit
2007a49f1f
@ -189,8 +189,10 @@ type Controller struct {
|
|||||||
DomainFilter endpoint.DomainFilter
|
DomainFilter endpoint.DomainFilter
|
||||||
// The nextRunAt used for throttling and batching reconciliation
|
// The nextRunAt used for throttling and batching reconciliation
|
||||||
nextRunAt time.Time
|
nextRunAt time.Time
|
||||||
// The nextRunAtMux is for atomic updating of nextRunAt
|
// The runAtMutex is for atomic updating of nextRunAt and lastRunAt
|
||||||
nextRunAtMux sync.Mutex
|
runAtMutex sync.Mutex
|
||||||
|
// The lastRunAt used for throttling and batching reconciliation
|
||||||
|
lastRunAt time.Time
|
||||||
// MangedRecordTypes are DNS record types that will be considered for management.
|
// MangedRecordTypes are DNS record types that will be considered for management.
|
||||||
ManagedRecordTypes []string
|
ManagedRecordTypes []string
|
||||||
// ExcludeRecordTypes are DNS record types that will be excluded from management.
|
// ExcludeRecordTypes are DNS record types that will be excluded from management.
|
||||||
@ -203,6 +205,10 @@ type Controller struct {
|
|||||||
func (c *Controller) RunOnce(ctx context.Context) error {
|
func (c *Controller) RunOnce(ctx context.Context) error {
|
||||||
lastReconcileTimestamp.SetToCurrentTime()
|
lastReconcileTimestamp.SetToCurrentTime()
|
||||||
|
|
||||||
|
c.runAtMutex.Lock()
|
||||||
|
c.lastRunAt = time.Now()
|
||||||
|
c.runAtMutex.Unlock()
|
||||||
|
|
||||||
records, err := c.Registry.Records(ctx)
|
records, err := c.Registry.Records(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
registryErrorsTotal.Inc()
|
registryErrorsTotal.Inc()
|
||||||
@ -264,6 +270,24 @@ func (c *Controller) RunOnce(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func earliest(r time.Time, times ...time.Time) time.Time {
|
||||||
|
for _, t := range times {
|
||||||
|
if t.Before(r) {
|
||||||
|
r = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func latest(r time.Time, times ...time.Time) time.Time {
|
||||||
|
for _, t := range times {
|
||||||
|
if t.After(r) {
|
||||||
|
r = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
// Counts the intersections of A and AAAA records in endpoint and registry.
|
// Counts the intersections of A and AAAA records in endpoint and registry.
|
||||||
func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) {
|
func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) {
|
||||||
recordsMap := make(map[string]map[string]struct{})
|
recordsMap := make(map[string]map[string]struct{})
|
||||||
@ -306,18 +330,20 @@ func countAddressRecords(endpoints []*endpoint.Endpoint) (int, int) {
|
|||||||
|
|
||||||
// ScheduleRunOnce makes sure execution happens at most once per interval.
|
// ScheduleRunOnce makes sure execution happens at most once per interval.
|
||||||
func (c *Controller) ScheduleRunOnce(now time.Time) {
|
func (c *Controller) ScheduleRunOnce(now time.Time) {
|
||||||
c.nextRunAtMux.Lock()
|
c.runAtMutex.Lock()
|
||||||
defer c.nextRunAtMux.Unlock()
|
defer c.runAtMutex.Unlock()
|
||||||
// schedule only if a reconciliation is not already planned
|
c.nextRunAt = latest(
|
||||||
// to happen in the following c.MinEventSyncInterval
|
c.lastRunAt.Add(c.MinEventSyncInterval),
|
||||||
if !c.nextRunAt.Before(now.Add(c.MinEventSyncInterval)) {
|
earliest(
|
||||||
c.nextRunAt = now.Add(c.MinEventSyncInterval)
|
now.Add(5*time.Second),
|
||||||
}
|
c.nextRunAt,
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) ShouldRunOnce(now time.Time) bool {
|
func (c *Controller) ShouldRunOnce(now time.Time) bool {
|
||||||
c.nextRunAtMux.Lock()
|
c.runAtMutex.Lock()
|
||||||
defer c.nextRunAtMux.Unlock()
|
defer c.runAtMutex.Unlock()
|
||||||
if now.Before(c.nextRunAt) {
|
if now.Before(c.nextRunAt) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -278,15 +278,17 @@ func valueFromMetric(metric prometheus.Gauge) uint64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestShouldRunOnce(t *testing.T) {
|
func TestShouldRunOnce(t *testing.T) {
|
||||||
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 5 * time.Second}
|
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// First run of Run loop should execute RunOnce
|
// First run of Run loop should execute RunOnce
|
||||||
assert.True(t, ctrl.ShouldRunOnce(now))
|
assert.True(t, ctrl.ShouldRunOnce(now))
|
||||||
|
assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt)
|
||||||
|
|
||||||
// Second run should not
|
// Second run should not
|
||||||
assert.False(t, ctrl.ShouldRunOnce(now))
|
assert.False(t, ctrl.ShouldRunOnce(now))
|
||||||
|
ctrl.lastRunAt = now
|
||||||
|
|
||||||
now = now.Add(10 * time.Second)
|
now = now.Add(10 * time.Second)
|
||||||
// Changes happen in ingresses or services
|
// Changes happen in ingresses or services
|
||||||
@ -316,12 +318,17 @@ func TestShouldRunOnce(t *testing.T) {
|
|||||||
assert.False(t, ctrl.ShouldRunOnce(now))
|
assert.False(t, ctrl.ShouldRunOnce(now))
|
||||||
|
|
||||||
// Multiple ingresses or services changes, closer than MinInterval from each other
|
// Multiple ingresses or services changes, closer than MinInterval from each other
|
||||||
|
ctrl.lastRunAt = now
|
||||||
firstChangeTime := now
|
firstChangeTime := now
|
||||||
secondChangeTime := firstChangeTime.Add(time.Second)
|
secondChangeTime := firstChangeTime.Add(time.Second)
|
||||||
// First change
|
// First change
|
||||||
ctrl.ScheduleRunOnce(firstChangeTime)
|
ctrl.ScheduleRunOnce(firstChangeTime)
|
||||||
// Second change
|
// Second change
|
||||||
ctrl.ScheduleRunOnce(secondChangeTime)
|
ctrl.ScheduleRunOnce(secondChangeTime)
|
||||||
|
|
||||||
|
// Executions should be spaced by at least MinEventSyncInterval
|
||||||
|
assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second)))
|
||||||
|
|
||||||
// Should not postpone the reconciliation further than firstChangeTime + MinInterval
|
// Should not postpone the reconciliation further than firstChangeTime + MinInterval
|
||||||
now = now.Add(ctrl.MinEventSyncInterval)
|
now = now.Add(ctrl.MinEventSyncInterval)
|
||||||
assert.True(t, ctrl.ShouldRunOnce(now))
|
assert.True(t, ctrl.ShouldRunOnce(now))
|
||||||
|
Loading…
Reference in New Issue
Block a user