From ba7eb733e8d5a7ddfd68f261f05f53c37cb4ffdb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 5 Sep 2018 14:32:47 +0300 Subject: [PATCH] tidy up the discovery logs,updating loops and selects (#4556) * tidy up the discovery logs,updating loops and selects few objects renamings removed a very noise debug log on the k8s discovery. It would be usefull to show some summary rather than every update as this is impossible to follow. added most comments as debug logs so each block becomes self explanatory. when the discovery receiving channel is full will retry again on the next cycle. Signed-off-by: Krasi Georgiev * add noop logger for the SD manager tests. Signed-off-by: Krasi Georgiev * spelling nits Signed-off-by: Krasi Georgiev --- discovery/kubernetes/kubernetes.go | 1 - discovery/manager.go | 33 ++++++++++++++++-------------- discovery/manager_test.go | 7 ++++--- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 4ab431fc5a..2b2bb9ae77 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -383,7 +383,6 @@ func send(ctx context.Context, l log.Logger, role Role, ch chan<- []*targetgroup if tg == nil { return } - level.Debug(l).Log("msg", "kubernetes discovery update", "role", string(role), "tg", fmt.Sprintf("%#v", tg)) select { case <-ctx.Done(): case ch <- []*targetgroup.Group{tg}: diff --git a/discovery/manager.go b/discovery/manager.go index 11be4b4c37..bb4409fead 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -162,47 +162,50 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) { m.discoverCancel = append(m.discoverCancel, cancel) go p.d.Run(ctx, updates) - go m.runProvider(ctx, p, updates) + go m.updater(ctx, p, updates) } -func (m *Manager) runProvider(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { +func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - updateReceived := make(chan struct{}, 1) + triggerUpdate := make(chan struct{}, 1) for { select { case <-ctx.Done(): return case tgs, ok := <-updates: - // Handle the case that a target provider(E.g. StaticProvider) exits and - // closes the channel before the context is done. - // This will prevent sending the updates to the receiver so we send them before exiting. if !ok { + level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name) select { - case m.syncCh <- m.allGroups(): - default: - level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update. + level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name) + return + case <-ctx.Done(): + return } - return + } for _, s := range p.subs { m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) } - // Signal that there was an update. select { - case updateReceived <- struct{}{}: + case triggerUpdate <- struct{}{}: default: } - case <-ticker.C: // Some discoverers send updates too often so we send these to the receiver once every 5 seconds. + case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. select { - case <-updateReceived: // Send only when there is a new update. + case <-triggerUpdate: select { case m.syncCh <- m.allGroups(): default: - level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name) + select { + case triggerUpdate <- struct{}{}: + default: + } } default: } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 402c75b9d7..d9566201f9 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" sd_config "github.com/prometheus/prometheus/discovery/config" @@ -657,7 +658,7 @@ func TestTargetUpdatesOrder(t *testing.T) { for testIndex, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) var totalUpdatesCount int @@ -743,7 +744,7 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) go discoveryManager.Run() c := make(map[string]sd_config.ServiceDiscoveryConfig) @@ -849,7 +850,7 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) go discoveryManager.Run() c := make(map[string]sd_config.ServiceDiscoveryConfig)