mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-05 20:36:13 +02:00
scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely
Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
parent
8f35d4e343
commit
a134497161
@ -99,6 +99,8 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
|
||||
validationScheme: model.UTF8Validation,
|
||||
symbolTable: labels.NewSymbolTable(),
|
||||
appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off.
|
||||
initialScrapeOffset: nil,
|
||||
scrapeOnShutdown: false,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(sl)
|
||||
|
||||
@ -126,13 +126,19 @@ type Options struct {
|
||||
// FeatureRegistry is the registry for tracking enabled/disabled features.
|
||||
FeatureRegistry features.Collector
|
||||
|
||||
// Option to allow a final scrape before the manager is shutdown. Useful
|
||||
// for serverless flavours of OTel's prometheusreceiver which might require
|
||||
// a final scrape of targets before the instance is shutdown.
|
||||
// Option to allow a final scrape before the manager is shutdown. This is useful
|
||||
// for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
|
||||
// which might require a final scrape of targets before the instance is shutdown.
|
||||
//
|
||||
// Note: This final scrape ignores the configured scrape interval. If the time
|
||||
// elapsed since the last scrape is short, some backends (e.g. Google Cloud Monitoring)
|
||||
// may reject the data points due to timestamps being too close together.
|
||||
ScrapeOnShutdown bool
|
||||
|
||||
// private option for testability.
|
||||
skipOffsetting bool
|
||||
// initialScrapeOffset is a private option strictly for testing. It overrides
|
||||
// the standard scrape offset to manually control execution timing during
|
||||
// test runs.
|
||||
initialScrapeOffset *time.Duration
|
||||
}
|
||||
|
||||
// Manager maintains a set of scrape pools and manages start/stop cycles
|
||||
|
||||
@ -765,9 +765,10 @@ func TestManagerSTZeroIngestion(t *testing.T) {
|
||||
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs)
|
||||
|
||||
app := teststorage.NewAppendable()
|
||||
noOffSet := time.Duration(0)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: testSTZeroIngest,
|
||||
skipOffsetting: true,
|
||||
initialScrapeOffset: &noOffSet,
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
@ -951,9 +952,10 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
app := teststorage.NewAppendable()
|
||||
noOffSet := time.Duration(0)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
|
||||
skipOffsetting: true,
|
||||
initialScrapeOffset: &noOffSet,
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
@ -1063,9 +1065,10 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
app := teststorage.NewAppendable()
|
||||
noOffSet := time.Duration(0)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: true,
|
||||
skipOffsetting: true,
|
||||
initialScrapeOffset: &noOffSet,
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
@ -1598,36 +1601,59 @@ scrape_configs:
|
||||
}
|
||||
|
||||
func TestManagerStopAfterScrapeAttempt(t *testing.T) {
|
||||
noOffset := 0 * time.Nanosecond
|
||||
largeOffset := 99 * time.Hour
|
||||
oneSecondOffset := 1 * time.Second
|
||||
tenSecondOffset := 10 * time.Second
|
||||
interval := 10 * time.Second
|
||||
for _, tcase := range []struct {
|
||||
name string
|
||||
scrapeOnShutdown bool
|
||||
stopDelay time.Duration
|
||||
expectedSamples int
|
||||
name string
|
||||
scrapeOnShutdown bool
|
||||
stopDelay time.Duration
|
||||
expectedSamples int
|
||||
initialScrapeOffset *time.Duration
|
||||
}{
|
||||
{
|
||||
name: "no scrape on shutdown before next interval",
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 1,
|
||||
scrapeOnShutdown: false,
|
||||
name: "no scrape on stop, with offset of 10s",
|
||||
initialScrapeOffset: &tenSecondOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 0,
|
||||
scrapeOnShutdown: false,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown before next interval",
|
||||
stopDelay: 11 * time.Second,
|
||||
expectedSamples: 2,
|
||||
scrapeOnShutdown: false,
|
||||
name: "no scrape on stop, no offset",
|
||||
initialScrapeOffset: &noOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 1,
|
||||
scrapeOnShutdown: false,
|
||||
},
|
||||
{
|
||||
name: "no scrape on shutdown before next interval",
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 2,
|
||||
scrapeOnShutdown: true,
|
||||
name: "scrape on stop, no offset",
|
||||
initialScrapeOffset: &noOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 2,
|
||||
scrapeOnShutdown: true,
|
||||
},
|
||||
{
|
||||
name: "scrape on shutdown after next interval",
|
||||
stopDelay: 11 * time.Second,
|
||||
expectedSamples: 3,
|
||||
scrapeOnShutdown: true,
|
||||
name: "scrape on stop, with large offset",
|
||||
initialScrapeOffset: &largeOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 1,
|
||||
scrapeOnShutdown: true,
|
||||
},
|
||||
{
|
||||
name: "scrape on stop after 5s, with offset of 1s",
|
||||
initialScrapeOffset: &oneSecondOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 2,
|
||||
scrapeOnShutdown: true,
|
||||
},
|
||||
{
|
||||
name: "scrape on stop after 5s, with offset of 10s",
|
||||
initialScrapeOffset: &tenSecondOffset,
|
||||
stopDelay: 5 * time.Second,
|
||||
expectedSamples: 1,
|
||||
scrapeOnShutdown: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
@ -1636,8 +1662,8 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
|
||||
// Setup scrape manager.
|
||||
scrapeManager, err := NewManager(
|
||||
&Options{
|
||||
ScrapeOnShutdown: tcase.scrapeOnShutdown,
|
||||
skipOffsetting: true,
|
||||
ScrapeOnShutdown: tcase.scrapeOnShutdown,
|
||||
initialScrapeOffset: tcase.initialScrapeOffset,
|
||||
},
|
||||
promslog.New(&promslog.Config{}),
|
||||
nil,
|
||||
@ -1681,6 +1707,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
scrapeManager.offsetSeed = uint64(0)
|
||||
scrapeManager.reload()
|
||||
|
||||
// Wait for the defined stop delay, before stopping.
|
||||
|
||||
@ -876,7 +876,7 @@ type scrapeLoop struct {
|
||||
appendMetadataToWAL bool
|
||||
passMetadataInContext bool
|
||||
scrapeOnShutdown bool
|
||||
skipOffsetting bool // For testability.
|
||||
initialScrapeOffset *time.Duration // For testing only: overrides the computed scrape offset.
|
||||
// error injection through setForcedError.
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
@ -1230,7 +1230,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
|
||||
appendMetadataToWAL: opts.sp.options.AppendMetadata,
|
||||
passMetadataInContext: opts.sp.options.PassMetadataInContext,
|
||||
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
|
||||
skipOffsetting: opts.sp.options.skipOffsetting,
|
||||
initialScrapeOffset: opts.sp.options.initialScrapeOffset,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1243,17 +1243,22 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
|
||||
sl.scrapeFailureLogger = l
|
||||
}
|
||||
|
||||
func calculateScrapeOffset(sl *scrapeLoop) time.Duration {
|
||||
if sl.initialScrapeOffset == nil {
|
||||
return sl.scraper.offset(sl.interval, sl.offsetSeed)
|
||||
}
|
||||
return *sl.initialScrapeOffset
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) run(errc chan<- error) {
|
||||
if !sl.skipOffsetting {
|
||||
select {
|
||||
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
|
||||
// Continue after a scraping offset.
|
||||
case <-sl.shutdownScrape:
|
||||
sl.cancel()
|
||||
case <-sl.ctx.Done():
|
||||
close(sl.stopped)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-time.After(calculateScrapeOffset(sl)):
|
||||
// Continue after a scraping offset.
|
||||
case <-sl.shutdownScrape:
|
||||
sl.cancel()
|
||||
case <-sl.ctx.Done():
|
||||
close(sl.stopped)
|
||||
return
|
||||
}
|
||||
|
||||
var last time.Time
|
||||
|
||||
@ -6652,7 +6652,8 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
|
||||
}
|
||||
|
||||
sa := selectAppendable(s, appV2)
|
||||
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
|
||||
noOffSet := time.Duration(0)
|
||||
sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{initialScrapeOffset: &noOffSet}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user