diff --git a/scrape/manager.go b/scrape/manager.go
index faa2dacfa6..7389f24b52 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -409,3 +409,22 @@ func (m *Manager) ScrapePoolConfig(scrapePool string) (*config.ScrapeConfig, err
 	return sp.config, nil
 }
+
+// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
+// targetSet. When end-of-run staleness markers are disabled for a target, no staleness markers are written for its
+// series once the target goes away.
+func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
+	// Return early to avoid taking the lock when there is nothing to do.
+	if len(targets) == 0 {
+		return
+	}
+
+	// Only hold the lock while looking up the scrape pool.
+	m.mtxScrape.Lock()
+	sp, ok := m.scrapePools[targetSet]
+	m.mtxScrape.Unlock()
+
+	if ok {
+		sp.disableEndOfRunStalenessMarkers(targets)
+	}
+}
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index dcaccd200d..008dd49842 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -1588,3 +1588,50 @@ scrape_configs:
 		[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
 	)
 }
+
+func TestManagerDisableEndOfRunStalenessMarkers(t *testing.T) {
+	cfgText := `
+scrape_configs:
+  - job_name: one
+    scrape_interval: 1m
+    scrape_timeout: 1m
+  - job_name: two
+    scrape_interval: 1m
+    scrape_timeout: 1m
+`
+
+	cfg := loadConfiguration(t, cfgText)
+
+	m, err := NewManager(&Options{}, nil, nil, &nopAppendable{}, prometheus.NewRegistry())
+	require.NoError(t, err)
+	defer m.Stop()
+	require.NoError(t, m.ApplyConfig(cfg))
+
+	// Pass targets to the manager.
+	tgs := map[string][]*targetgroup.Group{
+		"one": {{Targets: []model.LabelSet{{"__address__": "h1"}, {"__address__": "h2"}, {"__address__": "h3"}}}},
+		"two": {{Targets: []model.LabelSet{{"__address__": "h4"}}}},
+	}
+	m.updateTsets(tgs)
+	m.reload()
+
+	activeTargets := m.TargetsActive()
+	targetsToDisable := []*Target{
+		activeTargets["one"][0],
+		activeTargets["one"][2],
+	}
+
+	// Disable end-of-run staleness markers for some of the targets.
+	m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
+	// Disabling markers for a non-existent scrape pool should be a no-op.
+	m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
+
+	// Check that end-of-run staleness markers are disabled only for the selected targets.
+	for _, group := range []string{"one", "two"} {
+		for _, tg := range activeTargets[group] {
+			loop := m.scrapePools[group].loops[tg.hash()].(*scrapeLoop)
+			expectedDisabled := slices.Contains(targetsToDisable, tg)
+			require.Equal(t, expectedDisabled, loop.disabledEndOfRunStalenessMarkers.Load())
+		}
+	}
+}
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 0e6981a545..4043f6c0f6 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -41,6 +41,7 @@ import (
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -641,6 +642,16 @@ func (sp *scrapePool) refreshTargetLimitErr() error {
 	return nil
 }
 
+func (sp *scrapePool) disableEndOfRunStalenessMarkers(targets []*Target) {
+	sp.mtx.Lock()
+	defer sp.mtx.Unlock()
+	for i := range targets {
+		if l, ok := sp.loops[targets[i].hash()]; ok {
+			l.disableEndOfRunStalenessMarkers()
+		}
+	}
+}
+
 func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
 	if limits == nil {
 		return nil
@@ -957,7 +968,7 @@ type scrapeLoop struct {
 	cancel  func()
 	stopped chan struct{}
 
-	disabledEndOfRunStalenessMarkers bool
+	disabledEndOfRunStalenessMarkers atomic.Bool
 
 	reportExtraMetrics  bool
 	appendMetadataToWAL bool
@@ -1397,7 +1408,7 @@ mainLoop:
 
 	close(sl.stopped)
 
-	if !sl.disabledEndOfRunStalenessMarkers {
+	if !sl.disabledEndOfRunStalenessMarkers.Load() {
 		sl.endOfRunStaleness(last, ticker, sl.interval)
 	}
 }
@@ -1563,6 +1574,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	case <-time.After(interval / 10):
 	}
 
+	// Check if end-of-run staleness markers have been disabled while we were waiting.
+	if sl.disabledEndOfRunStalenessMarkers.Load() {
+		return
+	}
+
 	// Call sl.append again with an empty scrape to trigger stale markers.
 	// If the target has since been recreated and scraped, the
 	// stale markers will be out of order and ignored.
@@ -1597,7 +1613,7 @@ func (sl *scrapeLoop) stop() {
 }
 
 func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
-	sl.disabledEndOfRunStalenessMarkers = true
+	sl.disabledEndOfRunStalenessMarkers.Store(true)
 }
 
 func (sl *scrapeLoop) getCache() *scrapeCache {
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 7ccd2c8c02..c7412365d0 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -50,6 +50,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/propagation"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -5897,3 +5898,47 @@ func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
 	}...)
 	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
 }
+
+func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {
+	var (
+		loopDone = atomic.NewBool(false)
+		appender = &collectResultAppender{}
+		scraper  = &testScraper{}
+		app      = func(_ context.Context) storage.Appender { return appender }
+	)
+
+	sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond)
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		if _, err := w.Write([]byte("metric_a 42\n")); err != nil {
+			return err
+		}
+		return ctx.Err()
+	}
+
+	// Start the scrape loop.
+	go func() {
+		sl.run(nil)
+		loopDone.Store(true)
+	}()
+
+	// Wait for some samples to be appended.
+	require.Eventually(t, func() bool {
+		appender.mtx.Lock()
+		defer appender.mtx.Unlock()
+		return len(appender.resultFloats) > 2
+	}, 5*time.Second, 100*time.Millisecond, "Scrape loop didn't append enough samples.")
+
+	// Disable end-of-run staleness markers and stop the loop.
+	sl.disableEndOfRunStalenessMarkers()
+	sl.stop()
+	require.Eventually(t, func() bool {
+		return loopDone.Load()
+	}, 5*time.Second, 100*time.Millisecond, "Scrape loop didn't stop.")
+
+	// No stale markers should have been appended, since they were disabled.
+	for _, s := range appender.resultFloats {
+		if value.IsStaleNaN(s.f) {
+			t.Fatalf("Got stale NaN sample while end-of-run staleness markers are disabled: %x", math.Float64bits(s.f))
+		}
+	}
+}
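Beyond the tests above, here is a minimal usage sketch of the new Manager method from outside the scrape package. The helper function, its name, and the hard-coded pool name "one" are illustrative assumptions, not part of this patch; Manager, Target, TargetsActive, and DisableEndOfRunStalenessMarkers are the existing/added API.

    package example

    import "github.com/prometheus/prometheus/scrape"

    // disableStalenessForPool is an illustrative helper: it disables end-of-run
    // staleness markers for all currently active targets of the "one" scrape
    // pool, e.g. before those targets are handed off to another scraper instance.
    func disableStalenessForPool(m *scrape.Manager) {
        // TargetsActive returns the active targets keyed by scrape pool name.
        targets := m.TargetsActive()["one"]
        // The call is a no-op for an empty target slice or an unknown pool name.
        m.DisableEndOfRunStalenessMarkers("one", targets)
    }

Because the method returns early for an empty slice and only holds mtxScrape for the pool lookup, a caller can invoke it whenever a hand-off is detected without worrying about lock contention.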