From eb220862e5ad81835f06fa8e8de29e6c47bf0886 Mon Sep 17 00:00:00 2001 From: avilevy18 <105948922+avilevy18@users.noreply.github.com> Date: Fri, 3 Apr 2026 06:01:49 -0400 Subject: [PATCH] discovery, scrape: Use backoff interval for throttling discovery updates; add DiscoveryReloadOnStartup option for short-lived environments (#18187) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adding scape on shutdown Signed-off-by: avilevy * scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely Signed-off-by: avilevy * renamed calculateScrapeOffset to getScrapeOffset Signed-off-by: avilevy * discovery: Add skipStartupWait to bypass initial discovery delay In short-lived environments like agent mode or serverless, the Prometheus process may only execute for a few seconds. Waiting for the default 5-second `updatert` ticker before sending the first target groups means the process could terminate before collecting any metrics at all. This commit adds a `skipStartupWait` option to the Discovery Manager to bypass this initial delay. When enabled, the sender uses an unthrottled startup loop that instantly forwards all triggers. This ensures both the initial empty update from `ApplyConfig` and the first real targets from discoverers are passed downstream immediately. After the first ticker interval elapses, the sender cleanly breaks out of the startup phase, resets the ticker, and resumes standard operations. Signed-off-by: avilevy * scrape: Bypass initial reload delay for ScrapeOnShutdown In short-lived environments like agent mode or serverless, the default 5-second `DiscoveryReloadInterval` can cause the process to terminate before the scrape manager has a chance to process targets and collect any metrics. Because the discovery manager sends an initial empty update upon configuration followed rapidly by the actual targets, simply waiting for a single reload trigger is insufficient—the real targets would still get trapped behind the ticker delay. This commit introduces an unthrottled startup loop in the `reloader` when `ScrapeOnShutdown` is enabled. It processes all incoming `triggerReload` signals immediately during the first interval. Once the initial tick fires, the `reloader` resets the ticker and falls back into its standard throttled loop, ensuring short-lived processes can discover and scrape targets instantly. Signed-off-by: avilevy * test(scrape): refactor time-based manager tests to use synctest Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests. Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown to use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions 100% deterministic. - Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble. - Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable. - Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest. - Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven. Signed-off-by: avilevy * Clarify use cases in InitialScrapeOffset comment Signed-off-by: avilevy * test(scrape): use httptest for mock server to respect context cancellation - Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`. - Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports. - Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts. - Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios. - Renamed `skipOffsetting` to `skipJitterOffsetting`. - Addressed other PR comments. Signed-off-by: avilevy * tmp Signed-off-by: bwplotka * exp2 Signed-off-by: bwplotka * fix Signed-off-by: bwplotka * scrape: fix scrapeOnShutdown context bug and refactor test helpers The scrapeOnShutdown feature was failing during manager shutdown because the scrape pool context was being cancelled before the final shutdown scrapes could execute. Fix this by delaying context cancellation in scrapePool.stop() until after all scrape loops have stopped. In addition: - Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset. - Refactored network test helper functions from manager_test.go to helpers_test.go. - Addressed other comments. Signed-off-by: avilevy * Update scrape/scrape.go Co-authored-by: Bartlomiej Plotka Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com> * feat(discovery): add SkipInitialWait to bypass initial startup delay This adds a SkipInitialWait option to the discovery Manager, allowing consumers sensitive to startup latency to receive the first batch of discovered targets immediately instead of waiting for the updatert ticker. To support this without breaking the immediate dropped target notifications introduced in #13147, ApplyConfig now uses a keep flag to only trigger immediate downstream syncs for obsolete or updated providers. This prevents sending premature empty target groups for brand-new providers on initial startup. Additionally, the scrape manager's reloader loop is updated to process the initial triggerReload immediately, ensuring the end-to-end pipeline processes initial targets without artificial delays. Signed-off-by: avilevy * scrape: Add TestManagerReloader and refactor discovery triggerSync Adds a new TestManagerReloader test suite using synctest to assert behavior of target updates, discovery reload ticker intervals, and ScrapeOnShutdown flags. Updates setupSynctestManager to allow skipping initial config setup by passing an interval of 0. Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside discovery/manager.go for clarity, and adds a descriptive comment. Signed-off-by: avilevy * feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup - discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity. - discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks. - scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`. - tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage. Signed-off-by: avilevy * feat(discovery,scrape): importing changes proposed in 043d710 - Refactor sender to use exponential backoff - Replaces `time.NewTicker` in `sender()` with an exponential backoff to prevent panics on non-positive intervals and better throttle updates. - Removes obsolete `skipStartupWait` logic. - Refactors `setupSynctestManager` to use an explicit `initConfig` argument Signed-off-by: avilevy * fix: updating go mod Signed-off-by: avilevy * fixing merge Signed-off-by: avilevy * fixing issue: 2 variables but NewTestMetrics returns 1 value Signed-off-by: avilevy * Update discovery/manager.go Co-authored-by: Bartlomiej Plotka Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com> * Refactor setupSynctestManager initConfig into a separate function Signed-off-by: avilevy --------- Signed-off-by: avilevy Signed-off-by: bwplotka Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com> Co-authored-by: bwplotka --- discovery/manager.go | 22 +++++- discovery/manager_test.go | 79 ++++++++++++++++++++ go.mod | 2 +- scrape/helpers_test.go | 10 ++- scrape/manager.go | 26 +++++-- scrape/manager_test.go | 153 +++++++++++++++++++++++++++++++++++++- 6 files changed, 277 insertions(+), 15 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 8318ff5bd6..fa52e164f7 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v5" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/promslog" @@ -101,7 +102,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus. targets: make(map[poolKey]map[string]*targetgroup.Group), ctx: ctx, updatert: 5 * time.Second, - triggerSend: make(chan struct{}, 1), + triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read. registerer: registerer, sdMetrics: sdMetrics, } @@ -408,24 +409,34 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ } func (m *Manager) sender() { - ticker := time.NewTicker(m.updatert) defer func() { - ticker.Stop() close(m.syncCh) }() + // Some discoverers send updates too often, so we throttle these with a backoff interval that + // increases the interval up to m.updatert delay. + lastSent := time.Now().Add(-1 * m.updatert) + b := &backoff.ExponentialBackOff{ + InitialInterval: 100 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: m.updatert, + } + for { select { case <-m.ctx.Done(): return - case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. + case <-time.After(b.NextBackOff()): select { case <-m.triggerSend: m.metrics.SentUpdates.Inc() select { case m.syncCh <- m.allGroups(): + lastSent = time.Now() default: m.metrics.DelayedUpdates.Inc() m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") + // Ensure we don't miss this update. select { case m.triggerSend <- struct{}{}: default: @@ -433,6 +444,9 @@ func (m *Manager) sender() { } default: } + if time.Since(lastSent) > m.updatert { + b.Reset() // Nothing happened for a while, start again from low interval for prompt updates. + } } } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 2e8e4558f6..b98dc1dd35 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -21,6 +21,7 @@ import ( "strconv" "sync" "testing" + "testing/synctest" "time" "github.com/prometheus/client_golang/prometheus" @@ -785,6 +786,84 @@ func pk(provider, setName string, n int) poolKey { } } +func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { + testCases := []struct { + name string + updatert time.Duration + readTimeout time.Duration + expectedTargets int + }{ + { + name: "startup wait with long interval times out", + updatert: 100 * time.Hour, + readTimeout: 10 * time.Millisecond, + }, + { + name: "startup wait with short interval succeeds", + updatert: 10 * time.Millisecond, + readTimeout: 300 * time.Millisecond, + expectedTargets: 1, + }, + { + name: "skip startup wait", + updatert: 100 * time.Hour, + readTimeout: 300 * time.Millisecond, + expectedTargets: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + reg := prometheus.NewRegistry() + sdMetrics := NewTestMetrics(t, reg) + + opts := make([]func(*Manager), 0) + discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...) + require.NotNil(t, discoveryManager) + + discoveryManager.updatert = tc.updatert + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + synctest.Wait() + + timeout := time.After(tc.readTimeout) + var lastSyncedTargets map[string][]*targetgroup.Group + testFor: + for { + select { + case <-timeout: + break testFor + case lastSyncedTargets = <-discoveryManager.SyncCh(): + } + } + + if tc.expectedTargets == 0 { + require.Nil(t, lastSyncedTargets) + return + } + + require.Len(t, lastSyncedTargets, 1) + require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets) + verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Len(t, discoveryManager.targets, 1) + }) + }) + } +} + func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx := t.Context() diff --git a/go.mod b/go.mod index 4df021e21d..4080c5be26 100644 --- a/go.mod +++ b/go.mod @@ -156,7 +156,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 5119a4c66b..8d3e8f74e7 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -309,7 +309,7 @@ func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { // setupSynctestManager abstracts the boilerplate of creating a mock network, // starting the fake HTTP server, and configuring the scrape manager for synctest. -func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { +func setupSynctestManager(t *testing.T, opts *Options) (*Manager, *teststorage.Appendable, func()) { t.Helper() app := teststorage.NewAppendable() @@ -345,6 +345,12 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( ) require.NoError(t, err) + return scrapeManager, app, cleanup +} + +func applyDefaultSynctestConfig(t *testing.T, scrapeManager *Manager, interval time.Duration) { + t.Helper() + cfg := &config.Config{ GlobalConfig: config.GlobalConfig{ ScrapeInterval: model.Duration(interval), @@ -368,6 +374,4 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( }) scrapeManager.reload() - - return scrapeManager, app, cleanup } diff --git a/scrape/manager.go b/scrape/manager.go index 41cfab95e9..fd5cf4460e 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -151,6 +151,13 @@ type Options struct { // NOTE: This final scrape ignores the configured scrape interval. ScrapeOnShutdown bool + // DiscoveryReloadOnStartup enables discovering targets immediately on start up as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to start up delays. + DiscoveryReloadOnStartup bool + // InitialScrapeOffset applies an additional baseline delay before we begin // scraping targets. By default, Prometheus calculates a specific offset for // each target to spread the scraping load evenly across the server. Configuring @@ -225,15 +232,24 @@ func (m *Manager) UnregisterMetrics() { } func (m *Manager) reloader() { - reloadIntervalDuration := m.opts.DiscoveryReloadInterval - if reloadIntervalDuration == model.Duration(0) { - reloadIntervalDuration = model.Duration(5 * time.Second) + reloadIntervalDuration := time.Duration(m.opts.DiscoveryReloadInterval) + if reloadIntervalDuration == 0 { + reloadIntervalDuration = 5 * time.Second } - ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) - + ticker := time.NewTicker(reloadIntervalDuration) defer ticker.Stop() + if m.opts.DiscoveryReloadOnStartup { + select { + case <-m.graceShut: + return + case <-m.triggerReload: + m.reload() + } + ticker.Reset(reloadIntervalDuration) + } + for { select { case <-m.graceShut: diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 1fc5b355d6..d61f0dfe62 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -1633,9 +1633,11 @@ func TestManager_InitialScrapeOffset(t *testing.T) { t.Run(tcase.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset} - scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) defer cleanupConns() + applyDefaultSynctestConfig(t, scrapeManager, interval) + // Wait for the scrape manager to block on its timers. synctest.Wait() @@ -1699,9 +1701,11 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { ScrapeOnShutdown: tcase.scrapeOnShutdown, InitialScrapeOffset: tcase.initialScrapeOffset, } - scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) defer cleanupConns() + applyDefaultSynctestConfig(t, scrapeManager, interval) + // Wait for the initial scrape to happen exactly at t=0. synctest.Wait() @@ -1719,3 +1723,148 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { }) } } + +func TestManagerReloader(t *testing.T) { + for _, tcase := range []struct { + name string + discoveryReloadOnStartup bool + scrapeOnShutdown bool + discoveryReloadInterval time.Duration + updateTarget bool + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no startup reload default interval", + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload short interval", + discoveryReloadInterval: 1 * time.Second, + runDuration: 1 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload default interval with target update", + updateTarget: true, + runDuration: 12 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload short interval after ticker", + discoveryReloadInterval: 1 * time.Second, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload default interval after ticker with update", + updateTarget: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload", + discoveryReloadOnStartup: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload with target update", + discoveryReloadOnStartup: true, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload with update after ticker", + discoveryReloadOnStartup: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload with scrape on shutdown after reload", + scrapeOnShutdown: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 2, + }, + { + name: "stop before no startup reload", + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 0, + }, + { + name: "startup reload and scrape on shutdown", + discoveryReloadOnStartup: true, + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 2, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ + DiscoveryReloadOnStartup: tcase.discoveryReloadOnStartup, + ScrapeOnShutdown: tcase.scrapeOnShutdown, + DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval), + } + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) + defer cleanupConns() + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(10 * time.Second), + ScrapeTimeout: model.Duration(10 * time.Second), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + tsetsCh := make(chan map[string][]*targetgroup.Group) + go scrapeManager.Run(tsetsCh) + + // Send initial target to trigger the first reload via the normal flow. + initialTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{initialTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload. + + if tcase.updateTarget { + newTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test-updated.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Source: "test", + Targets: []model.LabelSet{newTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh. + } + + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + scrapeManager.Stop() + synctest.Wait() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) + }) + }) + } +}