From 405d13d479c02bb44e25e989f3ade8e37398eab7 Mon Sep 17 00:00:00 2001
From: avilevy
Date: Wed, 25 Mar 2026 19:18:22 +0000
Subject: [PATCH] scrape: Add TestManagerReloader and refactor discovery triggerSync

Adds a new TestManagerReloader test suite using synctest to assert
behavior of target updates, discovery reload ticker intervals, and
ScrapeOnShutdown flags.

Updates setupSynctestManager to allow skipping initial config setup by
passing a non-positive interval (e.g. 0).

Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside
discovery/manager.go for clarity, and adds a descriptive comment.

Signed-off-by: avilevy
---
 discovery/manager.go   |   9 +--
 scrape/helpers_test.go |   4 ++
 scrape/manager_test.go | 124 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 133 insertions(+), 4 deletions(-)

diff --git a/discovery/manager.go b/discovery/manager.go
index d788683adb..3bffe1a9cd 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -253,7 +253,8 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 	var (
 		wg           sync.WaitGroup
 		newProviders []*Provider
-		keep         bool
+		// triggerSync reports whether a send should be triggered to notify downstream of changes.
+		triggerSync bool
 	)
 	for _, prov := range m.providers {
 		// Cancel obsolete providers if it has no new subs and it has a cancel function.
@@ -268,7 +269,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 
 			prov.cancel()
 			prov.mu.RUnlock()
-			keep = true // Trigger send to notify downstream of dropped targets
+			triggerSync = true // Trigger send to notify downstream of dropped targets
 			continue
 		}
 		prov.mu.RUnlock()
@@ -280,7 +281,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 
 		m.targetsMtx.Lock()
 		for s := range prov.subs {
-			keep = true // Trigger send because this is an existing provider (reload)
+			triggerSync = true // Trigger send because this is an existing provider (reload)
 			refTargets = m.targets[poolKey{s, prov.name}]
 			// Remove obsolete subs' targets.
 			if _, ok := prov.newSubs[s]; !ok {
@@ -313,7 +314,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 	// See https://github.com/prometheus/prometheus/pull/8639 for details.
 	// This also helps making the downstream managers drop stale targets as soon as possible.
 	// See https://github.com/prometheus/prometheus/pull/13147 for details.
-	if keep {
+	if triggerSync {
 		select {
 		case m.triggerSend <- struct{}{}:
 		default:
diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go
index 5119a4c66b..705abcf4d5 100644
--- a/scrape/helpers_test.go
+++ b/scrape/helpers_test.go
@@ -345,6 +345,10 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (
 	)
 	require.NoError(t, err)
 
+	if interval <= 0 {
+		return scrapeManager, app, cleanup
+	}
+
 	cfg := &config.Config{
 		GlobalConfig: config.GlobalConfig{
 			ScrapeInterval:  model.Duration(interval),
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index 37570c2f90..e6150224e0 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -1714,3 +1714,127 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
 		})
 	}
 }
+
+func TestManagerReloader(t *testing.T) {
+	for _, tcase := range []struct {
+		name                    string
+		scrapeOnShutdown        bool
+		discoveryReloadInterval time.Duration
+		updateTarget            bool
+		runDuration             time.Duration
+		expectedSamplesTotal    int
+	}{
+		{
+			name:                 "no scrape on shutdown with default interval",
+			runDuration:          6 * time.Second,
+			expectedSamplesTotal: 1,
+		},
+		{
+			name:                    "no scrape on shutdown",
+			discoveryReloadInterval: 1 * time.Second,
+			runDuration:             1 * time.Second,
+			expectedSamplesTotal:    1,
+		},
+		{
+			name:                 "no scrape on shutdown with default interval and update",
+			updateTarget:         true,
+			runDuration:          12 * time.Second,
+			expectedSamplesTotal: 1,
+		},
+		{
+			name:                    "no scrape on shutdown after ticker",
+			discoveryReloadInterval: 1 * time.Second,
+			updateTarget:            true,
+			runDuration:             2 * time.Second,
+			expectedSamplesTotal:    1,
+		},
+		{
+			name:                 "no scrape on shutdown with default interval after ticker",
+			updateTarget:         true,
+			runDuration:          6 * time.Second,
+			expectedSamplesTotal: 1,
+		},
+		{
+			name:                 "scrape on shutdown without updates",
+			scrapeOnShutdown:     true,
+			runDuration:          2 * time.Second,
+			expectedSamplesTotal: 2,
+		},
+		{
+			name:                 "scrape on shutdown with updates",
+			scrapeOnShutdown:     true,
+			updateTarget:         true,
+			runDuration:          2 * time.Second,
+			expectedSamplesTotal: 2,
+		},
+		{
+			name:                 "scrape on shutdown default interval after ticker",
+			scrapeOnShutdown:     true,
+			runDuration:          6 * time.Second,
+			expectedSamplesTotal: 2,
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			synctest.Test(t, func(t *testing.T) {
+				opts := &Options{
+					ScrapeOnShutdown:        tcase.scrapeOnShutdown,
+					DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval),
+				}
+				scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, 0) // Pass 0 to skip the helper's config setup; the config is applied below.
+				defer cleanupConns()
+
+				cfg := &config.Config{
+					GlobalConfig: config.GlobalConfig{
+						ScrapeInterval:  model.Duration(10 * time.Second),
+						ScrapeTimeout:   model.Duration(10 * time.Second),
+						ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
+					},
+					ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+				}
+				cfgText, err := yaml.Marshal(*cfg)
+				require.NoError(t, err)
+				cfg = loadConfiguration(t, string(cfgText))
+				require.NoError(t, scrapeManager.ApplyConfig(cfg))
+
+				tsetsCh := make(chan map[string][]*targetgroup.Group)
+				go scrapeManager.Run(tsetsCh)
+
+				// Send initial target to trigger the first reload via the normal flow.
+				initialTargetLabels := model.LabelSet{
+					model.SchemeLabel:  "http",
+					model.AddressLabel: "test.local",
+				}
+				tsetsCh <- map[string][]*targetgroup.Group{
+					"test": {{
+						Targets: []model.LabelSet{initialTargetLabels},
+					}},
+				}
+				synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload.
+
+				if tcase.updateTarget {
+					newTargetLabels := model.LabelSet{
+						model.SchemeLabel:  "http",
+						model.AddressLabel: "test-updated.local",
+					}
+					tsetsCh <- map[string][]*targetgroup.Group{
+						"test": {{
+							Source:  "test",
+							Targets: []model.LabelSet{newTargetLabels},
+						}},
+					}
+					synctest.Wait() // Wait for Run to process tsetsCh.
+				}
+
+				if tcase.runDuration > 0 {
+					time.Sleep(tcase.runDuration)
+					synctest.Wait()
+				}
+
+				scrapeManager.Stop()
+				synctest.Wait()
+
+				require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
+			})
+		})
+	}
+}