diff --git a/discovery/manager.go b/discovery/manager.go index d788683adb..3bffe1a9cd 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -253,7 +253,8 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { var ( wg sync.WaitGroup newProviders []*Provider - keep bool + // triggerSync indicates whether we should trigger a send to notify downstream of changes. + triggerSync bool ) for _, prov := range m.providers { // Cancel obsolete providers if it has no new subs and it has a cancel function. @@ -268,7 +269,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { prov.cancel() prov.mu.RUnlock() - keep = true // Trigger send to notify downstream of dropped targets + triggerSync = true // Trigger send to notify downstream of dropped targets continue } prov.mu.RUnlock() @@ -280,7 +281,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { m.targetsMtx.Lock() for s := range prov.subs { - keep = true // Trigger send because this is an existing provider (reload) + triggerSync = true // Trigger send because this is an existing provider (reload) refTargets = m.targets[poolKey{s, prov.name}] // Remove obsolete subs' targets. if _, ok := prov.newSubs[s]; !ok { @@ -313,7 +314,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { // See https://github.com/prometheus/prometheus/pull/8639 for details. // This also helps making the downstream managers drop stale targets as soon as possible. // See https://github.com/prometheus/prometheus/pull/13147 for details. 
- if keep { + if triggerSync { select { case m.triggerSend <- struct{}{}: default: diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 5119a4c66b..705abcf4d5 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -345,6 +345,10 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( ) require.NoError(t, err) + if interval <= 0 { + return scrapeManager, app, cleanup + } + cfg := &config.Config{ GlobalConfig: config.GlobalConfig{ ScrapeInterval: model.Duration(interval), diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 37570c2f90..e6150224e0 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -1714,3 +1714,127 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { }) } } + +func TestManagerReloader(t *testing.T) { + for _, tcase := range []struct { + name string + scrapeOnShutdown bool + discoveryReloadInterval time.Duration + updateTarget bool + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no scrape on shutdown with default interval", + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no scrape on shutdown", + discoveryReloadInterval: 1 * time.Second, + runDuration: 1 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no scrape on shutdown with default interval and update", + updateTarget: true, + runDuration: 12 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no scrape on shutdown after ticker", + discoveryReloadInterval: 1 * time.Second, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no scrape on shutdown with default interval after ticker", + updateTarget: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown without updates", + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 2, + }, + { + name: "scrape on shutdown with updates", + scrapeOnShutdown: true, + updateTarget: 
true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 2, + }, + { + name: "scrape on shutdown default interval after ticker", + scrapeOnShutdown: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 2, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ + ScrapeOnShutdown: tcase.scrapeOnShutdown, + DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval), + } + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, 0) // Pass 0 to skip setup config + defer cleanupConns() + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(10 * time.Second), + ScrapeTimeout: model.Duration(10 * time.Second), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + tsetsCh := make(chan map[string][]*targetgroup.Group) + go scrapeManager.Run(tsetsCh) + + // Send initial target to trigger the first reload via the normal flow. + initialTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{initialTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload. + + if tcase.updateTarget { + newTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test-updated.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Source: "test", + Targets: []model.LabelSet{newTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh. 
+ } + + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + scrapeManager.Stop() + synctest.Wait() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) + }) + }) + } +}