mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-06 04:46:13 +02:00
Merge pull request #18127 from roidelapluie/roidelapluie/fix-scrape-logger-race
scrape: Fix race condition in scrapeFailureLogger access
This commit is contained in:
commit
3e8d15bb6c
@ -105,6 +105,7 @@ type scrapePool struct {
|
||||
activeTargets map[uint64]*Target
|
||||
droppedTargets []*Target // Subject to KeepDroppedTargets limit.
|
||||
droppedTargetsCount int // Count of all dropped targets.
|
||||
scrapeFailureLogger FailureLogger
|
||||
|
||||
// newLoop injection for testing purposes.
|
||||
injectTestNewLoop func(scrapeLoopOptions) loop
|
||||
@ -112,9 +113,6 @@ type scrapePool struct {
|
||||
metrics *scrapeMetrics
|
||||
buffers *pool.Pool
|
||||
offsetSeed uint64
|
||||
|
||||
scrapeFailureLogger FailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
}
|
||||
|
||||
type labelLimits struct {
|
||||
@ -224,26 +222,18 @@ func (sp *scrapePool) DroppedTargetsCount() int {
|
||||
}
|
||||
|
||||
func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) {
|
||||
sp.scrapeFailureLoggerMtx.Lock()
|
||||
defer sp.scrapeFailureLoggerMtx.Unlock()
|
||||
sp.targetMtx.Lock()
|
||||
defer sp.targetMtx.Unlock()
|
||||
if l != nil {
|
||||
l = slog.New(l).With("job_name", sp.config.JobName).Handler().(FailureLogger)
|
||||
}
|
||||
sp.scrapeFailureLogger = l
|
||||
|
||||
sp.targetMtx.Lock()
|
||||
defer sp.targetMtx.Unlock()
|
||||
for _, s := range sp.loops {
|
||||
s.setScrapeFailureLogger(sp.scrapeFailureLogger)
|
||||
}
|
||||
}
|
||||
|
||||
func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
|
||||
sp.scrapeFailureLoggerMtx.RLock()
|
||||
defer sp.scrapeFailureLoggerMtx.RUnlock()
|
||||
return sp.scrapeFailureLogger
|
||||
}
|
||||
|
||||
// stop terminates all scrape loops and returns after they all terminated.
|
||||
func (sp *scrapePool) stop() {
|
||||
sp.mtx.Lock()
|
||||
@ -323,6 +313,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
|
||||
sp.targetMtx.Lock()
|
||||
|
||||
forcedErr := sp.refreshTargetLimitErr()
|
||||
scrapeFailureLogger := sp.scrapeFailureLogger
|
||||
for fp, oldLoop := range sp.loops {
|
||||
var cache *scrapeCache
|
||||
if oc := oldLoop.getCache(); reuseCache && oc != nil {
|
||||
@ -364,7 +355,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
|
||||
wg.Done()
|
||||
|
||||
newLoop.setForcedError(forcedErr)
|
||||
newLoop.setScrapeFailureLogger(sp.getScrapeFailureLogger())
|
||||
newLoop.setScrapeFailureLogger(scrapeFailureLogger)
|
||||
newLoop.run(nil)
|
||||
}(oldLoop, newLoop)
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"maps"
|
||||
"math"
|
||||
"net/http"
|
||||
@ -6734,3 +6735,54 @@ func testDropsSeriesFromMetricRelabeling(t *testing.T, appV2 bool) {
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// noopFailureLogger is a minimal FailureLogger implementation for testing.
|
||||
type noopFailureLogger struct{}
|
||||
|
||||
func (noopFailureLogger) Enabled(context.Context, slog.Level) bool { return true }
|
||||
func (noopFailureLogger) Handle(context.Context, slog.Record) error { return nil }
|
||||
func (noopFailureLogger) WithAttrs([]slog.Attr) slog.Handler { return noopFailureLogger{} }
|
||||
func (noopFailureLogger) WithGroup(string) slog.Handler { return noopFailureLogger{} }
|
||||
func (noopFailureLogger) Close() error { return nil }
|
||||
|
||||
// TestScrapePoolSetScrapeFailureLoggerRace is a regression test for concurrent
|
||||
// access to scrapeFailureLogger. Both must use targetMtx for synchronization.
|
||||
func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) {
|
||||
var (
|
||||
app = teststorage.NewAppendable()
|
||||
cfg = &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
ScrapeInterval: model.Duration(100 * time.Millisecond),
|
||||
ScrapeTimeout: model.Duration(50 * time.Millisecond),
|
||||
MetricNameValidationScheme: model.UTF8Validation,
|
||||
MetricNameEscapingScheme: model.AllowUTF8,
|
||||
}
|
||||
sp, err = newScrapePool(cfg, app, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
// Create a target group with a target.
|
||||
tg := &targetgroup.Group{
|
||||
Targets: []model.LabelSet{
|
||||
{model.AddressLabel: "127.0.0.1:9090"},
|
||||
},
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Go(func() {
|
||||
for range 100 {
|
||||
sp.SetScrapeFailureLogger(noopFailureLogger{})
|
||||
sp.SetScrapeFailureLogger(nil)
|
||||
}
|
||||
})
|
||||
|
||||
wg.Go(func() {
|
||||
for range 100 {
|
||||
sp.Sync([]*targetgroup.Group{tg})
|
||||
}
|
||||
})
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user