diff --git a/scrape/manager.go b/scrape/manager.go
index 24a63b056b..bba820e1d8 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -126,6 +126,11 @@ type Options struct {
 	// FeatureRegistry is the registry for tracking enabled/disabled features.
 	FeatureRegistry features.Collector
 
+	// Option to allow a final scrape before the manager is shut down. Useful
+	// for serverless flavours of OTel's prometheusreceiver, which might require
+	// a final scrape of targets before the instance is shut down.
+	ScrapeOnShutdown bool
+
 	// private option for testability.
 	skipOffsetting bool
 }
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index 395cc98a82..84e5ed9bb2 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -1596,3 +1596,99 @@ scrape_configs:
 		}
 	}
 }
+
+func TestManagerStopAfterScrapeAttempt(t *testing.T) {
+	interval := 10 * time.Second
+	for _, tcase := range []struct {
+		name             string
+		scrapeOnShutdown bool
+		stopDelay        time.Duration
+		expectedSamples  int
+	}{
+		{
+			name:             "no scrape on shutdown before next interval",
+			stopDelay:        5 * time.Second,
+			expectedSamples:  1,
+			scrapeOnShutdown: false,
+		},
+		{
+			name:             "no scrape on shutdown after next interval",
+			stopDelay:        11 * time.Second,
+			expectedSamples:  2,
+			scrapeOnShutdown: false,
+		},
+		{
+			name:             "scrape on shutdown before next interval",
+			stopDelay:        5 * time.Second,
+			expectedSamples:  2,
+			scrapeOnShutdown: true,
+		},
+		{
+			name:             "scrape on shutdown after next interval",
+			stopDelay:        11 * time.Second,
+			expectedSamples:  3,
+			scrapeOnShutdown: true,
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			t.Parallel()
+			app := teststorage.NewAppendable()
+			// Set up the scrape manager.
+			scrapeManager, err := NewManager(
+				&Options{
+					ScrapeOnShutdown: tcase.scrapeOnShutdown,
+					skipOffsetting:   true,
+				},
+				promslog.New(&promslog.Config{}),
+				nil,
+				nil,
+				app,
+				prometheus.NewRegistry(),
+			)
+			require.NoError(t, err)
+			cfg := &config.Config{
+				GlobalConfig: config.GlobalConfig{
+					ScrapeInterval:  model.Duration(interval),
+					ScrapeTimeout:   model.Duration(interval),
+					ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
+				},
+				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+			}
+			cfgText, err := yaml.Marshal(*cfg)
+			require.NoError(t, err)
+			cfg = loadConfiguration(t, string(cfgText))
+			require.NoError(t, scrapeManager.ApplyConfig(cfg))
+
+			// Start a fake HTTP target to scrape that returns a single metric.
+			server := httptest.NewServer(
+				http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+					w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+					w.Write([]byte("expected_metric 1\n"))
+				}),
+			)
+			defer server.Close()
+			serverURL, err := url.Parse(server.URL)
+			require.NoError(t, err)
+			// Add the fake target directly into tsets + reload. Normally users would use
+			// Manager.Run and wait for the minimum 5s refresh interval.
+			scrapeManager.updateTsets(map[string][]*targetgroup.Group{
+				"test": {
+					{
+						Targets: []model.LabelSet{{
+							model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
+							model.AddressLabel: model.LabelValue(serverURL.Host),
+						}},
+					},
+				},
+			})
+			scrapeManager.reload()
+
+			// Wait for the defined stop delay before stopping.
+			time.Sleep(tcase.stopDelay)
+			scrapeManager.Stop()
+
+			// Verify results.
+			require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
+		})
+	}
+}
diff --git a/scrape/scrape.go b/scrape/scrape.go
index d5a9ba72b4..6562200f48 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -248,7 +248,6 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
 func (sp *scrapePool) stop() {
 	sp.mtx.Lock()
 	defer sp.mtx.Unlock()
-	sp.cancel()
 	var wg sync.WaitGroup
 
 	sp.targetMtx.Lock()
@@ -268,6 +267,7 @@
 	sp.targetMtx.Unlock()
 
 	wg.Wait()
+	sp.cancel()
 	sp.client.CloseIdleConnections()
 
 	if sp.config != nil {
@@ -830,13 +830,14 @@ type cacheEntry struct {
 
 type scrapeLoop struct {
 	// Parameters.
-	ctx         context.Context
-	cancel      func()
-	stopped     chan struct{}
-	parentCtx   context.Context
-	appenderCtx context.Context
-	l           *slog.Logger
-	cache       *scrapeCache
+	ctx            context.Context
+	cancel         func()
+	stopped        chan struct{}
+	shutdownScrape chan struct{}
+	parentCtx      context.Context
+	appenderCtx    context.Context
+	l              *slog.Logger
+	cache          *scrapeCache
 
 	interval time.Duration
 	timeout  time.Duration
@@ -874,8 +875,8 @@ type scrapeLoop struct {
 	reportExtraMetrics    bool
 	appendMetadataToWAL   bool
 	passMetadataInContext bool
+	scrapeOnShutdown      bool
 	skipOffsetting        bool // For testability.
-
 	// error injection through setForcedError.
 	forcedErr    error
 	forcedErrMtx sync.Mutex
@@ -1177,13 +1178,14 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
 
 	ctx, cancel := context.WithCancel(opts.sp.ctx)
 	return &scrapeLoop{
-		ctx:         ctx,
-		cancel:      cancel,
-		stopped:     make(chan struct{}),
-		parentCtx:   opts.sp.ctx,
-		appenderCtx: appenderCtx,
-		l:           opts.sp.logger.With("target", opts.target),
-		cache:       opts.cache,
+		ctx:            ctx,
+		cancel:         cancel,
+		stopped:        make(chan struct{}),
+		shutdownScrape: make(chan struct{}),
+		parentCtx:      opts.sp.ctx,
+		appenderCtx:    appenderCtx,
+		l:              opts.sp.logger.With("target", opts.target),
+		cache:          opts.cache,
 
 		interval: opts.interval,
 		timeout:  opts.timeout,
@@ -1227,6 +1229,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
 		enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
 		appendMetadataToWAL:     opts.sp.options.AppendMetadata,
 		passMetadataInContext:   opts.sp.options.PassMetadataInContext,
+		scrapeOnShutdown:        opts.sp.options.ScrapeOnShutdown,
 		skipOffsetting:          opts.sp.options.skipOffsetting,
 	}
 }
@@ -1245,6 +1248,8 @@ func (sl *scrapeLoop) run(errc chan<- error) {
 	select {
 	case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
 		// Continue after a scraping offset.
+	case <-sl.shutdownScrape:
+		sl.cancel()
 	case <-sl.ctx.Done():
 		close(sl.stopped)
 		return
@@ -1259,15 +1264,6 @@
 
 mainLoop:
 	for {
-		select {
-		case <-sl.parentCtx.Done():
-			close(sl.stopped)
-			return
-		case <-sl.ctx.Done():
-			break mainLoop
-		default:
-		}
-
 		// Temporary workaround for a jitter in go timers that causes disk space
 		// increase in TSDB.
 		// See https://github.com/prometheus/prometheus/issues/7846
@@ -1296,6 +1292,8 @@
 			return
 		case <-sl.ctx.Done():
 			break mainLoop
+		case <-sl.shutdownScrape:
+			sl.cancel()
 		case <-ticker.C:
 		}
 	}
@@ -1523,7 +1521,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 
 // Stop the scraping. May still write data and stale markers after it has
 // returned. Cancel the context to stop all writes.
 func (sl *scrapeLoop) stop() {
-	sl.cancel()
+	if sl.scrapeOnShutdown {
+		sl.shutdownScrape <- struct{}{}
+	} else {
+		sl.cancel()
+	}
 	<-sl.stopped
 }
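
Usage note (not part of the patch): a minimal sketch of how a consumer such as a serverless collector might enable the new option. It mirrors the NewManager call in TestManagerStopAfterScrapeAttempt above; runWithFinalScrape is a hypothetical caller, the storage.Appendable parameter type and the two nil arguments are assumptions taken from the test, and discovery/config wiring is elided.

// Sketch, assuming this fork's NewManager signature as used in the test above.
func runWithFinalScrape(app storage.Appendable, cfg *config.Config) error {
	m, err := NewManager(
		&Options{ScrapeOnShutdown: true}, // request one last scrape of all targets on Stop()
		promslog.New(&promslog.Config{}),
		nil,
		nil,
		app,
		prometheus.NewRegistry(),
	)
	if err != nil {
		return err
	}
	if err := m.ApplyConfig(cfg); err != nil {
		return err
	}
	// ... run service discovery and m.Run(...) as usual ...

	// With ScrapeOnShutdown set, Stop lets each scrape loop attempt one
	// final scrape before it exits; otherwise it cancels immediately.
	m.Stop()
	return nil
}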