feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup

- discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity.
- discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks.
- scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`.
- tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage.

Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
avilevy 2026-03-25 22:00:51 +00:00
parent 405d13d479
commit 0f38319b92
No known key found for this signature in database
4 changed files with 131 additions and 55 deletions

View File

@ -158,10 +158,10 @@ func FeatureRegistry(fr features.Collector) func(*Manager) {
}
}
// SkipInitialWait configures the manager to skip the initial wait on startup.
// SkipStartupWait configures the manager to skip the initial wait on startup.
// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which are sensitive to startup latencies.
func SkipInitialWait() func(*Manager) {
func SkipStartupWait() func(*Manager) {
return func(m *Manager) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -402,7 +402,7 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
}
}
func (m *Manager) flushUpdates(timeout <-chan time.Time) {
func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) {
m.metrics.SentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
@ -411,6 +411,8 @@ func (m *Manager) flushUpdates(timeout <-chan time.Time) {
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
case <-ctx.Done():
return
default:
}
}
@ -426,7 +428,7 @@ func (m *Manager) sender() {
if m.skipStartupWait {
select {
case <-m.triggerSend:
m.flushUpdates(ticker.C)
m.flushUpdates(m.ctx, ticker.C)
case <-m.ctx.Done():
return
}
@ -437,7 +439,7 @@ func (m *Manager) sender() {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
select {
case <-m.triggerSend:
m.metrics.SentUpdates.Inc()

View File

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
)
func TestMain(m *testing.M) {
@ -783,37 +784,82 @@ func pk(provider, setName string, n int) poolKey {
}
func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, SkipInitialWait())
require.NotNil(t, discoveryManager)
// Set the updatert to a long time so we can verify that the skip worked correctly.
discoveryManager.updatert = 100 * time.Hour
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
testCases := []struct {
name string
skipInitialWait bool
updatert time.Duration
readTimeout time.Duration
expectedTargets int
}{
{
name: "startup wait with long interval times out",
updatert: 100 * time.Hour,
readTimeout: 10 * time.Millisecond,
},
{
name: "startup wait with short interval succeeds",
updatert: 10 * time.Millisecond,
readTimeout: 100 * time.Millisecond,
expectedTargets: 1,
},
{
name: "skip startup wait",
skipInitialWait: true,
updatert: 100 * time.Hour,
readTimeout: 100 * time.Millisecond,
expectedTargets: 1,
},
}
discoveryManager.ApplyConfig(c)
// Wait for the single bypassed send
syncedTargets := <-discoveryManager.SyncCh()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx := t.Context()
// Assertions on the targets received from the channel
require.Len(t, syncedTargets, 1)
require.Len(t, syncedTargets["prometheus"], 1)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
// Assertions on the manager's internal state
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Len(t, discoveryManager.targets, 1)
var opts []func(*Manager)
if tc.skipInitialWait {
opts = append(opts, SkipStartupWait())
}
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = tc.updatert
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
synctest.Wait()
var syncedTargets map[string][]*targetgroup.Group
select {
case syncedTargets = <-discoveryManager.SyncCh():
case <-time.After(tc.readTimeout):
}
if tc.expectedTargets == 0 {
require.Nil(t, syncedTargets)
return
}
require.Len(t, syncedTargets, 1)
require.Len(t, syncedTargets["prometheus"], tc.expectedTargets)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Len(t, discoveryManager.targets, 1)
})
})
}
}
func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {

View File

@ -151,6 +151,13 @@ type Options struct {
// NOTE: This final scrape ignores the configured scrape interval.
ScrapeOnShutdown bool
// DiscoveryReloadOnStartup enables discovering targets immediately on start up as opposed
// to waiting for the interval defined in DiscoveryReloadInterval before
// initializing the scrape pools. Disabled by default. Useful for serverless
// flavors of OpenTelemetry contrib's prometheusreceiver where we're
// sensitive to start up delays.
DiscoveryReloadOnStartup bool
// InitialScrapeOffset applies an additional baseline delay before we begin
// scraping targets. By default, Prometheus calculates a specific offset for
// each target to spread the scraping load evenly across the server. Configuring
@ -233,7 +240,7 @@ func (m *Manager) reloader() {
ticker := time.NewTicker(reloadIntervalDuration)
defer ticker.Stop()
if m.opts.ScrapeOnShutdown {
if m.opts.DiscoveryReloadOnStartup {
select {
case <-m.graceShut:
return

View File

@ -1717,68 +1717,89 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
func TestManagerReloader(t *testing.T) {
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
discoveryReloadInterval time.Duration
updateTarget bool
runDuration time.Duration
expectedSamplesTotal int
name string
discoveryReloadOnStartup bool
scrapeOnShutdown bool
discoveryReloadInterval time.Duration
updateTarget bool
runDuration time.Duration
expectedSamplesTotal int
}{
{
name: "no scrape on shutdown with default interval",
name: "no startup reload default interval",
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no scrape on shutdown",
name: "no startup reload short interval",
discoveryReloadInterval: 1 * time.Second,
runDuration: 1 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no scrape on shutdown with default interval and update",
name: "no startup reload default interval with target update",
updateTarget: true,
runDuration: 12 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no scrape on shutdown after ticker",
name: "no startup reload short interval after ticker",
discoveryReloadInterval: 1 * time.Second,
updateTarget: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no scrape on shutdown with default interval after ticker",
name: "no startup reload default interval after ticker with update",
updateTarget: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown without updates",
scrapeOnShutdown: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 2,
name: "startup reload",
discoveryReloadOnStartup: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown with updates",
scrapeOnShutdown: true,
updateTarget: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 2,
name: "startup reload with target update",
discoveryReloadOnStartup: true,
updateTarget: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown default interval after ticker",
name: "startup reload with update after ticker",
discoveryReloadOnStartup: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload with scrape on shutdown after reload",
scrapeOnShutdown: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 2,
},
{
name: "stop before no startup reload",
scrapeOnShutdown: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 0,
},
{
name: "startup reload and scrape on shutdown",
discoveryReloadOnStartup: true,
scrapeOnShutdown: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval),
DiscoveryReloadOnStartup: tcase.discoveryReloadOnStartup,
ScrapeOnShutdown: tcase.scrapeOnShutdown,
DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval),
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, 0) // Pass 0 to skip setup config
defer cleanupConns()