diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 1db229561d..45c89ad8d7 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -18,17 +18,24 @@ import ( "context" "encoding/binary" "fmt" + "net" "net/http" + "net/http/httptest" + "sync" "testing" "time" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" + "go.yaml.in/yaml/v2" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -233,3 +240,132 @@ func TestSelectAppendable(t *testing.T) { } }) } + +// pipeListener is an in-memory net.Listener that connects a custom DialContext +// directly to the httptest Server without opening real OS ports. +type pipeListener struct { + conns chan net.Conn + closed chan struct{} + once sync.Once +} + +func newPipeListener() *pipeListener { + return &pipeListener{ + conns: make(chan net.Conn), + closed: make(chan struct{}), + } +} + +func (l *pipeListener) Accept() (net.Conn, error) { + select { + case c := <-l.conns: + return c, nil + case <-l.closed: + return nil, net.ErrClosed + } +} + +func (l *pipeListener) Close() error { + l.once.Do(func() { close(l.closed) }) + return nil +} + +// Dummy Addr implementation to satisfy the net.Listener interface. +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } +func (*pipeListener) Addr() net.Addr { return pipeAddr{} } + +// startFakeHTTPServer spins up a httptest.Server bound to an in-memory +// pipeListener. It returns the listener (to be wired to a custom dialer) and a +// cleanup function to shut down the server. +func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { + t.Helper() + + listener := newPipeListener() + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Abort if the request context is canceled (e.g., due to a scrape timeout). + select { + case <-r.Context().Done(): + return + default: + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + + // Background goroutines inherit the synctest bubble safely. + srv.Start() + + return listener, srv.Close +} + +// setupSynctestManager abstracts the boilerplate of creating a mock network, +// starting the fake HTTP server, and configuring the scrape manager for synctest. +func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { + t.Helper() + app := teststorage.NewAppendable() + + listener, cleanup := startFakeHTTPServer(t) + + if opts == nil { + opts = &Options{} + } + opts.skipJitterOffsetting = true + + // Ensure the scraper creates a new net.Pipe on every dial attempt + // and hands the server-side connection to the mock server's listener. + opts.HTTPClientOptions = []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + + select { + case listener.conns <- srvConn: + // Give the client side to the scraper. + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + }), + } + + scrapeManager, err := NewManager( + opts, + promslog.New(&promslog.Config{}), + nil, nil, app, prometheus.NewRegistry(), + ) + require.NoError(t, err) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + }}, + }}, + }) + + scrapeManager.reload() + + return scrapeManager, app, cleanup +} diff --git a/scrape/manager_test.go b/scrape/manager_test.go index b3ef0b21bc..3dc05db011 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "maps" - "net" "net/http" "net/http/httptest" "net/url" @@ -34,7 +33,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - config_util "github.com/prometheus/common/config" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" @@ -1587,7 +1585,7 @@ scrape_configs: // Disable end of run staleness markers for some targets. m.DisableEndOfRunStalenessMarkers("one", targetsToDisable) - // This should be a no-op + // This should be a no-op. m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable) // Check that the end of run staleness markers are disabled for the correct targets. @@ -1600,135 +1598,6 @@ scrape_configs: } } -// pipeListener is an in-memory net.Listener that connects a custom DialContext -// directly to the httptest Server without opening real OS ports. -type pipeListener struct { - conns chan net.Conn - closed chan struct{} - once sync.Once -} - -func newPipeListener() *pipeListener { - return &pipeListener{ - conns: make(chan net.Conn), - closed: make(chan struct{}), - } -} - -func (l *pipeListener) Accept() (net.Conn, error) { - select { - case c := <-l.conns: - return c, nil - case <-l.closed: - return nil, net.ErrClosed - } -} - -func (l *pipeListener) Close() error { - l.once.Do(func() { close(l.closed) }) - return nil -} - -// Dummy Addr implementation to satisfy the net.Listener interface. -type pipeAddr struct{} - -func (pipeAddr) Network() string { return "pipe" } -func (pipeAddr) String() string { return "pipe" } -func (*pipeListener) Addr() net.Addr { return pipeAddr{} } - -// startFakeHTTPServer spins up a httptest.Server bound to an in-memory -// pipeListener. It returns the listener (to be wired to a custom dialer) and a -// cleanup function to shut down the server. -func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { - t.Helper() - - listener := newPipeListener() - - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Abort if the request context is canceled (e.g., due to a scrape timeout). - select { - case <-r.Context().Done(): - return - default: - w.Header().Set("Content-Type", "text/plain; version=0.0.4") - fmt.Fprintln(w, "expected_metric 1") - } - }) - - srv := httptest.NewUnstartedServer(handler) - srv.Listener = listener - - // Background goroutines inherit the synctest bubble safely - srv.Start() - - return listener, srv.Close -} - -// setupSynctestManager abstracts the boilerplate of creating a mock network, -// starting the fake HTTP server, and configuring the scrape manager for synctest. -func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { - t.Helper() - app := teststorage.NewAppendable() - - listener, cleanup := startFakeHTTPServer(t) - - if opts == nil { - opts = &Options{} - } - opts.skipJitterOffsetting = true - - // Ensure the scraper creates a new net.Pipe on every dial attempt - // and hands the server-side connection to the mock server's listener. - opts.HTTPClientOptions = []config_util.HTTPClientOption{ - config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { - srvConn, cliConn := net.Pipe() - - select { - case listener.conns <- srvConn: - // Give the client side to the scraper - return cliConn, nil - case <-listener.closed: - return nil, net.ErrClosed - case <-ctx.Done(): - return nil, ctx.Err() - } - }), - } - - scrapeManager, err := NewManager( - opts, - promslog.New(&promslog.Config{}), - nil, nil, app, prometheus.NewRegistry(), - ) - require.NoError(t, err) - - cfg := &config.Config{ - GlobalConfig: config.GlobalConfig{ - ScrapeInterval: model.Duration(interval), - ScrapeTimeout: model.Duration(interval), - ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, - }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, - } - cfgText, err := yaml.Marshal(*cfg) - require.NoError(t, err) - cfg = loadConfiguration(t, string(cfgText)) - require.NoError(t, scrapeManager.ApplyConfig(cfg)) - - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": {{ - Targets: []model.LabelSet{{ - model.SchemeLabel: "http", - model.AddressLabel: "test.local", - }}, - }}, - }) - - scrapeManager.reload() - - return scrapeManager, app, cleanup -} - func TestManager_InitialScrapeOffset(t *testing.T) { interval := 10 * time.Second @@ -1752,12 +1621,6 @@ func TestManager_InitialScrapeOffset(t *testing.T) { initialScrapeOffset: 1 * time.Hour, runDuration: 59 * time.Minute, }, - { - name: "scrape happens when large offset elapses", - initialScrapeOffset: 1 * time.Hour, - runDuration: 1*time.Hour + 2*time.Second, - expectedSamples: 2, - }, } { t.Run(tcase.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { @@ -1765,14 +1628,14 @@ func TestManager_InitialScrapeOffset(t *testing.T) { scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) defer cleanupConns() - // Wait for the scrape manager to block on its timers + // Wait for the scrape manager to block on its timers. synctest.Wait() - // Fast-forward the fake clock by the test case's run duration + // Fast-forward the fake clock by the test case's run duration. time.Sleep(tcase.runDuration) synctest.Wait() - // Stop the manager to clean up background goroutines + // Stop the manager to clean up background goroutines. scrapeManager.Stop() require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamples) @@ -1787,6 +1650,7 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { for _, tcase := range []struct { name string scrapeOnShutdown bool + initialScrapeOffset time.Duration runDuration time.Duration expectedSamplesTotal int }{ @@ -1806,17 +1670,34 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { runDuration: interval, expectedSamplesTotal: 3, }, + { + name: "scrape on shutdown with initial offset", + scrapeOnShutdown: true, + initialScrapeOffset: 10 * time.Second, + runDuration: 5 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "scrape on shutdown with short running instance (offset 5s)", + scrapeOnShutdown: true, + initialScrapeOffset: 5 * time.Second, + runDuration: 8 * time.Second, + expectedSamplesTotal: 2, + }, } { t.Run(tcase.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { - opts := &Options{ScrapeOnShutdown: tcase.scrapeOnShutdown} + opts := &Options{ + ScrapeOnShutdown: tcase.scrapeOnShutdown, + InitialScrapeOffset: tcase.initialScrapeOffset, + } scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) defer cleanupConns() - // Wait for the initial scrape to happen exactly at t=0 + // Wait for the initial scrape to happen exactly at t=0. synctest.Wait() - // Fast-forward fake time to simulate scheduled scrapes before shutdown + // Fast-forward fake time to simulate scheduled scrapes before shutdown. if tcase.runDuration > 0 { time.Sleep(tcase.runDuration) synctest.Wait() diff --git a/scrape/scrape.go b/scrape/scrape.go index 7fe9a4ad80..52d202669f 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -248,7 +248,6 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() - sp.cancel() var wg sync.WaitGroup sp.targetMtx.Lock() @@ -268,6 +267,10 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() + // Cancel the context after all loops have stopped. This is required for + // scrapeOnShutdown to work properly, as the shutdown scrape uses this + // context (via sl.parentCtx) and would fail if the context was cancelled early. + sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -829,11 +832,17 @@ type cacheEntry struct { } type scrapeLoop struct { - // Parameters. - ctx context.Context - cancel func() - stopped chan struct{} - parentCtx context.Context + // ctx represents a local context that is cancellable via s.cancel. + // It's meant to synchronize run() with stop(). + // It inherits parentCtx. + ctx context.Context + cancel func() + stopped chan struct{} + // parentCtx represents manager-level context, typically connected + // to process shutdown. + parentCtx context.Context + // appenderCtx is a parentCtx with some extra context for appender + // implementations. Potentially remove-able with removal of AppenderV1. appenderCtx context.Context l *slog.Logger cache *scrapeCache @@ -1258,7 +1267,6 @@ func (sl *scrapeLoop) run(errc chan<- error) { ticker = time.NewTicker(sl.interval) ) defer func() { - ticker.Stop() if sl.scrapeOnShutdown { last = sl.scrapeAndReport(last, time.Now().Round(0), errc) } @@ -1269,6 +1277,7 @@ func (sl *scrapeLoop) run(errc chan<- error) { sl.endOfRunStaleness(last, ticker, sl.interval) } } + ticker.Stop() }() // Initial offset and jitter offset, if any.