From 0f38319b928e062682416632738bec7b568f0673 Mon Sep 17 00:00:00 2001 From: avilevy Date: Wed, 25 Mar 2026 22:00:51 +0000 Subject: [PATCH] feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup - discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity. - discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks. - scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`. - tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage. Signed-off-by: avilevy --- discovery/manager.go | 12 +++-- discovery/manager_test.go | 98 ++++++++++++++++++++++++++++----------- scrape/manager.go | 9 +++- scrape/manager_test.go | 67 +++++++++++++++++--------- 4 files changed, 131 insertions(+), 55 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 3bffe1a9cd..29fd825412 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -158,10 +158,10 @@ func FeatureRegistry(fr features.Collector) func(*Manager) { } } -// SkipInitialWait configures the manager to skip the initial wait on startup. +// SkipStartupWait configures the manager to skip the initial wait on startup. // This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver // which are sensitive to startup latencies. 
-func SkipInitialWait() func(*Manager) { +func SkipStartupWait() func(*Manager) { return func(m *Manager) { m.mtx.Lock() defer m.mtx.Unlock() @@ -402,7 +402,7 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ } } -func (m *Manager) flushUpdates(timeout <-chan time.Time) { +func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) { m.metrics.SentUpdates.Inc() select { case m.syncCh <- m.allGroups(): @@ -411,6 +411,8 @@ func (m *Manager) flushUpdates(timeout <-chan time.Time) { m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") select { case m.triggerSend <- struct{}{}: + case <-ctx.Done(): + return default: } } @@ -426,7 +428,7 @@ func (m *Manager) sender() { if m.skipStartupWait { select { case <-m.triggerSend: - m.flushUpdates(ticker.C) + m.flushUpdates(m.ctx, ticker.C) case <-m.ctx.Done(): return } @@ -437,7 +439,7 @@ func (m *Manager) sender() { select { case <-m.ctx.Done(): return - case <-ticker.C: + case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. 
select { case <-m.triggerSend: m.metrics.SentUpdates.Inc() diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 4b252e5725..3fb3fae145 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -783,37 +784,82 @@ func pk(provider, setName string, n int) poolKey { } func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { - ctx := t.Context() - - reg := prometheus.NewRegistry() - _, sdMetrics := NewTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, SkipInitialWait()) - require.NotNil(t, discoveryManager) - - // Set the updatert to a long time so we can verify that the skip worked correctly. - discoveryManager.updatert = 100 * time.Hour - go discoveryManager.Run() - - c := map[string]Configs{ - "prometheus": { - staticConfig("foo:9090"), + testCases := []struct { + name string + skipInitialWait bool + updatert time.Duration + readTimeout time.Duration + expectedTargets int + }{ + { + name: "startup wait with long interval times out", + updatert: 100 * time.Hour, + readTimeout: 10 * time.Millisecond, + }, + { + name: "startup wait with short interval succeeds", + updatert: 10 * time.Millisecond, + readTimeout: 100 * time.Millisecond, + expectedTargets: 1, + }, + { + name: "skip startup wait", + skipInitialWait: true, + updatert: 100 * time.Hour, + readTimeout: 100 * time.Millisecond, + expectedTargets: 1, }, } - discoveryManager.ApplyConfig(c) - // Wait for the single bypassed send - syncedTargets := <-discoveryManager.SyncCh() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() - // Assertions on the targets received from the channel - require.Len(t, syncedTargets, 1) - 
require.Len(t, syncedTargets["prometheus"], 1) - verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + reg := prometheus.NewRegistry() + _, sdMetrics := NewTestMetrics(t, reg) - // Assertions on the manager's internal state - p := pk("static", "prometheus", 0) - verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) - require.Len(t, discoveryManager.targets, 1) + var opts []func(*Manager) + if tc.skipInitialWait { + opts = append(opts, SkipStartupWait()) + } + + discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...) + require.NotNil(t, discoveryManager) + + discoveryManager.updatert = tc.updatert + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + synctest.Wait() + + var syncedTargets map[string][]*targetgroup.Group + select { + case syncedTargets = <-discoveryManager.SyncCh(): + case <-time.After(tc.readTimeout): + } + + if tc.expectedTargets == 0 { + require.Nil(t, syncedTargets) + return + } + + require.Len(t, syncedTargets, 1) + require.Len(t, syncedTargets["prometheus"], tc.expectedTargets) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Len(t, discoveryManager.targets, 1) + }) + }) + } } func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { diff --git a/scrape/manager.go b/scrape/manager.go index 4ea8c37532..07615a7bd8 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -151,6 +151,13 @@ type Options struct { // NOTE: This final scrape ignores the configured scrape interval. 
ScrapeOnShutdown bool + // DiscoveryReloadOnStartup enables discovering targets immediately on startup as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to startup delays. + DiscoveryReloadOnStartup bool + // InitialScrapeOffset applies an additional baseline delay before we begin // scraping targets. By default, Prometheus calculates a specific offset for // each target to spread the scraping load evenly across the server. Configuring @@ -233,7 +240,7 @@ func (m *Manager) reloader() { ticker := time.NewTicker(reloadIntervalDuration) defer ticker.Stop() - if m.opts.ScrapeOnShutdown { + if m.opts.DiscoveryReloadOnStartup { select { case <-m.graceShut: return diff --git a/scrape/manager_test.go b/scrape/manager_test.go index e6150224e0..c2a4669d18 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -1717,68 +1717,89 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { func TestManagerReloader(t *testing.T) { for _, tcase := range []struct { - name string - scrapeOnShutdown bool - discoveryReloadInterval time.Duration - updateTarget bool - runDuration time.Duration - expectedSamplesTotal int + name string + discoveryReloadOnStartup bool + scrapeOnShutdown bool + discoveryReloadInterval time.Duration + updateTarget bool + runDuration time.Duration + expectedSamplesTotal int }{ { - name: "no scrape on shutdown with default interval", + name: "no startup reload default interval", runDuration: 6 * time.Second, expectedSamplesTotal: 1, }, { - name: "no scrape on shutdown", + name: "no startup reload short interval", discoveryReloadInterval: 1 * time.Second, runDuration: 1 * time.Second, expectedSamplesTotal: 1, }, { - name: "no scrape on shutdown with default interval and update", + name: "no startup reload default interval with target update", updateTarget: 
true, runDuration: 12 * time.Second, expectedSamplesTotal: 1, }, { - name: "no scrape on shutdown after ticker", + name: "no startup reload short interval after ticker", discoveryReloadInterval: 1 * time.Second, updateTarget: true, runDuration: 2 * time.Second, expectedSamplesTotal: 1, }, { - name: "no scrape on shutdown with default interval after ticker", + name: "no startup reload default interval after ticker with update", updateTarget: true, runDuration: 6 * time.Second, expectedSamplesTotal: 1, }, { - name: "scrape on shutdown without updates", - scrapeOnShutdown: true, - runDuration: 2 * time.Second, - expectedSamplesTotal: 2, + name: "startup reload", + discoveryReloadOnStartup: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, }, { - name: "scrape on shutdown with updates", - scrapeOnShutdown: true, - updateTarget: true, - runDuration: 2 * time.Second, - expectedSamplesTotal: 2, + name: "startup reload with target update", + discoveryReloadOnStartup: true, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, }, { - name: "scrape on shutdown default interval after ticker", + name: "startup reload with update after ticker", + discoveryReloadOnStartup: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload with scrape on shutdown after reload", scrapeOnShutdown: true, runDuration: 6 * time.Second, expectedSamplesTotal: 2, }, + { + name: "stop before no startup reload", + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 0, + }, + { + name: "startup reload and scrape on shutdown", + discoveryReloadOnStartup: true, + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 2, + }, } { t.Run(tcase.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { opts := &Options{ - ScrapeOnShutdown: tcase.scrapeOnShutdown, - DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval), + DiscoveryReloadOnStartup: 
tcase.discoveryReloadOnStartup, + ScrapeOnShutdown: tcase.scrapeOnShutdown, + DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval), } scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, 0) // Pass 0 to skip setup config defer cleanupConns()