mirror of https://github.com/prometheus/prometheus.git
synced 2025-12-07 10:31:03 +01:00

commit 514f651110
parent e43f1bafca

Use Clock injection in ScrapeManager for testability

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
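The commit applies the clock-injection pattern from k8s.io/utils/clock throughout the scrape manager: code depends on the clock.WithTicker interface instead of calling the time package directly, defaults to the real clock, and lets tests substitute a steppable fake. A minimal, self-contained sketch of that pattern (the Poller type is illustrative only, not part of this diff):

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	testingclock "k8s.io/utils/clock/testing"
)

// Poller depends on the clock.WithTicker interface rather than the
// time package, so tests can control time progression.
type Poller struct {
	clk clock.WithTicker
}

// NewPoller defaults to the real clock when none is injected,
// mirroring the Options.clock() accessor in this commit.
func NewPoller(clk clock.WithTicker) *Poller {
	if clk == nil {
		clk = clock.RealClock{}
	}
	return &Poller{clk: clk}
}

// PollOnce waits one interval on the injected clock, then reports the time.
func (p *Poller) PollOnce(interval time.Duration) time.Time {
	ticker := p.clk.NewTicker(interval)
	defer ticker.Stop()
	<-ticker.C() // clock.Ticker exposes the channel via a method, not a field.
	return p.clk.Now()
}

func main() {
	fc := testingclock.NewFakeClock(time.Now())
	p := NewPoller(fc)

	done := make(chan time.Time)
	go func() { done <- p.PollOnce(10 * time.Second) }()

	// Advance the fake clock once the goroutine is blocked on the ticker;
	// no real 10-second wait happens.
	for !fc.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	fc.Step(10 * time.Second)
	fmt.Println("polled at:", <-done)
}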
 go.mod | 2 +-
@@ -233,7 +233,7 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gotest.tools/v3 v3.0.3 // indirect
 	k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
-	k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
+	k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
 	sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
 	sigs.k8s.io/randfill v1.0.0 // indirect
 	sigs.k8s.io/yaml v1.6.0 // indirect
@@ -22,10 +22,12 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"

 	"github.com/gogo/protobuf/proto"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
+	testingclock "k8s.io/utils/clock/testing"

 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/model/histogram"
@@ -271,3 +273,9 @@ func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte {
 	buf.Write(protoBuf)
 	return buf.Bytes()
 }
+
+// newTestFakeClock creates a new fake clock for testing.
+// The fake clock starts at the current time and can be advanced manually.
+func newTestFakeClock() *testingclock.FakeClock {
+	return testingclock.NewFakeClock(time.Now())
+}
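A fake clock built this way fires timers and tickers only when a test advances it explicitly. A short standalone illustration of the k8s.io/utils/clock/testing API the helper returns (not taken from the diff):

package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	fc := testingclock.NewFakeClock(time.Now())

	ch := fc.After(5 * time.Second)          // registers a waiter; nothing fires yet
	fmt.Println("waiters:", fc.HasWaiters()) // true

	fc.Step(5 * time.Second) // advance fake time; the waiter fires
	fmt.Println("fired at:", <-ch)
}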
@@ -28,6 +28,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/promslog"
 	"go.uber.org/atomic"
+	"k8s.io/utils/clock"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -93,10 +94,22 @@ type Options struct {
 	// Optional HTTP client options to use when scraping.
 	HTTPClientOptions []config_util.HTTPClientOption

+	// Clock is used for time-related operations. If nil, the real clock is used.
+	// This is primarily useful for testing to control time progression.
+	Clock clock.WithTicker
+
 	// private option for testability.
 	skipOffsetting bool
 }

+// clock returns the clock from Options, or a real clock if not set.
+func (o *Options) clock() clock.WithTicker {
+	if o.Clock != nil {
+		return o.Clock
+	}
+	return clock.RealClock{}
+}
+
 // Manager maintains a set of scrape pools and manages start/stop cycles
 // when receiving new target groups from the discovery manager.
 type Manager struct {
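With this accessor, consumers never see a nil clock: production callers leave Options.Clock unset and get clock.RealClock{}, while tests inject a fake. A hedged sketch of both call sites, assuming the Options type lands in package scrape as shown here:

package main

import (
	"time"

	testingclock "k8s.io/utils/clock/testing"

	"github.com/prometheus/prometheus/scrape"
)

func main() {
	// Production: leave Clock nil; Options.clock() falls back to clock.RealClock{}.
	_ = &scrape.Options{}

	// Test: inject a fake clock so time can be stepped manually.
	_ = &scrape.Options{
		Clock: testingclock.NewFakeClock(time.Now()),
	}
}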
@@ -39,6 +39,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.yaml.in/yaml/v2"
 	"google.golang.org/protobuf/types/known/timestamppb"
+	"k8s.io/utils/clock"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -1206,6 +1207,11 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
 		app = nopAppendable{}
 	}

+	// Tests can pass a fake clock via opts.Clock to control time.
+	if opts.Clock == nil {
+		opts.Clock = clock.RealClock{}
+	}
+
 	reg := prometheus.NewRegistry()
 	sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
 	require.NoError(t, err)
@@ -42,6 +42,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
+	"k8s.io/utils/clock"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -226,6 +227,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
 			sp.validationScheme,
 			sp.escapingScheme,
 			opts.fallbackScrapeProtocol,
+			options.clock(),
 		)
 	}
 	sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@@ -976,6 +978,8 @@ type scrapeLoop struct {
 	metrics *scrapeMetrics

 	skipOffsetting bool // For testability.
+
+	clock clock.WithTicker // Clock for time-related operations (testing).
 }

 // scrapeCache tracks mappings of exposed metric strings to label sets and
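The new field only relies on a small slice of the clock API. For orientation, an abridged paraphrase of the k8s.io/utils/clock interfaces involved (see that package for the authoritative definitions):

package clocksketch

import "time"

// WithTicker is paraphrased from k8s.io/utils/clock: everything the
// scrape loop needs for timers, tickers, and reading the current time.
type WithTicker interface {
	Now() time.Time
	After(d time.Duration) <-chan time.Time
	NewTicker(d time.Duration) Ticker
	// ...plus Since, Sleep, NewTimer, Tick, and more in the real interface.
}

// Ticker hides the channel behind a method so fakes can substitute their own.
type Ticker interface {
	C() <-chan time.Time // a method, unlike time.Ticker's exported C field
	Stop()
}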
@@ -1269,6 +1273,7 @@ func newScrapeLoop(ctx context.Context,
 	validationScheme model.ValidationScheme,
 	escapingScheme model.EscapingScheme,
 	fallbackScrapeProtocol string,
+	clk clock.WithTicker,
 ) *scrapeLoop {
 	if l == nil {
 		l = promslog.NewNopLogger()
@@ -1325,6 +1330,7 @@ func newScrapeLoop(ctx context.Context,
 		skipOffsetting:   skipOffsetting,
 		validationScheme: validationScheme,
 		escapingScheme:   escapingScheme,
+		clock:            clk,
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)

@@ -1343,7 +1349,7 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
 func (sl *scrapeLoop) run(errc chan<- error) {
 	if !sl.skipOffsetting {
 		select {
-		case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
+		case <-sl.clock.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
 			// Continue after a scraping offset.
 		case <-sl.ctx.Done():
 			close(sl.stopped)
@@ -1353,8 +1359,8 @@ func (sl *scrapeLoop) run(errc chan<- error) {

 	var last time.Time

-	alignedScrapeTime := time.Now().Round(0)
-	ticker := time.NewTicker(sl.interval)
+	alignedScrapeTime := sl.clock.Now().Round(0)
+	ticker := sl.clock.NewTicker(sl.interval)
 	defer ticker.Stop()

 mainLoop:
@@ -1373,7 +1379,7 @@ mainLoop:
 		// See https://github.com/prometheus/prometheus/issues/7846
 		// Calling Round ensures the time used is the wall clock, as otherwise .Sub
 		// and .Add on time.Time behave differently (see time package docs).
-		scrapeTime := time.Now().Round(0)
+		scrapeTime := sl.clock.Now().Round(0)
 		if AlignScrapeTimestamps {
 			// Tolerance is clamped to maximum 1% of the scrape interval.
 			tolerance := min(sl.interval/100, ScrapeTimestampTolerance)
@@ -1396,7 +1402,7 @@ mainLoop:
 				return
 			case <-sl.ctx.Done():
 				break mainLoop
-			case <-ticker.C:
+			case <-ticker.C():
 			}
 		}

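A mechanical consequence of swapping *time.Ticker for clock.Ticker is visible above: the channel moves from a struct field to a method, so every <-ticker.C becomes <-ticker.C(). A small illustrative comparison:

package main

import (
	"time"

	"k8s.io/utils/clock"
)

func main() {
	// Standard library: the channel is an exported struct field.
	st := time.NewTicker(time.Second)
	defer st.Stop()
	<-st.C

	// k8s.io/utils/clock: the channel sits behind a method, so fake
	// implementations can hand back their own channel.
	var c clock.WithTicker = clock.RealClock{}
	kt := c.NewTicker(time.Second)
	defer kt.Stop()
	<-kt.C()
}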
@@ -1531,7 +1537,7 @@ func (sl *scrapeLoop) getForcedError() error {
 	return sl.forcedErr
 }

-func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
+func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker clock.Ticker, interval time.Duration) {
 	// Scraping has stopped. We want to write stale markers but
 	// the target may be recreated, so we wait just over 2 scrape intervals
 	// before creating them.
@@ -1549,8 +1555,8 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	select {
 	case <-sl.parentCtx.Done():
 		return
-	case <-ticker.C:
-		staleTime = time.Now()
+	case <-ticker.C():
+		staleTime = sl.clock.Now()
 	}

 	// Wait for when the next scrape would have been, if the target was recreated
@@ -1558,14 +1564,14 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	select {
 	case <-sl.parentCtx.Done():
 		return
-	case <-ticker.C:
+	case <-ticker.C():
 	}

 	// Wait for an extra 10% of the interval, just to be safe.
 	select {
 	case <-sl.parentCtx.Done():
 		return
-	case <-time.After(interval / 10):
+	case <-sl.clock.After(interval / 10):
 	}

 	// Check if end-of-run staleness markers have been disabled while we were waiting.
@@ -51,6 +51,7 @@ import (
 	"go.opentelemetry.io/otel/propagation"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
 	"go.uber.org/atomic"
+	"k8s.io/utils/clock"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -969,6 +970,12 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
 }

 func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop {
+	return newBasicScrapeLoopWithClock(t, ctx, scraper, app, interval, fallback, clock.RealClock{})
+}
+
+// newBasicScrapeLoopWithClock creates a scrape loop with a custom clock for testing.
+// Use newTestFakeClock() to create a fake clock that can be controlled in tests.
+func newBasicScrapeLoopWithClock(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string, clk clock.WithTicker) *scrapeLoop {
 	return newScrapeLoop(ctx,
 		scraper,
 		nil, nil,
@@ -999,6 +1006,7 @@ func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper s
 		model.UTF8Validation,
 		model.NoEscaping,
 		fallback,
+		clk,
 	)
 }

@@ -1054,14 +1062,15 @@ func nopMutator(l labels.Labels) labels.Labels { return l }

 func TestScrapeLoopStop(t *testing.T) {
 	var (
-		signal   = make(chan struct{}, 1)
-		appender = &collectResultAppender{}
-		scraper  = &testScraper{}
-		app      = func(context.Context) storage.Appender { return appender }
+		signal    = make(chan struct{}, 1)
+		appender  = &collectResultAppender{}
+		scraper   = &testScraper{}
+		app       = func(context.Context) storage.Appender { return appender }
+		fakeClock = newTestFakeClock()
 	)

 	// Since we're writing samples directly below we need to provide a protocol fallback.
-	sl := newBasicScrapeLoopWithFallback(t, context.Background(), scraper, app, 10*time.Millisecond, "text/plain")
+	sl := newBasicScrapeLoopWithClock(t, context.Background(), scraper, app, 10*time.Millisecond, "text/plain", fakeClock)

 	// Terminate loop after 2 scrapes.
 	numScrapes := 0
@@ -1081,11 +1090,24 @@ func TestScrapeLoopStop(t *testing.T) {
 		signal <- struct{}{}
 	}()

-	select {
-	case <-signal:
-	case <-time.After(5 * time.Second):
-		require.FailNow(t, "Scrape wasn't stopped.")
+	// Wait for scrape loop to be waiting on clock, then advance time to trigger scrapes.
+	// We need at least 2 scrapes to trigger the stop, plus additional time for shutdown.
+	for {
+		// Check if the loop has exited
+		select {
+		case <-signal:
+			goto done
+		default:
+		}
+
+		// Wait for the loop to be waiting on the clock
+		require.Eventually(t, func() bool {
+			return fakeClock.HasWaiters()
+		}, 100*time.Millisecond, 5*time.Millisecond, "Scrape loop should be waiting on clock")
+
+		fakeClock.Step(10 * time.Millisecond)
 	}
+done:

 	// We expected 1 actual sample for each scrape plus 5 for report samples.
 	// At least 2 scrapes were made, plus the final stale markers.
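The loop above encodes a pattern worth naming: with a fake clock, advance time only once the code under test is blocked on it, otherwise the Step can land before the waiter registers and the test hangs. A standalone sketch of that idea (stepWhenWaiting is a hypothetical helper, not part of this diff):

package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

// stepWhenWaiting blocks until something is waiting on the fake clock,
// then advances it, so the Step cannot be lost.
func stepWhenWaiting(fc *testingclock.FakeClock, d time.Duration) {
	for !fc.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	fc.Step(d)
}

func main() {
	fc := testingclock.NewFakeClock(time.Now())

	done := make(chan struct{})
	go func() {
		<-fc.After(10 * time.Millisecond) // simulates the scrape loop waiting
		close(done)
	}()

	stepWhenWaiting(fc, 10*time.Millisecond)
	<-done
	fmt.Println("waiter released without any real sleeping")
}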
@@ -1116,6 +1138,7 @@ func TestScrapeLoopRun(t *testing.T) {
 		scraper       = &testScraper{}
 		app           = func(context.Context) storage.Appender { return &nopAppender{} }
 		scrapeMetrics = newTestScrapeMetrics(t)
+		fakeClock     = newTestFakeClock()
 	)

 	ctx, cancel := context.WithCancel(context.Background())
@@ -1149,6 +1172,7 @@ func TestScrapeLoopRun(t *testing.T) {
 		model.UTF8Validation,
 		model.NoEscaping,
 		"",
+		fakeClock,
 	)

 	// The loop must terminate during the initial offset if the context
@@ -1160,13 +1184,15 @@ func TestScrapeLoopRun(t *testing.T) {
 		signal <- struct{}{}
 	}()

-	// Wait to make sure we are actually waiting on the offset.
-	time.Sleep(1 * time.Second)
+	// Wait for the scrape loop to be waiting on the offset timer.
+	require.Eventually(t, func() bool {
+		return fakeClock.HasWaiters()
+	}, 100*time.Millisecond, 5*time.Millisecond, "Scrape loop should be waiting on offset timer")

 	cancel()
 	select {
 	case <-signal:
-	case <-time.After(5 * time.Second):
+	case <-time.After(100 * time.Millisecond):
 		require.FailNow(t, "Cancellation during initial offset failed.")
 	case err := <-errc:
 		require.FailNow(t, "Unexpected error", "err: %s", err)
@@ -1186,6 +1212,8 @@ func TestScrapeLoopRun(t *testing.T) {
 		return nil
 	}

+	// For timeout testing, we use real clock since context.WithTimeout
+	// uses real time internally.
 	ctx, cancel = context.WithCancel(context.Background())
 	sl = newBasicScrapeLoop(t, ctx, scraper, app, time.Second)
 	sl.timeout = 100 * time.Millisecond
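The comment added above reflects a genuine limit of clock injection: the standard library's context package always measures deadlines against the real wall clock, so a fake clock cannot drive context.WithTimeout. A standalone illustration:

package main

import (
	"context"
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	fc := testingclock.NewFakeClock(time.Now())

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Advancing the fake clock does nothing to the context's deadline:
	// context deadlines are checked against the real clock.
	fc.Step(time.Hour)

	select {
	case <-ctx.Done():
		fmt.Println("expired (only after 50ms of real time)")
	default:
		fmt.Println("still alive despite fc.Step(1h)")
	}
}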
@@ -1298,6 +1326,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
 		model.UTF8Validation,
 		model.NoEscaping,
 		"",
+		clock.RealClock{},
 	)
 	defer cancel()
