mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-13 00:28:42 +02:00
scrape: Add TestManagerReloader and refactor discovery triggerSync
Adds a new TestManagerReloader test suite using synctest to assert behavior of target updates, discovery reload ticker intervals, and ScrapeOnShutdown flags. Updates setupSynctestManager to allow skipping initial config setup by passing an interval of 0. Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside discovery/manager.go for clarity, and adds a descriptive comment. Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
parent
5a96218714
commit
405d13d479
@ -253,7 +253,8 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
newProviders []*Provider
|
||||
keep bool
|
||||
// triggerSync shows if we should trigger send to notify downstream of changes.
|
||||
triggerSync bool
|
||||
)
|
||||
for _, prov := range m.providers {
|
||||
// Cancel obsolete providers if it has no new subs and it has a cancel function.
|
||||
@ -268,7 +269,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
||||
|
||||
prov.cancel()
|
||||
prov.mu.RUnlock()
|
||||
keep = true // Trigger send to notify downstream of dropped targets
|
||||
triggerSync = true // Trigger send to notify downstream of dropped targets
|
||||
continue
|
||||
}
|
||||
prov.mu.RUnlock()
|
||||
@ -280,7 +281,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
||||
|
||||
m.targetsMtx.Lock()
|
||||
for s := range prov.subs {
|
||||
keep = true // Trigger send because this is an existing provider (reload)
|
||||
triggerSync = true // Trigger send because this is an existing provider (reload)
|
||||
refTargets = m.targets[poolKey{s, prov.name}]
|
||||
// Remove obsolete subs' targets.
|
||||
if _, ok := prov.newSubs[s]; !ok {
|
||||
@ -313,7 +314,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
||||
// See https://github.com/prometheus/prometheus/pull/8639 for details.
|
||||
// This also helps making the downstream managers drop stale targets as soon as possible.
|
||||
// See https://github.com/prometheus/prometheus/pull/13147 for details.
|
||||
if keep {
|
||||
if triggerSync {
|
||||
select {
|
||||
case m.triggerSend <- struct{}{}:
|
||||
default:
|
||||
|
||||
@ -345,6 +345,10 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
if interval <= 0 {
|
||||
return scrapeManager, app, cleanup
|
||||
}
|
||||
|
||||
cfg := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
ScrapeInterval: model.Duration(interval),
|
||||
|
||||
@ -1714,3 +1714,127 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManagerReloader(t *testing.T) {
|
||||
for _, tcase := range []struct {
|
||||
name string
|
||||
scrapeOnShutdown bool
|
||||
discoveryReloadInterval time.Duration
|
||||
updateTarget bool
|
||||
runDuration time.Duration
|
||||
expectedSamplesTotal int
|
||||
}{
|
||||
{
|
||||
name: "no scrape on shutdown with default interval",
|
||||
runDuration: 6 * time.Second,
|
||||
expectedSamplesTotal: 1,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown",
|
||||
discoveryReloadInterval: 1 * time.Second,
|
||||
runDuration: 1 * time.Second,
|
||||
expectedSamplesTotal: 1,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown with default interval and update",
|
||||
updateTarget: true,
|
||||
runDuration: 12 * time.Second,
|
||||
expectedSamplesTotal: 1,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown after ticker",
|
||||
discoveryReloadInterval: 1 * time.Second,
|
||||
updateTarget: true,
|
||||
runDuration: 2 * time.Second,
|
||||
expectedSamplesTotal: 1,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown with default interval after ticker",
|
||||
updateTarget: true,
|
||||
runDuration: 6 * time.Second,
|
||||
expectedSamplesTotal: 1,
|
||||
},
|
||||
{
|
||||
name: "scrape on shutdown without updates",
|
||||
scrapeOnShutdown: true,
|
||||
runDuration: 2 * time.Second,
|
||||
expectedSamplesTotal: 2,
|
||||
},
|
||||
{
|
||||
name: "scrape on shutdown with updates",
|
||||
scrapeOnShutdown: true,
|
||||
updateTarget: true,
|
||||
runDuration: 2 * time.Second,
|
||||
expectedSamplesTotal: 2,
|
||||
},
|
||||
{
|
||||
name: "scrape on shutdown default interval after ticker",
|
||||
scrapeOnShutdown: true,
|
||||
runDuration: 6 * time.Second,
|
||||
expectedSamplesTotal: 2,
|
||||
},
|
||||
} {
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
opts := &Options{
|
||||
ScrapeOnShutdown: tcase.scrapeOnShutdown,
|
||||
DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval),
|
||||
}
|
||||
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, 0) // Pass 0 to skip setup config
|
||||
defer cleanupConns()
|
||||
|
||||
cfg := &config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
ScrapeInterval: model.Duration(10 * time.Second),
|
||||
ScrapeTimeout: model.Duration(10 * time.Second),
|
||||
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
|
||||
},
|
||||
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
||||
}
|
||||
cfgText, err := yaml.Marshal(*cfg)
|
||||
require.NoError(t, err)
|
||||
cfg = loadConfiguration(t, string(cfgText))
|
||||
require.NoError(t, scrapeManager.ApplyConfig(cfg))
|
||||
|
||||
tsetsCh := make(chan map[string][]*targetgroup.Group)
|
||||
go scrapeManager.Run(tsetsCh)
|
||||
|
||||
// Send initial target to trigger the first reload via the normal flow.
|
||||
initialTargetLabels := model.LabelSet{
|
||||
model.SchemeLabel: "http",
|
||||
model.AddressLabel: "test.local",
|
||||
}
|
||||
tsetsCh <- map[string][]*targetgroup.Group{
|
||||
"test": {{
|
||||
Targets: []model.LabelSet{initialTargetLabels},
|
||||
}},
|
||||
}
|
||||
synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload.
|
||||
|
||||
if tcase.updateTarget {
|
||||
newTargetLabels := model.LabelSet{
|
||||
model.SchemeLabel: "http",
|
||||
model.AddressLabel: "test-updated.local",
|
||||
}
|
||||
tsetsCh <- map[string][]*targetgroup.Group{
|
||||
"test": {{
|
||||
Source: "test",
|
||||
Targets: []model.LabelSet{newTargetLabels},
|
||||
}},
|
||||
}
|
||||
synctest.Wait() // Wait for Run to process tsetsCh.
|
||||
}
|
||||
|
||||
if tcase.runDuration > 0 {
|
||||
time.Sleep(tcase.runDuration)
|
||||
synctest.Wait()
|
||||
}
|
||||
|
||||
scrapeManager.Stop()
|
||||
synctest.Wait()
|
||||
|
||||
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user