scrape: Allow disabling end-of-run staleness markers for targets
Signed-off-by: Piotr <17101802+thampiotr@users.noreply.github.com>
parent 784ec0a792
commit d6848c9f40
@@ -409,3 +409,22 @@ func (m *Manager) ScrapePoolConfig(scrapePool string) (*config.ScrapeConfig, err
 	return sp.config, nil
 }
+
+// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
+// targetSet. When end-of-run staleness is disabled for a target, no staleness markers will be written for its series
+// when the target goes away.
+func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
+	// Returning early for empty input avoids mutex lock contention.
+	if len(targets) == 0 {
+		return
+	}
+
+	// Only hold the lock to find the scrape pool.
+	m.mtxScrape.Lock()
+	sp, ok := m.scrapePools[targetSet]
+	m.mtxScrape.Unlock()
+
+	if ok {
+		sp.disableEndOfRunStalenessMarkers(targets)
+	}
+}
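For context, a minimal sketch of how a caller holding a *scrape.Manager might use the new API. The helper name and hand-off scenario are hypothetical and not part of this commit; only TargetsActive and DisableEndOfRunStalenessMarkers come from the code above.

    // handOffTargets (hypothetical) suppresses end-of-run staleness markers for
    // every active target of a job, e.g. before the job is moved to another
    // scraper instance, so its series are not marked stale on the next reload.
    func handOffTargets(m *scrape.Manager, job string) {
        moving := m.TargetsActive()[job] // active targets, keyed by target set
        m.DisableEndOfRunStalenessMarkers(job, moving)
    }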
@@ -1588,3 +1588,50 @@ scrape_configs:
 		[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
 	)
 }
+
+func TestManagerDisableEndOfRunStalenessMarkers(t *testing.T) {
+	cfgText := `
+scrape_configs:
+ - job_name: one
+   scrape_interval: 1m
+   scrape_timeout: 1m
+ - job_name: two
+   scrape_interval: 1m
+   scrape_timeout: 1m
+`
+
+	cfg := loadConfiguration(t, cfgText)
+
+	m, err := NewManager(&Options{}, nil, nil, &nopAppendable{}, prometheus.NewRegistry())
+	require.NoError(t, err)
+	defer m.Stop()
+	require.NoError(t, m.ApplyConfig(cfg))
+
+	// Pass targets to the manager.
+	tgs := map[string][]*targetgroup.Group{
+		"one": {{Targets: []model.LabelSet{{"__address__": "h1"}, {"__address__": "h2"}, {"__address__": "h3"}}}},
+		"two": {{Targets: []model.LabelSet{{"__address__": "h4"}}}},
+	}
+	m.updateTsets(tgs)
+	m.reload()
+
+	activeTargets := m.TargetsActive()
+	targetsToDisable := []*Target{
+		activeTargets["one"][0],
+		activeTargets["one"][2],
+	}
+
+	// Disable end of run staleness markers for some targets.
+	m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
+	// This should be a no-op.
+	m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
+
+	// Check that the end of run staleness markers are disabled for the correct targets.
+	for _, group := range []string{"one", "two"} {
+		for _, tg := range activeTargets[group] {
+			loop := m.scrapePools[group].loops[tg.hash()].(*scrapeLoop)
+			expectedDisabled := slices.Contains(targetsToDisable, tg)
+			require.Equal(t, expectedDisabled, loop.disabledEndOfRunStalenessMarkers.Load())
+		}
+	}
+}
@@ -41,6 +41,7 @@ import (
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -641,6 +642,16 @@ func (sp *scrapePool) refreshTargetLimitErr() error {
 	return nil
 }
 
+func (sp *scrapePool) disableEndOfRunStalenessMarkers(targets []*Target) {
+	sp.mtx.Lock()
+	defer sp.mtx.Unlock()
+	for i := range targets {
+		if l, ok := sp.loops[targets[i].hash()]; ok {
+			l.disableEndOfRunStalenessMarkers()
+		}
+	}
+}
+
 func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
 	if limits == nil {
 		return nil
@@ -957,7 +968,7 @@ type scrapeLoop struct {
 	cancel  func()
 	stopped chan struct{}
 
-	disabledEndOfRunStalenessMarkers bool
+	disabledEndOfRunStalenessMarkers atomic.Bool
 
 	reportExtraMetrics  bool
 	appendMetadataToWAL bool
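Changing the field from a plain bool to go.uber.org/atomic's Bool is what makes the flag safe to flip from the Manager's goroutine while the scrape loop reads it concurrently. A standalone sketch of the Load/Store semantics (illustration only, not code from this commit):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    func main() {
        var disabled atomic.Bool     // the zero value is valid and reads as false
        fmt.Println(disabled.Load()) // false
        disabled.Store(true)         // safe from any goroutine, no lock needed
        fmt.Println(disabled.Load()) // true
    }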
@@ -1397,7 +1408,7 @@ mainLoop:
 
 	close(sl.stopped)
 
-	if !sl.disabledEndOfRunStalenessMarkers {
+	if !sl.disabledEndOfRunStalenessMarkers.Load() {
 		sl.endOfRunStaleness(last, ticker, sl.interval)
 	}
 }
@@ -1563,6 +1574,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	case <-time.After(interval / 10):
 	}
 
+	// Check if end-of-run staleness markers have been disabled while we were waiting.
+	if sl.disabledEndOfRunStalenessMarkers.Load() {
+		return
+	}
+
 	// Call sl.append again with an empty scrape to trigger stale markers.
 	// If the target has since been recreated and scraped, the
 	// stale markers will be out of order and ignored.
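The re-check after the wait matters: disableEndOfRunStalenessMarkers can be called in the window between the loop shutting down and the staleness write firing, so checking the flag only in run() would miss it. A sketch of the wait-then-recheck shape with hypothetical names (the real code keeps this inline in endOfRunStaleness):

    package sketch

    import (
        "time"

        "go.uber.org/atomic"
    )

    // maybeWriteStaleness waits out the grace period, then writes staleness
    // markers only if they were not disabled while it was blocked.
    func maybeWriteStaleness(stopped <-chan struct{}, disabled *atomic.Bool, interval time.Duration, write func()) {
        select {
        case <-stopped:
        case <-time.After(interval / 10):
        }
        if disabled.Load() { // the flag may have flipped during the wait
            return
        }
        write()
    }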
@@ -1597,7 +1613,7 @@ func (sl *scrapeLoop) stop() {
 }
 
 func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
-	sl.disabledEndOfRunStalenessMarkers = true
+	sl.disabledEndOfRunStalenessMarkers.Store(true)
 }
 
 func (sl *scrapeLoop) getCache() *scrapeCache {
@@ -50,6 +50,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/propagation"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -5897,3 +5898,47 @@ func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
 	}...)
 	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
 }
+
+func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {
+	var (
+		loopDone = atomic.NewBool(false)
+		appender = &collectResultAppender{}
+		scraper  = &testScraper{}
+		app      = func(_ context.Context) storage.Appender { return appender }
+	)
+
+	sl := newBasicScrapeLoop(t, context.Background(), scraper, app, 10*time.Millisecond)
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		if _, err := w.Write([]byte("metric_a 42\n")); err != nil {
+			return err
+		}
+		return ctx.Err()
+	}
+
+	// Start the scrape loop.
+	go func() {
+		sl.run(nil)
+		loopDone.Store(true)
+	}()
+
+	// Wait for some samples to be appended.
+	require.Eventually(t, func() bool {
+		appender.mtx.Lock()
+		defer appender.mtx.Unlock()
+		return len(appender.resultFloats) > 2
+	}, 5*time.Second, 100*time.Millisecond, "Scrape loop didn't append any samples.")
+
+	// Disable end of run staleness markers and stop the loop.
+	sl.disableEndOfRunStalenessMarkers()
+	sl.stop()
+	require.Eventually(t, func() bool {
+		return loopDone.Load()
+	}, 5*time.Second, 100*time.Millisecond, "Scrape loop didn't stop.")
+
+	// No stale markers should be appended, since they were disabled.
+	for _, s := range appender.resultFloats {
+		if value.IsStaleNaN(s.f) {
+			t.Fatalf("Got stale NaN samples while end of run staleness is disabled: %x", math.Float64bits(s.f))
+		}
+	}
+}
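The final loop in this test works because staleness markers are NaNs with one fixed bit pattern, distinguishable from ordinary NaN samples. A standalone illustration using the same helpers (a sketch, assuming the prometheus/prometheus module is on the import path):

    package main

    import (
        "fmt"
        "math"

        "github.com/prometheus/prometheus/model/value"
    )

    func main() {
        stale := math.Float64frombits(value.StaleNaN)
        fmt.Println(value.IsStaleNaN(stale))      // true: the staleness marker
        fmt.Println(value.IsStaleNaN(math.NaN())) // false: an ordinary NaN
    }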