From 514f651110a9f4c00cdf2e8c8ae6bd5764084a06 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 25 Nov 2025 19:35:36 -0300 Subject: [PATCH] Use Clock injection in ScrapeManager for testability Signed-off-by: Arthur Silva Sens --- go.mod | 2 +- scrape/helpers_test.go | 8 +++++++ scrape/manager.go | 13 +++++++++++ scrape/manager_test.go | 6 +++++ scrape/scrape.go | 26 +++++++++++++-------- scrape/scrape_test.go | 53 ++++++++++++++++++++++++++++++++---------- 6 files changed, 85 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 55b8d2ce1f..d8f7ab7e0d 100644 --- a/go.mod +++ b/go.mod @@ -233,7 +233,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/v3 v3.0.3 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect - k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect + k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/yaml v1.6.0 // indirect diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index ff7a7bf65a..ae0edec596 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -22,10 +22,12 @@ import ( "strings" "sync" "testing" + "time" "github.com/gogo/protobuf/proto" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + testingclock "k8s.io/utils/clock/testing" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -271,3 +273,9 @@ func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte { buf.Write(protoBuf) return buf.Bytes() } + +// newTestFakeClock creates a new fake clock for testing. +// The fake clock starts at the current time and can be advanced manually. +func newTestFakeClock() *testingclock.FakeClock { + return testingclock.NewFakeClock(time.Now()) +} diff --git a/scrape/manager.go b/scrape/manager.go index c63d7d0eae..7abb2ead0d 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "go.uber.org/atomic" + "k8s.io/utils/clock" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -93,10 +94,22 @@ type Options struct { // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption + // Clock is used for time-related operations. If nil, the real clock is used. + // This is primarily useful for testing to control time progression. + Clock clock.WithTicker + // private option for testability. skipOffsetting bool } +// clock returns the clock from Options, or a real clock if not set. +func (o *Options) clock() clock.WithTicker { + if o.Clock != nil { + return o.Clock + } + return clock.RealClock{} +} + // Manager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups from the discovery manager. type Manager struct { diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 1ec4875d19..407d1939c0 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -39,6 +39,7 @@ import ( "github.com/stretchr/testify/require" "go.yaml.in/yaml/v2" "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/utils/clock" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -1206,6 +1207,11 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A app = nopAppendable{} } + // Tests can pass a fake clock via opts.Clock to control time. + if opts.Clock == nil { + opts.Clock = clock.RealClock{} + } + reg := prometheus.NewRegistry() sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) require.NoError(t, err) diff --git a/scrape/scrape.go b/scrape/scrape.go index db662cb089..3caa7a35a5 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -42,6 +42,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" + "k8s.io/utils/clock" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -226,6 +227,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed sp.validationScheme, sp.escapingScheme, opts.fallbackScrapeProtocol, + options.clock(), ) } sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) @@ -976,6 +978,8 @@ type scrapeLoop struct { metrics *scrapeMetrics skipOffsetting bool // For testability. + + clock clock.WithTicker // Clock for time-related operations (testing). } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -1269,6 +1273,7 @@ func newScrapeLoop(ctx context.Context, validationScheme model.ValidationScheme, escapingScheme model.EscapingScheme, fallbackScrapeProtocol string, + clk clock.WithTicker, ) *scrapeLoop { if l == nil { l = promslog.NewNopLogger() @@ -1325,6 +1330,7 @@ func newScrapeLoop(ctx context.Context, skipOffsetting: skipOffsetting, validationScheme: validationScheme, escapingScheme: escapingScheme, + clock: clk, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1343,7 +1349,7 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { func (sl *scrapeLoop) run(errc chan<- error) { if !sl.skipOffsetting { select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): + case <-sl.clock.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): // Continue after a scraping offset. case <-sl.ctx.Done(): close(sl.stopped) @@ -1353,8 +1359,8 @@ func (sl *scrapeLoop) run(errc chan<- error) { var last time.Time - alignedScrapeTime := time.Now().Round(0) - ticker := time.NewTicker(sl.interval) + alignedScrapeTime := sl.clock.Now().Round(0) + ticker := sl.clock.NewTicker(sl.interval) defer ticker.Stop() mainLoop: @@ -1373,7 +1379,7 @@ mainLoop: // See https://github.com/prometheus/prometheus/issues/7846 // Calling Round ensures the time used is the wall clock, as otherwise .Sub // and .Add on time.Time behave differently (see time package docs). - scrapeTime := time.Now().Round(0) + scrapeTime := sl.clock.Now().Round(0) if AlignScrapeTimestamps { // Tolerance is clamped to maximum 1% of the scrape interval. tolerance := min(sl.interval/100, ScrapeTimestampTolerance) @@ -1396,7 +1402,7 @@ mainLoop: return case <-sl.ctx.Done(): break mainLoop - case <-ticker.C: + case <-ticker.C(): } } @@ -1531,7 +1537,7 @@ func (sl *scrapeLoop) getForcedError() error { return sl.forcedErr } -func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) { +func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker clock.Ticker, interval time.Duration) { // Scraping has stopped. We want to write stale markers but // the target may be recreated, so we wait just over 2 scrape intervals // before creating them. @@ -1549,8 +1555,8 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int select { case <-sl.parentCtx.Done(): return - case <-ticker.C: - staleTime = time.Now() + case <-ticker.C(): + staleTime = sl.clock.Now() } // Wait for when the next scrape would have been, if the target was recreated @@ -1558,14 +1564,14 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int select { case <-sl.parentCtx.Done(): return - case <-ticker.C: + case <-ticker.C(): } // Wait for an extra 10% of the interval, just to be safe. select { case <-sl.parentCtx.Done(): return - case <-time.After(interval / 10): + case <-sl.clock.After(interval / 10): } // Check if end-of-run staleness markers have been disabled while we were waiting. diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 5ccdb80019..6ddf937a75 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -51,6 +51,7 @@ import ( "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/atomic" + "k8s.io/utils/clock" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -969,6 +970,12 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app } func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop { + return newBasicScrapeLoopWithClock(t, ctx, scraper, app, interval, fallback, clock.RealClock{}) +} + +// newBasicScrapeLoopWithClock creates a scrape loop with a custom clock for testing. +// Use newTestFakeClock() to create a fake clock that can be controlled in tests. +func newBasicScrapeLoopWithClock(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string, clk clock.WithTicker) *scrapeLoop { return newScrapeLoop(ctx, scraper, nil, nil, @@ -999,6 +1006,7 @@ func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper s model.UTF8Validation, model.NoEscaping, fallback, + clk, ) } @@ -1054,14 +1062,15 @@ func nopMutator(l labels.Labels) labels.Labels { return l } func TestScrapeLoopStop(t *testing.T) { var ( - signal = make(chan struct{}, 1) - appender = &collectResultAppender{} - scraper = &testScraper{} - app = func(context.Context) storage.Appender { return appender } + signal = make(chan struct{}, 1) + appender = &collectResultAppender{} + scraper = &testScraper{} + app = func(context.Context) storage.Appender { return appender } + fakeClock = newTestFakeClock() ) // Since we're writing samples directly below we need to provide a protocol fallback. - sl := newBasicScrapeLoopWithFallback(t, context.Background(), scraper, app, 10*time.Millisecond, "text/plain") + sl := newBasicScrapeLoopWithClock(t, context.Background(), scraper, app, 10*time.Millisecond, "text/plain", fakeClock) // Terminate loop after 2 scrapes. numScrapes := 0 @@ -1081,11 +1090,24 @@ func TestScrapeLoopStop(t *testing.T) { signal <- struct{}{} }() - select { - case <-signal: - case <-time.After(5 * time.Second): - require.FailNow(t, "Scrape wasn't stopped.") + // Wait for scrape loop to be waiting on clock, then advance time to trigger scrapes. + // We need at least 2 scrapes to trigger the stop, plus additional time for shutdown. + for { + // Check if the loop has exited + select { + case <-signal: + goto done + default: + } + + // Wait for the loop to be waiting on the clock + require.Eventually(t, func() bool { + return fakeClock.HasWaiters() + }, 100*time.Millisecond, 5*time.Millisecond, "Scrape loop should be waiting on clock") + + fakeClock.Step(10 * time.Millisecond) } +done: // We expected 1 actual sample for each scrape plus 5 for report samples. // At least 2 scrapes were made, plus the final stale markers. @@ -1116,6 +1138,7 @@ func TestScrapeLoopRun(t *testing.T) { scraper = &testScraper{} app = func(context.Context) storage.Appender { return &nopAppender{} } scrapeMetrics = newTestScrapeMetrics(t) + fakeClock = newTestFakeClock() ) ctx, cancel := context.WithCancel(context.Background()) @@ -1149,6 +1172,7 @@ func TestScrapeLoopRun(t *testing.T) { model.UTF8Validation, model.NoEscaping, "", + fakeClock, ) // The loop must terminate during the initial offset if the context @@ -1160,13 +1184,15 @@ func TestScrapeLoopRun(t *testing.T) { signal <- struct{}{} }() - // Wait to make sure we are actually waiting on the offset. - time.Sleep(1 * time.Second) + // Wait for the scrape loop to be waiting on the offset timer. + require.Eventually(t, func() bool { + return fakeClock.HasWaiters() + }, 100*time.Millisecond, 5*time.Millisecond, "Scrape loop should be waiting on offset timer") cancel() select { case <-signal: - case <-time.After(5 * time.Second): + case <-time.After(100 * time.Millisecond): require.FailNow(t, "Cancellation during initial offset failed.") case err := <-errc: require.FailNow(t, "Unexpected error", "err: %s", err) @@ -1186,6 +1212,8 @@ func TestScrapeLoopRun(t *testing.T) { return nil } + // For timeout testing, we use real clock since context.WithTimeout + // uses real time internally. ctx, cancel = context.WithCancel(context.Background()) sl = newBasicScrapeLoop(t, ctx, scraper, app, time.Second) sl.timeout = 100 * time.Millisecond @@ -1298,6 +1326,7 @@ func TestScrapeLoopMetadata(t *testing.T) { model.UTF8Validation, model.NoEscaping, "", + clock.RealClock{}, ) defer cancel()