Adding scrape on shutdown

Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
avilevy 2026-02-11 21:23:36 +00:00
parent 19fd0b0b1d
commit 8f35d4e343
No known key found for this signature in database
3 changed files with 129 additions and 26 deletions

View File

@ -126,6 +126,11 @@ type Options struct {
// FeatureRegistry is the registry for tracking enabled/disabled features.
FeatureRegistry features.Collector
// Option to allow a final scrape before the manager is shut down. Useful
// for serverless flavours of OTel's prometheusreceiver which might require
// a final scrape of targets before the instance is shut down.
ScrapeOnShutdown bool
// private option for testability.
skipOffsetting bool
}

View File

@ -1596,3 +1596,99 @@ scrape_configs:
}
}
}
// TestManagerStopAfterScrapeAttempt verifies the ScrapeOnShutdown option:
// when enabled, Manager.Stop triggers one final scrape of every target
// before the scrape loops terminate; when disabled, Stop terminates the
// loops without an extra scrape. Expected sample counts are derived from
// the 10s scrape interval versus the delay before Stop is called.
func TestManagerStopAfterScrapeAttempt(t *testing.T) {
	interval := 10 * time.Second
	for _, tcase := range []struct {
		name             string
		scrapeOnShutdown bool
		stopDelay        time.Duration
		expectedSamples  int
	}{
		{
			// Only the initial scrape happens before Stop.
			name:             "no scrape on shutdown before next interval",
			stopDelay:        5 * time.Second,
			expectedSamples:  1,
			scrapeOnShutdown: false,
		},
		{
			// Initial scrape plus one interval tick; no extra scrape on Stop.
			name:             "no scrape on shutdown after next interval",
			stopDelay:        11 * time.Second,
			expectedSamples:  2,
			scrapeOnShutdown: false,
		},
		{
			// Initial scrape plus the final scrape triggered by Stop.
			name:             "scrape on shutdown before next interval",
			stopDelay:        5 * time.Second,
			expectedSamples:  2,
			scrapeOnShutdown: true,
		},
		{
			// Initial scrape, one interval tick, plus the final scrape on Stop.
			name:             "scrape on shutdown after next interval",
			stopDelay:        11 * time.Second,
			expectedSamples:  3,
			scrapeOnShutdown: true,
		},
	} {
		t.Run(tcase.name, func(t *testing.T) {
			t.Parallel()
			app := teststorage.NewAppendable()
			// Setup scrape manager.
			scrapeManager, err := NewManager(
				&Options{
					ScrapeOnShutdown: tcase.scrapeOnShutdown,
					skipOffsetting:   true,
				},
				promslog.New(&promslog.Config{}),
				nil,
				nil,
				app,
				prometheus.NewRegistry(),
			)
			require.NoError(t, err)
			cfg := &config.Config{
				GlobalConfig: config.GlobalConfig{
					ScrapeInterval:  model.Duration(interval),
					ScrapeTimeout:   model.Duration(interval),
					ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
				},
				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
			}
			// Round-trip through YAML so defaults are applied exactly as they
			// would be for a real configuration file.
			cfgText, err := yaml.Marshal(*cfg)
			require.NoError(t, err)
			cfg = loadConfiguration(t, string(cfgText))
			require.NoError(t, scrapeManager.ApplyConfig(cfg))
			// Start fake HTTP target to scrape returning a single metric.
			server := httptest.NewServer(
				http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
					w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
					// Best-effort write; a failed scrape surfaces via the
					// sample-count assertion below.
					_, _ = w.Write([]byte("expected_metric 1\n"))
				}),
			)
			defer server.Close()
			serverURL, err := url.Parse(server.URL)
			require.NoError(t, err)
			// Add fake target directly into tsets + reload. Normally users would use
			// Manager.Run and wait for minimum 5s refresh interval.
			scrapeManager.updateTsets(map[string][]*targetgroup.Group{
				"test": {
					{
						Targets: []model.LabelSet{{
							model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
							model.AddressLabel: model.LabelValue(serverURL.Host),
						}},
					},
				},
			})
			scrapeManager.reload()
			// Wait for the defined stop delay, before stopping.
			time.Sleep(tcase.stopDelay)
			scrapeManager.Stop()
			// Verify results.
			require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
		})
	}
}

View File

@ -248,7 +248,6 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
sp.targetMtx.Lock()
@ -268,6 +267,7 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
sp.cancel()
sp.client.CloseIdleConnections()
if sp.config != nil {
@ -830,13 +830,14 @@ type cacheEntry struct {
type scrapeLoop struct {
// Parameters.
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
ctx context.Context
cancel func()
stopped chan struct{}
shutdownScrape chan struct{}
parentCtx context.Context
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
interval time.Duration
timeout time.Duration
@ -874,8 +875,8 @@ type scrapeLoop struct {
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
scrapeOnShutdown bool
skipOffsetting bool // For testability.
// error injection through setForcedError.
forcedErr error
forcedErrMtx sync.Mutex
@ -1177,13 +1178,14 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
ctx, cancel := context.WithCancel(opts.sp.ctx)
return &scrapeLoop{
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
parentCtx: opts.sp.ctx,
appenderCtx: appenderCtx,
l: opts.sp.logger.With("target", opts.target),
cache: opts.cache,
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
shutdownScrape: make(chan struct{}),
parentCtx: opts.sp.ctx,
appenderCtx: appenderCtx,
l: opts.sp.logger.With("target", opts.target),
cache: opts.cache,
interval: opts.interval,
timeout: opts.timeout,
@ -1227,6 +1229,7 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
scrapeOnShutdown: opts.sp.options.ScrapeOnShutdown,
skipOffsetting: opts.sp.options.skipOffsetting,
}
}
@ -1245,6 +1248,8 @@ func (sl *scrapeLoop) run(errc chan<- error) {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
// Continue after a scraping offset.
case <-sl.shutdownScrape:
sl.cancel()
case <-sl.ctx.Done():
close(sl.stopped)
return
@ -1259,15 +1264,6 @@ func (sl *scrapeLoop) run(errc chan<- error) {
mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
default:
}
// Temporary workaround for a jitter in go timers that causes disk space
// increase in TSDB.
// See https://github.com/prometheus/prometheus/issues/7846
@ -1296,6 +1292,8 @@ mainLoop:
return
case <-sl.ctx.Done():
break mainLoop
case <-sl.shutdownScrape:
sl.cancel()
case <-ticker.C:
}
}
@ -1523,7 +1521,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
	if sl.scrapeOnShutdown {
		// Ask the run loop to perform one final scrape; the loop cancels its
		// own context once that scrape has been attempted. Also select on
		// sl.stopped so we never block forever on the unbuffered channel if
		// the loop has already exited (e.g. via its parent context) — in
		// that case cancel directly to stop any outstanding writes.
		select {
		case sl.shutdownScrape <- struct{}{}:
		case <-sl.stopped:
			sl.cancel()
		}
	} else {
		sl.cancel()
	}
	<-sl.stopped
}