test(scrape): refactor time-based manager tests to use synctest

Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.

Adds TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown, which use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions fully deterministic.

- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.

Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
avilevy 2026-03-07 01:54:04 +00:00
parent df7bd65f58
commit 0e478d1f3f
No known key found for this signature in database
5 changed files with 233 additions and 128 deletions

View File

@ -99,7 +99,7 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
validationScheme: model.UTF8Validation,
symbolTable: labels.NewSymbolTable(),
appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off.
initialScrapeOffset: nil,
initialScrapeOffset: time.Duration(0),
scrapeOnShutdown: false,
}
for _, o := range opts {

View File

@ -126,19 +126,27 @@ type Options struct {
// FeatureRegistry is the registry for tracking enabled/disabled features.
FeatureRegistry features.Collector
// Option to allow a final scrape before the manager is shutdown. This is useful
// for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which might require a final scrape of targets before the instance is shutdown.
// private option for testability.
skipOffsetting bool
// Option to allow a final scrape before the manager closes. This is useful
// for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless
// job scenarios, allowing an extra scrape for the short-living edge cases.
//
// Note: This final scrape ignores the configured scrape interval. If the time
// elapsed since the last scrape is short, some backends (e.g. Google Cloud Monitoring)
// may reject the data points due to timestamps being too close together.
// NOTE: This final scrape ignores the configured scrape interval.
ScrapeOnShutdown bool
// initialScrapeOffset is a private option strictly for testing. It overrides
// the standard scrape offset to manually control execution timing during
// test runs.
initialScrapeOffset *time.Duration
// InitialScrapeOffset applies an additional baseline delay before we begin
// scraping targets. By default, Prometheus calculates a specific offset for
// each target to spread the scraping load evenly across the server. Configuring
// this option adds a fixed duration to that target-specific offset. This allows
// tuning the initial startup delay without overriding the underlying target
// jitter, preserving proper load balancing across the scraper pools.
//
// NOTE: This option is not used by the standard Prometheus server. It was
// created for use in agent mode or in OTel's prometheusreceiver when
// used in serverless job scenarios.
InitialScrapeOffset time.Duration
}
// Manager maintains a set of scrape pools and manages start/stop cycles
@ -327,8 +335,16 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.scrapeFailureLoggers = scrapeFailureLoggers
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
// Skip offset seed calculation during tests.
// setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using
// a global singleflight goroutine. This cross-boundary communication breaks
// synctest's isolation bubble and causes a fatal panic.
if m.opts.skipOffsetting {
m.offsetSeed = 0
} else {
if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}
}
// Cleanup and reload pool if the configuration has changed.

View File

@ -14,11 +14,14 @@
package scrape
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"maps"
"net"
"net/http"
"net/http/httptest"
"net/url"
@ -33,9 +36,11 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/util/testutil/synctest"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v2"
"google.golang.org/protobuf/types/known/timestamppb"
@ -765,10 +770,9 @@ func TestManagerSTZeroIngestion(t *testing.T) {
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs)
app := teststorage.NewAppendable()
noOffSet := time.Duration(0)
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
initialScrapeOffset: &noOffSet,
skipOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -952,10 +956,9 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
defer cancel()
app := teststorage.NewAppendable()
noOffSet := time.Duration(0)
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
initialScrapeOffset: &noOffSet,
skipOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1065,10 +1068,9 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
ctx := t.Context()
app := teststorage.NewAppendable()
noOffSet := time.Duration(0)
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
initialScrapeOffset: &noOffSet,
skipOffsetting: true,
}, app, nil)
defer scrapeManager.Stop()
@ -1600,122 +1602,204 @@ scrape_configs:
}
}
func TestManagerStopAfterScrapeAttempt(t *testing.T) {
noOffset := 0 * time.Nanosecond
largeOffset := 99 * time.Hour
oneSecondOffset := 1 * time.Second
tenSecondOffset := 10 * time.Second
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
// It returns the configured manager, the appendable capturing scraped samples,
// and a teardown func that closes both ends of the in-memory connection.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
	t.Helper()

	// In-memory network: the manager's HTTP client dials the client end while
	// a fake server answers on the server end. net.Pipe keeps all I/O durably
	// blocked inside the synctest bubble (no real sockets involved).
	serverEnd, clientEnd := net.Pipe()
	go startFakeHTTPServer(t, serverEnd)

	app := teststorage.NewAppendable()

	if opts == nil {
		opts = &Options{}
	}
	opts.skipOffsetting = true // Eliminates random jitter, making timing exact
	// Route every dial to the pipe's client end, regardless of target address.
	opts.HTTPClientOptions = []config_util.HTTPClientOption{
		config_util.WithDialContextFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
			return clientEnd, nil
		}),
	}

	scrapeManager, err := NewManager(
		opts,
		promslog.New(&promslog.Config{}),
		nil, nil, app, prometheus.NewRegistry(),
	)
	require.NoError(t, err)

	cfg := &config.Config{
		GlobalConfig: config.GlobalConfig{
			ScrapeInterval:  model.Duration(interval),
			ScrapeTimeout:   model.Duration(interval),
			ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
		},
		ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
	}
	// Round-trip through YAML so defaults are applied exactly as they would be
	// for a real configuration file.
	cfgText, err := yaml.Marshal(*cfg)
	require.NoError(t, err)
	cfg = loadConfiguration(t, string(cfgText))
	require.NoError(t, scrapeManager.ApplyConfig(cfg))

	// Inject the fake target directly into tsets and reload, bypassing the
	// discovery manager and its minimum refresh interval.
	scrapeManager.updateTsets(map[string][]*targetgroup.Group{
		"test": {{
			Targets: []model.LabelSet{{
				model.SchemeLabel:  "http",
				model.AddressLabel: "test.local",
			}},
		}},
	})
	scrapeManager.reload()

	teardown := func() {
		serverEnd.Close()
		clientEnd.Close()
	}
	return scrapeManager, app, teardown
}
// Helper function to act as a fake HTTP server over a net.Conn
func startFakeHTTPServer(t *testing.T, conn net.Conn) {
t.Helper()
reader := bufio.NewReader(conn)
for {
req, err := http.ReadRequest(reader)
if err != nil {
// net.Pipe returns io.ErrClosedPipe when closed during test teardown.
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
return
}
t.Errorf("fake HTTP server failed to read request: %v", err)
return
}
_, err = io.Copy(io.Discard, req.Body)
req.Body.Close()
if err != nil {
t.Errorf("fake HTTP server failed to read request body: %v", err)
return
}
body := "expected_metric 1\n"
response := fmt.Sprintf("HTTP/1.1 200 OK\r\n"+
"Content-Type: text/plain; version=0.0.4\r\n"+
"Content-Length: %d\r\n"+
"\r\n"+
"%s", len(body), body)
_, err = conn.Write([]byte(response))
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
return
}
t.Errorf("fake HTTP server failed to write response: %v", err)
return
}
}
}
func TestManager_InitialScrapeOffset(t *testing.T) {
interval := 10 * time.Second
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
stopDelay time.Duration
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamples int
initialScrapeOffset *time.Duration
}{
{
name: "no scrape on stop, with offset of 10s",
initialScrapeOffset: &tenSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 0,
scrapeOnShutdown: false,
name: "zero offset scrapes immediately",
expectedSamples: 1,
},
{
name: "no scrape on stop, no offset",
initialScrapeOffset: &noOffset,
stopDelay: 5 * time.Second,
name: "zero offset scrapes twice after one interval",
runDuration: interval,
expectedSamples: 2,
},
{
name: "large offset prevents immediate scrape",
initialScrapeOffset: 1 * time.Hour,
runDuration: 59 * time.Minute,
},
{
name: "scrape happens exactly when large offset elapses",
initialScrapeOffset: 1 * time.Hour,
runDuration: 1 * time.Hour,
expectedSamples: 1,
scrapeOnShutdown: false,
},
{
name: "scrape on stop, no offset",
initialScrapeOffset: &noOffset,
stopDelay: 5 * time.Second,
expectedSamples: 2,
scrapeOnShutdown: true,
},
{
name: "scrape on stop, with large offset",
initialScrapeOffset: &largeOffset,
stopDelay: 5 * time.Second,
expectedSamples: 1,
scrapeOnShutdown: true,
},
{
name: "scrape on stop after 5s, with offset of 1s",
initialScrapeOffset: &oneSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 2,
scrapeOnShutdown: true,
},
{
name: "scrape on stop after 5s, with offset of 10s",
initialScrapeOffset: &tenSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 1,
scrapeOnShutdown: true,
},
} {
t.Run(tcase.name, func(t *testing.T) {
t.Parallel()
app := teststorage.NewAppendable()
// Setup scrape manager.
scrapeManager, err := NewManager(
&Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
initialScrapeOffset: tcase.initialScrapeOffset,
},
promslog.New(&promslog.Config{}),
nil,
nil,
app,
prometheus.NewRegistry(),
)
require.NoError(t, err)
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
ScrapeTimeout: model.Duration(interval),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}
cfgText, err := yaml.Marshal(*cfg)
require.NoError(t, err)
cfg = loadConfiguration(t, string(cfgText))
require.NoError(t, scrapeManager.ApplyConfig(cfg))
synctest.Test(t, func(t *testing.T) {
opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Start fake HTTP target to scrape returning a single metric.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("expected_metric 1\n"))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {
{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
},
},
// Wait for the scrape manager to block on its timers
synctest.Wait()
// Fast-forward the fake clock by the test case's run duration
time.Sleep(tcase.runDuration)
synctest.Wait()
// Stop the manager to clean up background goroutines
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
})
})
}
}
// TestManager_ScrapeOnShutdown verifies that the ScrapeOnShutdown option
// triggers exactly one extra scrape when the manager stops. It runs inside a
// synctest bubble so scheduled scrapes before shutdown are driven by a fake
// clock instead of real time.Sleep delays.
func TestManager_ScrapeOnShutdown(t *testing.T) {
	interval := 10 * time.Second
	for _, tcase := range []struct {
		name                 string
		scrapeOnShutdown     bool
		runDuration          time.Duration
		expectedSamplesTotal int
	}{
		{
			name:                 "no scrape on shutdown",
			scrapeOnShutdown:     false,
			expectedSamplesTotal: 1,
		},
		{
			name:                 "scrape on shutdown",
			scrapeOnShutdown:     true,
			expectedSamplesTotal: 2,
		},
		{
			name:                 "scrape on shutdown after some scrapes",
			scrapeOnShutdown:     true,
			runDuration:          interval,
			expectedSamplesTotal: 3,
		},
	} {
		t.Run(tcase.name, func(t *testing.T) {
			synctest.Test(t, func(t *testing.T) {
				opts := &Options{ScrapeOnShutdown: tcase.scrapeOnShutdown}
				scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
				defer cleanupConns()
				// Wait for the initial scrape to happen exactly at t=0.
				synctest.Wait()
				// Fast-forward fake time to simulate scheduled scrapes before shutdown.
				if tcase.runDuration > 0 {
					time.Sleep(tcase.runDuration)
					synctest.Wait()
				}
				// Stop the manager. This triggers the ScrapeOnShutdown logic synchronously.
				scrapeManager.Stop()
				require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
			})
		})
	}
}

View File

@ -267,6 +267,9 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
// Cancel the pool's base context only after all loops have stopped.
// This ensures that loops performing a final ScrapeOnShutdown have
// a valid, uncanceled appender context to successfully write their data.
sp.cancel()
sp.client.CloseIdleConnections()
@ -875,8 +878,9 @@ type scrapeLoop struct {
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
skipOffsetting bool // For testability.
scrapeOnShutdown bool
initialScrapeOffset *time.Duration // For testing only: overrides the computed scrape offset.
initialScrapeOffset time.Duration
// error injection through setForcedError.
forcedErr error
forcedErrMtx sync.Mutex
@ -1229,8 +1233,9 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
skipOffsetting: opts.sp.options.skipOffsetting,
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
initialScrapeOffset: opts.sp.options.initialScrapeOffset,
initialScrapeOffset: opts.sp.options.InitialScrapeOffset,
}
}
@ -1244,10 +1249,11 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
}
func getScrapeOffset(sl *scrapeLoop) time.Duration {
if sl.initialScrapeOffset == nil {
return sl.scraper.offset(sl.interval, sl.offsetSeed)
offset := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.skipOffsetting {
offset = time.Duration(0)
}
return *sl.initialScrapeOffset
return sl.initialScrapeOffset + offset
}
func (sl *scrapeLoop) run(errc chan<- error) {

View File

@ -6652,8 +6652,7 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
}
sa := selectAppendable(s, appV2)
noOffSet := time.Duration(0)
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{initialScrapeOffset: &noOffSet}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()