scrape: fix scrapeOnShutdown context bug and refactor test helpers

The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
  helpers_test.go.
- Addressed remaining code-review comments (comment punctuation, minor cleanups).

Signed-off-by: avilevy <avilevy@google.com>
This commit is contained in:
avilevy 2026-03-14 01:46:00 +00:00
parent dc0d919f4b
commit e681c53ba7
No known key found for this signature in database
3 changed files with 177 additions and 151 deletions

View File

@ -18,17 +18,24 @@ import (
"context"
"encoding/binary"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"go.yaml.in/yaml/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -233,3 +240,132 @@ func TestSelectAppendable(t *testing.T) {
}
})
}
// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
	conns  chan net.Conn // server-side halves of dialed net.Pipe connections
	closed chan struct{} // closed exactly once by Close to unblock Accept
	once   sync.Once     // guards the close of the closed channel
}

// newPipeListener returns a ready-to-use pipeListener. Both channels are
// unbuffered so dialer and Accept rendezvous directly.
func newPipeListener() *pipeListener {
	l := &pipeListener{
		conns:  make(chan net.Conn),
		closed: make(chan struct{}),
	}
	return l
}

// Accept blocks until a dialer hands over a connection, or until the listener
// is closed, in which case it reports net.ErrClosed like a real listener.
func (l *pipeListener) Accept() (net.Conn, error) {
	select {
	case <-l.closed:
		return nil, net.ErrClosed
	case conn := <-l.conns:
		return conn, nil
	}
}

// Close is idempotent: only the first call closes the signalling channel.
// It always reports success.
func (l *pipeListener) Close() error {
	l.once.Do(func() {
		close(l.closed)
	})
	return nil
}

// pipeAddr is a dummy net.Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string  { return "pipe" }

// Addr reports a placeholder address; no real socket exists.
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
//
// The handler serves a single sample ("expected_metric 1") in the Prometheus
// text exposition format, unless the request was already canceled.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
	t.Helper()

	ln := newPipeListener()
	handler := func(w http.ResponseWriter, r *http.Request) {
		// Abort if the request context is canceled (e.g., due to a scrape timeout).
		if r.Context().Err() != nil {
			return
		}
		w.Header().Set("Content-Type", "text/plain; version=0.0.4")
		fmt.Fprintln(w, "expected_metric 1")
	}

	srv := httptest.NewUnstartedServer(http.HandlerFunc(handler))
	srv.Listener = ln
	// Background goroutines inherit the synctest bubble safely.
	srv.Start()
	return ln, srv.Close
}
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
// It returns the reloaded manager, the appendable that receives scraped
// samples, and a cleanup function that tears down the fake server.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
	t.Helper()

	store := teststorage.NewAppendable()
	listener, cleanup := startFakeHTTPServer(t)

	if opts == nil {
		opts = &Options{}
	}
	opts.skipJitterOffsetting = true
	// Ensure the scraper creates a new net.Pipe on every dial attempt
	// and hands the server-side connection to the mock server's listener.
	dial := func(ctx context.Context, _, _ string) (net.Conn, error) {
		srvConn, cliConn := net.Pipe()
		select {
		case listener.conns <- srvConn:
			// Give the client side to the scraper.
			return cliConn, nil
		case <-listener.closed:
			return nil, net.ErrClosed
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	opts.HTTPClientOptions = []config_util.HTTPClientOption{config_util.WithDialContextFunc(dial)}

	mgr, err := NewManager(
		opts,
		promslog.New(&promslog.Config{}),
		nil, nil, store, prometheus.NewRegistry(),
	)
	require.NoError(t, err)

	cfg := &config.Config{
		GlobalConfig: config.GlobalConfig{
			ScrapeInterval:  model.Duration(interval),
			ScrapeTimeout:   model.Duration(interval),
			ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
		},
		ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
	}
	// Serialize and re-load so the config goes through the normal text
	// parsing path used by the other tests.
	cfgText, err := yaml.Marshal(*cfg)
	require.NoError(t, err)
	cfg = loadConfiguration(t, string(cfgText))
	require.NoError(t, mgr.ApplyConfig(cfg))

	mgr.updateTsets(map[string][]*targetgroup.Group{
		"test": {{
			Targets: []model.LabelSet{{
				model.SchemeLabel:  "http",
				model.AddressLabel: "test.local",
			}},
		}},
	})
	mgr.reload()

	return mgr, store, cleanup
}

View File

@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"maps"
"net"
"net/http"
"net/http/httptest"
"net/url"
@ -34,7 +33,6 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
@ -1587,7 +1585,7 @@ scrape_configs:
// Disable end of run staleness markers for some targets.
m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
// This should be a no-op
// This should be a no-op.
m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
// Check that the end of run staleness markers are disabled for the correct targets.
@ -1600,135 +1598,6 @@ scrape_configs:
}
}
// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
conns chan net.Conn
closed chan struct{}
once sync.Once
}
func newPipeListener() *pipeListener {
return &pipeListener{
conns: make(chan net.Conn),
closed: make(chan struct{}),
}
}
func (l *pipeListener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.closed:
return nil, net.ErrClosed
}
}
func (l *pipeListener) Close() error {
l.once.Do(func() { close(l.closed) })
return nil
}
// Dummy Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }
// startFakeHTTPServer spins up a httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
t.Helper()
listener := newPipeListener()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Abort if the request context is canceled (e.g., due to a scrape timeout).
select {
case <-r.Context().Done():
return
default:
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
fmt.Fprintln(w, "expected_metric 1")
}
})
srv := httptest.NewUnstartedServer(handler)
srv.Listener = listener
// Background goroutines inherit the synctest bubble safely
srv.Start()
return listener, srv.Close
}
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
t.Helper()
app := teststorage.NewAppendable()
listener, cleanup := startFakeHTTPServer(t)
if opts == nil {
opts = &Options{}
}
opts.skipJitterOffsetting = true
// Ensure the scraper creates a new net.Pipe on every dial attempt
// and hands the server-side connection to the mock server's listener.
opts.HTTPClientOptions = []config_util.HTTPClientOption{
config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
srvConn, cliConn := net.Pipe()
select {
case listener.conns <- srvConn:
// Give the client side to the scraper
return cliConn, nil
case <-listener.closed:
return nil, net.ErrClosed
case <-ctx.Done():
return nil, ctx.Err()
}
}),
}
scrapeManager, err := NewManager(
opts,
promslog.New(&promslog.Config{}),
nil, nil, app, prometheus.NewRegistry(),
)
require.NoError(t, err)
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
ScrapeTimeout: model.Duration(interval),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}
cfgText, err := yaml.Marshal(*cfg)
require.NoError(t, err)
cfg = loadConfiguration(t, string(cfgText))
require.NoError(t, scrapeManager.ApplyConfig(cfg))
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: "http",
model.AddressLabel: "test.local",
}},
}},
})
scrapeManager.reload()
return scrapeManager, app, cleanup
}
func TestManager_InitialScrapeOffset(t *testing.T) {
interval := 10 * time.Second
@ -1752,12 +1621,6 @@ func TestManager_InitialScrapeOffset(t *testing.T) {
initialScrapeOffset: 1 * time.Hour,
runDuration: 59 * time.Minute,
},
{
name: "scrape happens when large offset elapses",
initialScrapeOffset: 1 * time.Hour,
runDuration: 1*time.Hour + 2*time.Second,
expectedSamples: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
@ -1765,14 +1628,14 @@ func TestManager_InitialScrapeOffset(t *testing.T) {
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the scrape manager to block on its timers
// Wait for the scrape manager to block on its timers.
synctest.Wait()
// Fast-forward the fake clock by the test case's run duration
// Fast-forward the fake clock by the test case's run duration.
time.Sleep(tcase.runDuration)
synctest.Wait()
// Stop the manager to clean up background goroutines
// Stop the manager to clean up background goroutines.
scrapeManager.Stop()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples)
@ -1787,6 +1650,7 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
initialScrapeOffset time.Duration
runDuration time.Duration
expectedSamplesTotal int
}{
@ -1806,17 +1670,34 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
runDuration: interval,
expectedSamplesTotal: 3,
},
{
name: "scrape on shutdown with initial offset",
scrapeOnShutdown: true,
initialScrapeOffset: 10 * time.Second,
runDuration: 5 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "scrape on shutdown with short running instance (offset 5s)",
scrapeOnShutdown: true,
initialScrapeOffset: 5 * time.Second,
runDuration: 8 * time.Second,
expectedSamplesTotal: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{ScrapeOnShutdown: tcase.scrapeOnShutdown}
opts := &Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
InitialScrapeOffset: tcase.initialScrapeOffset,
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
defer cleanupConns()
// Wait for the initial scrape to happen exactly at t=0
// Wait for the initial scrape to happen exactly at t=0.
synctest.Wait()
// Fast-forward fake time to simulate scheduled scrapes before shutdown
// Fast-forward fake time to simulate scheduled scrapes before shutdown.
if tcase.runDuration > 0 {
time.Sleep(tcase.runDuration)
synctest.Wait()

View File

@ -248,7 +248,6 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
sp.targetMtx.Lock()
@ -268,6 +267,10 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
// Cancel the context after all loops have stopped. This is required for
// scrapeOnShutdown to work properly, as the shutdown scrape uses this
// context (via sl.parentCtx) and would fail if the context was cancelled early.
sp.cancel()
sp.client.CloseIdleConnections()
if sp.config != nil {
@ -829,11 +832,17 @@ type cacheEntry struct {
}
type scrapeLoop struct {
// Parameters.
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
// ctx represents a local context that is cancellable via s.cancel.
// It's meant to synchronize run() with stop().
// It inherits parentCtx.
ctx context.Context
cancel func()
stopped chan struct{}
// parentCtx represents manager-level context, typically connected
// to process shutdown.
parentCtx context.Context
// appenderCtx is a parentCtx with some extra context for appender
// implementations. Potentially remove-able with removal of AppenderV1.
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
@ -1258,7 +1267,6 @@ func (sl *scrapeLoop) run(errc chan<- error) {
ticker = time.NewTicker(sl.interval)
)
defer func() {
ticker.Stop()
if sl.scrapeOnShutdown {
last = sl.scrapeAndReport(last, time.Now().Round(0), errc)
}
@ -1269,6 +1277,7 @@ func (sl *scrapeLoop) run(errc chan<- error) {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
ticker.Stop()
}()
// Initial offset and jitter offset, if any.