From 0e478d1f3f928151ef6fcc37cf9a810929b7cbc7 Mon Sep 17 00:00:00 2001 From: avilevy Date: Sat, 7 Mar 2026 01:54:04 +0000 Subject: [PATCH] test(scrape): refactor time-based manager tests to use synctest Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests. Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown to use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions 100% deterministic. - Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble. - Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable. - Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest. - Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven. Signed-off-by: avilevy --- scrape/helpers_test.go | 2 +- scrape/manager.go | 40 ++++-- scrape/manager_test.go | 300 ++++++++++++++++++++++++++--------------- scrape/scrape.go | 16 ++- scrape/scrape_test.go | 3 +- 5 files changed, 233 insertions(+), 128 deletions(-) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index ecc9672a9b..ac7964ee51 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -99,7 +99,7 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo validationScheme: model.UTF8Validation, symbolTable: labels.NewSymbolTable(), appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off. - initialScrapeOffset: nil, + initialScrapeOffset: time.Duration(0), scrapeOnShutdown: false, } for _, o := range opts { diff --git a/scrape/manager.go b/scrape/manager.go index 7e3560a7a0..41d1c24fd5 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -126,19 +126,27 @@ type Options struct { // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector - // Option to allow a final scrape before the manager is shutdown. This is useful - // for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver - // which might require a final scrape of targets before the instance is shutdown. + // private option for testability. + skipOffsetting bool + + // Option to allow a final scrape before the manager closes. This is useful + // for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless + // job scenarios, allowing an extra scrape for the short-living edge cases. // - // Note: This final scrape ignores the configured scrape interval. If the time - // elapsed since the last scrape is short, some backends (e.g. Google Cloud Monitoring) - // may reject the data points due to timestamps being too close together. + // NOTE: This final scrape ignores the configured scrape interval. ScrapeOnShutdown bool - // initialScrapeOffset is a private option strictly for testing. It overrides - // the standard scrape offset to manually control execution timing during - // test runs. - initialScrapeOffset *time.Duration + // InitialScrapeOffset applies an additional baseline delay before we begin + // scraping targets. By default, Prometheus calculates a specific offset for + // each target to spread the scraping load evenly across the server. Configuring + // this option adds a fixed duration to that target-specific offset. This allows + // tuning the initial startup delay without overriding the underlying target + // jitter, preserving proper load balancing across the scraper pools. + // + // NOTE: This option is not used by the standard Prometheus server. It was + // created for use in agent mode or in OTel's prometheusreceiver when + // used in serverless job scenarios. + InitialScrapeOffset time.Duration } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -327,8 +335,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.scrapeFailureLoggers = scrapeFailureLoggers - if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { - return err + // Skip offset seed calculation during tests. + // setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using + // a global singleflight goroutine. This cross-boundary communication breaks + // synctest's isolation bubble and causes a fatal panic. + if m.opts.skipOffsetting { + m.offsetSeed = 0 + } else { + if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil { + return err + } } // Cleanup and reload pool if the configuration has changed. diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 92f14068d1..534bbeefa6 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -14,11 +14,14 @@ package scrape import ( + "bufio" "bytes" "context" "errors" "fmt" + "io" "maps" + "net" "net/http" "net/http/httptest" "net/url" @@ -33,9 +36,11 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" + "github.com/prometheus/prometheus/util/testutil/synctest" "github.com/stretchr/testify/require" "go.yaml.in/yaml/v2" "google.golang.org/protobuf/types/known/timestamppb" @@ -765,10 +770,9 @@ func TestManagerSTZeroIngestion(t *testing.T) { encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs) app := teststorage.NewAppendable() - noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, - initialScrapeOffset: &noOffSet, + skipOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -952,10 +956,9 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { defer cancel() app := teststorage.NewAppendable() - noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, - initialScrapeOffset: &noOffSet, + skipOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1065,10 +1068,9 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { ctx := t.Context() app := teststorage.NewAppendable() - noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, - initialScrapeOffset: &noOffSet, + skipOffsetting: true, }, app, nil) defer scrapeManager.Stop() @@ -1600,122 +1602,204 @@ scrape_configs: } } -func TestManagerStopAfterScrapeAttempt(t *testing.T) { - noOffset := 0 * time.Nanosecond - largeOffset := 99 * time.Hour - oneSecondOffset := 1 * time.Second - tenSecondOffset := 10 * time.Second +// setupSynctestManager abstracts the boilerplate of creating a mock network, +// starting the fake HTTP server, and configuring the scrape manager for synctest. +func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { + t.Helper() + app := teststorage.NewAppendable() + + srvConn, cliConn := net.Pipe() + + cleanup := func() { + srvConn.Close() + cliConn.Close() + } + + go startFakeHTTPServer(t, srvConn) + + if opts == nil { + opts = &Options{} + } + opts.skipOffsetting = true // Eliminates random jitter, making timing exact + opts.HTTPClientOptions = []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, network, addr string) (net.Conn, error) { + return cliConn, nil + }), + } + + scrapeManager, err := NewManager( + opts, + promslog.New(&promslog.Config{}), + nil, nil, app, prometheus.NewRegistry(), + ) + require.NoError(t, err) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + }}, + }}, + }) + + scrapeManager.reload() + + return scrapeManager, app, cleanup +} + +// Helper function to act as a fake HTTP server over a net.Conn +func startFakeHTTPServer(t *testing.T, conn net.Conn) { + t.Helper() + reader := bufio.NewReader(conn) + for { + req, err := http.ReadRequest(reader) + if err != nil { + // net.Pipe returns io.ErrClosedPipe when closed during test teardown. + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { + return + } + t.Errorf("fake HTTP server failed to read request: %v", err) + return + } + + _, err = io.Copy(io.Discard, req.Body) + req.Body.Close() + if err != nil { + t.Errorf("fake HTTP server failed to read request body: %v", err) + return + } + + body := "expected_metric 1\n" + + response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+ + "Content-Type: text/plain; version=0.0.4\r\n"+ + "Content-Length: %d\r\n"+ + "\r\n"+ + "%s", len(body), body) + + _, err = conn.Write([]byte(response)) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { + return + } + t.Errorf("fake HTTP server failed to write response: %v", err) + return + } + } +} + +func TestManager_InitialScrapeOffset(t *testing.T) { interval := 10 * time.Second + for _, tcase := range []struct { name string - scrapeOnShutdown bool - stopDelay time.Duration + initialScrapeOffset time.Duration + runDuration time.Duration expectedSamples int - initialScrapeOffset *time.Duration }{ { - name: "no scrape on stop, with offset of 10s", - initialScrapeOffset: &tenSecondOffset, - stopDelay: 5 * time.Second, - expectedSamples: 0, - scrapeOnShutdown: false, + name: "zero offset scrapes immediately", + expectedSamples: 1, }, { - name: "no scrape on stop, no offset", - initialScrapeOffset: &noOffset, - stopDelay: 5 * time.Second, + name: "zero offset scrapes twice after one interval", + runDuration: interval, + expectedSamples: 2, + }, + { + name: "large offset prevents immediate scrape", + initialScrapeOffset: 1 * time.Hour, + runDuration: 59 * time.Minute, + }, + { + name: "scrape happens exactly when large offset elapses", + initialScrapeOffset: 1 * time.Hour, + runDuration: 1 * time.Hour, expectedSamples: 1, - scrapeOnShutdown: false, - }, - { - name: "scrape on stop, no offset", - initialScrapeOffset: &noOffset, - stopDelay: 5 * time.Second, - expectedSamples: 2, - scrapeOnShutdown: true, - }, - { - name: "scrape on stop, with large offset", - initialScrapeOffset: &largeOffset, - stopDelay: 5 * time.Second, - expectedSamples: 1, - scrapeOnShutdown: true, - }, - { - name: "scrape on stop after 5s, with offset of 1s", - initialScrapeOffset: &oneSecondOffset, - stopDelay: 5 * time.Second, - expectedSamples: 2, - scrapeOnShutdown: true, - }, - { - name: "scrape on stop after 5s, with offset of 10s", - initialScrapeOffset: &tenSecondOffset, - stopDelay: 5 * time.Second, - expectedSamples: 1, - scrapeOnShutdown: true, }, } { t.Run(tcase.name, func(t *testing.T) { - t.Parallel() - app := teststorage.NewAppendable() - // Setup scrape manager. - scrapeManager, err := NewManager( - &Options{ - ScrapeOnShutdown: tcase.scrapeOnShutdown, - initialScrapeOffset: tcase.initialScrapeOffset, - }, - promslog.New(&promslog.Config{}), - nil, - nil, - app, - prometheus.NewRegistry(), - ) - require.NoError(t, err) - cfg := &config.Config{ - GlobalConfig: config.GlobalConfig{ - ScrapeInterval: model.Duration(interval), - ScrapeTimeout: model.Duration(interval), - ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, - }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, - } - cfgText, err := yaml.Marshal(*cfg) - require.NoError(t, err) - cfg = loadConfiguration(t, string(cfgText)) - require.NoError(t, scrapeManager.ApplyConfig(cfg)) + synctest.Test(t, func(t *testing.T) { + opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset} + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() - // Start fake HTTP target to scrape returning a single metric. - server := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("expected_metric 1\n")) - }), - ) - defer server.Close() - serverURL, err := url.Parse(server.URL) - require.NoError(t, err) - // Add fake target directly into tsets + reload. Normally users would use - // Manager.Run and wait for minimum 5s refresh interval. - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": { - { - Targets: []model.LabelSet{{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }}, - }, - }, + // Wait for the scrape manager to block on its timers + synctest.Wait() + + // Fast-forward the fake clock by the test case's run duration + time.Sleep(tcase.runDuration) + synctest.Wait() + + // Stop the manager to clean up background goroutines + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples) + }) + }) + } +} + +func TestManager_ScrapeOnShutdown(t *testing.T) { + interval := 10 * time.Second + + for _, tcase := range []struct { + name string + scrapeOnShutdown bool + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no scrape on shutdown", + scrapeOnShutdown: false, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown", + scrapeOnShutdown: true, + expectedSamplesTotal: 2, + }, + { + name: "scrape on shutdown after some scrapes", + scrapeOnShutdown: true, + runDuration: interval, + expectedSamplesTotal: 3, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ScrapeOnShutdown: tcase.scrapeOnShutdown} + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + defer cleanupConns() + + // Wait for the initial scrape to happen exactly at t=0 + synctest.Wait() + + // Fast-forward fake time to simulate scheduled scrapes before shutdown + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + // Stop the manager. This triggers the ScrapeOnShutdown logic synchronously. + scrapeManager.Stop() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) }) - scrapeManager.offsetSeed = uint64(0) - scrapeManager.reload() - - // Wait for the defined stop delay, before stopping. - time.Sleep(tcase.stopDelay) - scrapeManager.Stop() - - // Verify results. - require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples) }) } } diff --git a/scrape/scrape.go b/scrape/scrape.go index 9f31e637e8..b2af698043 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -267,6 +267,9 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() + // Cancel the pool's base context only after all loops have stopped. + // This ensures that loops performing a final ScrapeOnShutdown have + // a valid, uncanceled appender context to successfully write their data. sp.cancel() sp.client.CloseIdleConnections() @@ -875,8 +878,9 @@ type scrapeLoop struct { reportExtraMetrics bool appendMetadataToWAL bool passMetadataInContext bool + skipOffsetting bool // For testability. scrapeOnShutdown bool - initialScrapeOffset *time.Duration // For testing only: overrides the computed scrape offset. + initialScrapeOffset time.Duration // error injection through setForcedError. forcedErr error forcedErrMtx sync.Mutex @@ -1229,8 +1233,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels, appendMetadataToWAL: opts.sp.options.AppendMetadata, passMetadataInContext: opts.sp.options.PassMetadataInContext, + skipOffsetting: opts.sp.options.skipOffsetting, scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown, - initialScrapeOffset: opts.sp.options.initialScrapeOffset, + initialScrapeOffset: opts.sp.options.InitialScrapeOffset, } } @@ -1244,10 +1249,11 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { } func getScrapeOffset(sl *scrapeLoop) time.Duration { - if sl.initialScrapeOffset == nil { - return sl.scraper.offset(sl.interval, sl.offsetSeed) + offset := sl.scraper.offset(sl.interval, sl.offsetSeed) + if sl.skipOffsetting { + offset = time.Duration(0) } - return *sl.initialScrapeOffset + return sl.initialScrapeOffset + offset } func (sl *scrapeLoop) run(errc chan<- error) { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 2d02818238..cab2b2918a 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6652,8 +6652,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { } sa := selectAppendable(s, appV2) - noOffSet := time.Duration(0) - sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{initialScrapeOffset: &noOffSet}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop()