diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 1db229561d..ecc9672a9b 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -99,6 +99,8 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo validationScheme: model.UTF8Validation, symbolTable: labels.NewSymbolTable(), appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off. + initialScrapeOffset: nil, + scrapeOnShutdown: false, } for _, o := range opts { o(sl) diff --git a/scrape/manager.go b/scrape/manager.go index bba820e1d8..7e3560a7a0 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -126,13 +126,19 @@ type Options struct { // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector - // Option to allow a final scrape before the manager is shutdown. Useful - // for serverless flavours of OTel's prometheusreceiver which might require - // a final scrape of targets before the instance is shutdown. + // Option to allow a final scrape before the manager is shutdown. This is useful + // for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver + // which might require a final scrape of targets before the instance is shutdown. + // + // Note: This final scrape ignores the configured scrape interval. If the time + // elapsed since the last scrape is short, some backends (e.g. Google Cloud Monitoring) + // may reject the data points due to timestamps being too close together. ScrapeOnShutdown bool - // private option for testability. - skipOffsetting bool + // initialScrapeOffset is a private option strictly for testing. It overrides + // the standard scrape offset to manually control execution timing during + // test runs. + initialScrapeOffset *time.Duration } // Manager maintains a set of scrape pools and manages start/stop cycles diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 84e5ed9bb2..92f14068d1 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -765,9 +765,10 @@ func TestManagerSTZeroIngestion(t *testing.T) { encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs) app := teststorage.NewAppendable() + noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, - skipOffsetting: true, + initialScrapeOffset: &noOffSet, }, app, nil) defer scrapeManager.Stop() @@ -951,9 +952,10 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { defer cancel() app := teststorage.NewAppendable() + noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, - skipOffsetting: true, + initialScrapeOffset: &noOffSet, }, app, nil) defer scrapeManager.Stop() @@ -1063,9 +1065,10 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { ctx := t.Context() app := teststorage.NewAppendable() + noOffSet := time.Duration(0) discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, - skipOffsetting: true, + initialScrapeOffset: &noOffSet, }, app, nil) defer scrapeManager.Stop() @@ -1598,36 +1601,59 @@ scrape_configs: } func TestManagerStopAfterScrapeAttempt(t *testing.T) { + noOffset := 0 * time.Nanosecond + largeOffset := 99 * time.Hour + oneSecondOffset := 1 * time.Second + tenSecondOffset := 10 * time.Second interval := 10 * time.Second for _, tcase := range []struct { - name string - scrapeOnShutdown bool - stopDelay time.Duration - expectedSamples int + name string + scrapeOnShutdown bool + stopDelay time.Duration + expectedSamples int + initialScrapeOffset *time.Duration }{ { - name: "no scrape on shutdown before next interval", - stopDelay: 5 * time.Second, - expectedSamples: 1, - scrapeOnShutdown: false, + name: "no scrape on stop, with offset of 10s", + initialScrapeOffset: &tenSecondOffset, + stopDelay: 5 * time.Second, + expectedSamples: 0, + scrapeOnShutdown: false, }, { - name: "no scrape on shutdown before next interval", - stopDelay: 11 * time.Second, - expectedSamples: 2, - scrapeOnShutdown: false, + name: "no scrape on stop, no offset", + initialScrapeOffset: &noOffset, + stopDelay: 5 * time.Second, + expectedSamples: 1, + scrapeOnShutdown: false, }, { - name: "no scrape on shutdown before next interval", - stopDelay: 5 * time.Second, - expectedSamples: 2, - scrapeOnShutdown: true, + name: "scrape on stop, no offset", + initialScrapeOffset: &noOffset, + stopDelay: 5 * time.Second, + expectedSamples: 2, + scrapeOnShutdown: true, }, { - name: "scrape on shutdown after next interval", - stopDelay: 11 * time.Second, - expectedSamples: 3, - scrapeOnShutdown: true, + name: "scrape on stop, with large offset", + initialScrapeOffset: &largeOffset, + stopDelay: 5 * time.Second, + expectedSamples: 1, + scrapeOnShutdown: true, + }, + { + name: "scrape on stop after 5s, with offset of 1s", + initialScrapeOffset: &oneSecondOffset, + stopDelay: 5 * time.Second, + expectedSamples: 2, + scrapeOnShutdown: true, + }, + { + name: "scrape on stop after 5s, with offset of 10s", + initialScrapeOffset: &tenSecondOffset, + stopDelay: 5 * time.Second, + expectedSamples: 1, + scrapeOnShutdown: true, }, } { t.Run(tcase.name, func(t *testing.T) { @@ -1636,8 +1662,8 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { // Setup scrape manager. scrapeManager, err := NewManager( &Options{ - ScrapeOnShutdown: tcase.scrapeOnShutdown, - skipOffsetting: true, + ScrapeOnShutdown: tcase.scrapeOnShutdown, + initialScrapeOffset: tcase.initialScrapeOffset, }, promslog.New(&promslog.Config{}), nil, @@ -1681,6 +1707,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { }, }, }) + scrapeManager.offsetSeed = uint64(0) scrapeManager.reload() // Wait for the defined stop delay, before stopping. diff --git a/scrape/scrape.go b/scrape/scrape.go index 6562200f48..5ff7a5ecfc 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -876,7 +876,7 @@ type scrapeLoop struct { appendMetadataToWAL bool passMetadataInContext bool scrapeOnShutdown bool - skipOffsetting bool // For testability. + initialScrapeOffset *time.Duration // For testing only: overrides the computed scrape offset. // error injection through setForcedError. forcedErr error forcedErrMtx sync.Mutex @@ -1230,7 +1230,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { appendMetadataToWAL: opts.sp.options.AppendMetadata, passMetadataInContext: opts.sp.options.PassMetadataInContext, scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown, - skipOffsetting: opts.sp.options.skipOffsetting, + initialScrapeOffset: opts.sp.options.initialScrapeOffset, } } @@ -1243,17 +1243,22 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) { sl.scrapeFailureLogger = l } +func calculateScrapeOffset(sl *scrapeLoop) time.Duration { + if sl.initialScrapeOffset == nil { + return sl.scraper.offset(sl.interval, sl.offsetSeed) + } + return *sl.initialScrapeOffset +} + func (sl *scrapeLoop) run(errc chan<- error) { - if !sl.skipOffsetting { - select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): - // Continue after a scraping offset. - case <-sl.shutdownScrape: - sl.cancel() - case <-sl.ctx.Done(): - close(sl.stopped) - return - } + select { + case <-time.After(calculateScrapeOffset(sl)): + // Continue after a scraping offset. + case <-sl.shutdownScrape: + sl.cancel() + case <-sl.ctx.Done(): + close(sl.stopped) + return } var last time.Time diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index cab2b2918a..2d02818238 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6652,7 +6652,8 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { } sa := selectAppendable(s, appV2) - sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t)) + noOffSet := time.Duration(0) + sp, err := newScrapePool(cfg, sa.V1(), sa.V2(), 0, nil, nil, &Options{initialScrapeOffset: &noOffSet}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop()