Merge pull request #16429 from prymitive/scrapeCacheStaleNaN

Append staleness markers only for known series
Bryan Boreham 2025-09-02 10:41:07 +01:00 committed by GitHub
commit 70bf09cb2b
3 changed files with 326 additions and 21 deletions

View File

@@ -983,8 +983,8 @@ type scrapeCache struct {
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
seriesCur map[uint64]*cacheEntry
seriesPrev map[uint64]*cacheEntry
// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
// avoid locking (using metadata API can block scraping).
@@ -1011,8 +1011,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
return &scrapeCache{
series: map[string]*cacheEntry{},
droppedSeries: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
seriesCur: map[uint64]*cacheEntry{},
seriesPrev: map[uint64]*cacheEntry{},
metadata: map[string]*metaEntry{},
metrics: metrics,
}
@@ -1081,11 +1081,13 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
return e, true, alreadyScraped
}
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) (ce *cacheEntry) {
if ref == 0 {
return
return nil
}
c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
ce = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
c.series[string(met)] = ce
return ce
}
func (c *scrapeCache) addDropped(met []byte) {
@@ -1101,14 +1103,14 @@ func (c *scrapeCache) getDropped(met []byte) bool {
return ok
}
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
c.seriesCur[hash] = lset
func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
c.seriesCur[hash] = ce
}
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
for h, lset := range c.seriesPrev {
func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
for h, ce := range c.seriesPrev {
if _, ok := c.seriesCur[h]; !ok {
if !f(lset) {
if !f(ce.ref, ce.lset) {
break
}
}
@@ -1606,10 +1608,10 @@ type appendErrors struct {
// Update the stale markers.
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
sl.cache.forEachStale(func(lset labels.Labels) bool {
sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
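
To make the new staleness path concrete, here is a minimal, self-contained sketch of the idea. The types below (seriesRef, labelSet, cacheEntry) are simplified stand-ins for storage.SeriesRef, labels.Labels and the real cacheEntry, and the main function is purely illustrative, not Prometheus code. It shows how forEachStale now hands the callback both the cached ref and the labels, so updateStaleMarkers can append the StaleNaN against a series the TSDB already knows:

package main

import "fmt"

// Simplified stand-ins for storage.SeriesRef and labels.Labels.
type seriesRef uint64
type labelSet string

// cacheEntry keeps the TSDB ref next to the labels; storing *cacheEntry in
// seriesCur/seriesPrev is what lets the staleness pass reuse the ref.
type cacheEntry struct {
	ref  seriesRef
	lset labelSet
}

type scrapeCache struct {
	series     map[string]*cacheEntry // keyed by the scraped metric bytes
	seriesCur  map[uint64]*cacheEntry // series seen in the current scrape
	seriesPrev map[uint64]*cacheEntry // series seen in the previous scrape
}

// forEachStale calls f for every series present in the previous scrape but
// missing from the current one, passing the known ref along with the labels.
func (c *scrapeCache) forEachStale(f func(seriesRef, labelSet) bool) {
	for h, ce := range c.seriesPrev {
		if _, ok := c.seriesCur[h]; !ok {
			if !f(ce.ref, ce.lset) {
				break
			}
		}
	}
}

func main() {
	c := &scrapeCache{
		seriesPrev: map[uint64]*cacheEntry{
			1: {ref: 42, lset: "metric_a"},
			2: {ref: 43, lset: "metric_b"},
		},
		seriesCur: map[uint64]*cacheEntry{
			1: {ref: 42, lset: "metric_a"}, // metric_a is still exposed
		},
	}
	// Only metric_b disappeared, so only it gets a StaleNaN, appended with ref 43
	// instead of ref 0, which lets downstream appenders recognize a known series.
	c.forEachStale(func(ref seriesRef, lset labelSet) bool {
		fmt.Printf("append StaleNaN for ref=%d series=%s\n", ref, lset)
		return true
	})
}
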
@@ -1806,7 +1808,7 @@ loop:
if err == nil {
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
sl.cache.trackStaleness(ce.hash, ce)
}
}
@@ -1818,12 +1820,17 @@ loop:
break loop
}
if !seriesCached {
if parsedTimestamp == nil || sl.trackTimestampsStaleness {
// If the series wasn't cached (it's new, not seen in the previous scrape) we need to add it to the scrape cache.
// But we only do this for series that were appended to the TSDB without errors.
// If a series is new but we didn't append it due to sample_limit or other errors, then we don't need
// it in the scrape cache, because we don't need to emit StaleNaNs for it when it disappears.
if !seriesCached && sampleAdded {
ce = sl.cache.addRef(met, ref, lset, hash)
if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
// Bypass staleness logic if there is an explicit timestamp.
sl.cache.trackStaleness(hash, lset)
// But make sure we only do this if we have a cache entry (ce) for our series.
sl.cache.trackStaleness(hash, ce)
}
sl.cache.addRef(met, ref, lset, hash)
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++
}
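
And a sketch of the decision the append loop now makes for a new series, continuing the illustrative stand-in types from the sketch above. addRef, trackStaleness and handleNewSeries here are simplified, hypothetical versions of the code in this hunk, not the actual Prometheus implementation: a series is cached only if it was actually appended, and staleness is tracked only when a cache entry exists and the sample carries no explicit timestamp (or timestamp staleness tracking is enabled):

// addRef caches a series only when the append produced a real ref.
func (c *scrapeCache) addRef(met []byte, ref seriesRef, lset labelSet, hash uint64) *cacheEntry {
	if ref == 0 {
		return nil
	}
	ce := &cacheEntry{ref: ref, lset: lset}
	c.series[string(met)] = ce
	return ce
}

// trackStaleness marks the series as seen in the current scrape.
func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
	c.seriesCur[hash] = ce
}

// handleNewSeries mirrors the branch above: series rejected by sample_limit
// (or any other append error) never reach the cache, so they can never
// produce StaleNaNs once they disappear.
func (c *scrapeCache) handleNewSeries(met []byte, ref seriesRef, lset labelSet, hash uint64,
	seriesCached, sampleAdded, hasExplicitTimestamp, trackTimestampsStaleness bool) {
	if seriesCached || !sampleAdded {
		return
	}
	if ce := c.addRef(met, ref, lset, hash); ce != nil && (!hasExplicitTimestamp || trackTimestampsStaleness) {
		c.trackStaleness(hash, ce)
	}
}
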

View File

@@ -37,6 +37,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/regexp"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@@ -1731,6 +1732,76 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}
// If we have a target with sample_limit set and the scrape initially works but then we hit the sample_limit error,
// then we don't expect any StaleNaNs to be appended for the series that disappeared due to the sample_limit error.
func TestScrapeLoopRunCreatesStaleMarkersOnSampleLimit(t *testing.T) {
appender := &collectResultAppender{}
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
app = func(_ context.Context) storage.Appender { return appender }
numScrapes = 0
)
ctx, cancel := context.WithCancel(context.Background())
// Since we're writing samples directly below we need to provide a protocol fallback.
sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
sl.sampleLimit = 4
// Scrapes 1 and 3 succeed, scrape 2 exposes one sample over the sample_limit, scrape 4 cancels the loop.
scraper.scrapeFunc = func(_ context.Context, w io.Writer) error {
numScrapes++
switch numScrapes {
case 1:
w.Write([]byte("metric_a 10\nmetric_b 10\nmetric_c 10\nmetric_d 10\n"))
return nil
case 2:
w.Write([]byte("metric_a 20\nmetric_b 20\nmetric_c 20\nmetric_d 20\nmetric_e 999\n"))
return nil
case 3:
w.Write([]byte("metric_a 30\nmetric_b 30\nmetric_c 30\nmetric_d 30\n"))
return nil
case 4:
cancel()
}
return errors.New("scrape failed")
}
go func() {
sl.run(nil)
signal <- struct{}{}
}()
select {
case <-signal:
case <-time.After(5 * time.Second):
require.FailNow(t, "Scrape wasn't stopped.")
}
// 4 scrapes in total:
// #1 - success - 4 samples appended + 5 report series
// #2 - sample_limit exceeded - no samples appended, only 5 report series
// #3 - success - 4 samples appended + 5 report series
// #4 - scrape canceled - 4 StaleNaNs appended because of scrape error + 5 report series
require.Len(t, appender.resultFloats, (4+5)+5+(4+5)+(4+5), "Appended samples not as expected:\n%s", appender)
// Expect first 4 samples to be metric_X [0-3].
for i := range 4 {
require.Equal(t, 10.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
}
// Next 5 samples are report series [4-8].
// Next 5 samples are report series for the second scrape [9-13].
// Expect the next 4 samples to be metric_X from the third scrape [14-17].
for i := 14; i <= 17; i++ {
require.Equal(t, 30.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
}
// Next 5 samples are report series [18-22].
// Next 4 samples are StaleNaN markers for the metric_X series [23-26], followed by 5 report series [27-31].
for i := 23; i <= 26; i++ {
require.True(t, value.IsStaleNaN(appender.resultFloats[i].f),
"Appended sample %d not as expected. Wanted: stale NaN Got: %x", i, math.Float64bits(appender.resultFloats[i].f))
}
}
func TestScrapeLoopCache(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
@@ -1931,7 +2002,16 @@ func TestScrapeLoopAppend(t *testing.T) {
func requireEqual(t *testing.T, expected, actual any, msgAndArgs ...any) {
t.Helper()
testutil.RequireEqualWithOptions(t, expected, actual,
[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
[]cmp.Option{
cmp.Comparer(equalFloatSamples),
cmp.AllowUnexported(histogramSample{}),
// StaleNaN samples are generated by iterating over a map, which means that the order
// of samples might be different on every test run. Sort series by label to avoid
// test failures because of that.
cmpopts.SortSlices(func(a, b floatSample) int {
return labels.Compare(a.metric, b.metric)
}),
},
msgAndArgs...)
}
@@ -5641,3 +5721,219 @@ func TestScrapeAppendWithParseError(t *testing.T) {
}
requireEqual(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
}
// This test covers a case where there's a target with sample_limit set and some of the exported samples
// change between scrapes.
func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
const sampleLimit = 4
resApp := &collectResultAppender{}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
return resApp
}, 0)
sl.sampleLimit = sampleLimit
now := time.Now()
slApp := sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
slApp,
// Start with 3 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, samplesScraped) // All samples exposed in the scrape.
require.Equal(t, 3, samplesAfterRelabel) // Series remaining after relabeling.
require.Equal(t, 3, createdSeries) // Newly added to TSDB.
want := []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: 1,
},
}
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Start exporting 3 more samples, so we're over the limit now.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\n"),
"text/plain",
now,
)
require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback())
require.Equal(t, 6, samplesScraped)
require.Equal(t, 6, samplesAfterRelabel)
require.Equal(t, 1, createdSeries) // We've added one series before hitting the limit.
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
sl.cache.iterDone(false)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Remove all samples except the first 2.
[]byte("metric_a 1\nmetric_b 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 2, samplesScraped)
require.Equal(t, 2, samplesAfterRelabel)
require.Equal(t, 0, createdSeries)
// This is where the important things happen. We should now see:
// - Appends for samples from metric_a & metric_b.
// - An append with a stale marker for metric_c - this series was added during the first scrape but disappeared in the last one.
// - An append with a stale marker for metric_d - this series was added during the second scrape before we hit the sample_limit.
// We should NOT see:
// - Appends with stale markers for metric_e & metric_f - both were over the limit during the second scrape and so never made it into the TSDB.
want = append(want, []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
}
// This test covers a case where there's a target with sample_limit set and each scrape sees a completely
// different set of samples.
func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
const sampleLimit = 4
resApp := &collectResultAppender{}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
return resApp
}, 0)
sl.sampleLimit = sampleLimit
now := time.Now()
slApp := sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
slApp,
// Start with 4 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 4, samplesScraped) // All samples exposed in the scrape.
require.Equal(t, 4, samplesAfterRelabel) // Series remaining after relabeling.
require.Equal(t, 4, createdSeries) // Newly added to TSDB.
want := []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: 1,
},
}
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Replace all samples with new time series.
[]byte("metric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 4, samplesScraped)
require.Equal(t, 4, samplesAfterRelabel)
require.Equal(t, 4, createdSeries)
// We replaced all samples from the first scrape with a new set of samples.
// We expect to see:
// - 4 appends for new samples.
// - 4 appends with staleness markers for old samples.
want = append(want, []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_e"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_f"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_g"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_h"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
}

View File

@@ -332,7 +332,9 @@ type limitAppender struct {
}
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if !value.IsStaleNaN(v) {
// Bypass the sample_limit check only for staleness markers of known series (ref value is non-zero).
// This ensures that if a series is already in the TSDB then we always write the marker.
if ref == 0 || !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
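
For the limitAppender change, a small stand-alone illustration of the new counting rule. countsTowardsLimit and isStaleNaN are hypothetical helpers written for this sketch, not Prometheus APIs (the bit pattern matches Prometheus's value.StaleNaN): a sample is charged against sample_limit unless it is a staleness marker for a series the TSDB already knows about, i.e. ref != 0.

package main

import (
	"fmt"
	"math"
)

// Bit pattern of Prometheus's value.StaleNaN.
const staleNaNBits = 0x7ff0000000000002

// isStaleNaN mirrors value.IsStaleNaN by comparing the exact bit pattern.
func isStaleNaN(v float64) bool { return math.Float64bits(v) == staleNaNBits }

// countsTowardsLimit encodes the condition from the hunk above:
// only StaleNaNs for known series (ref != 0) bypass the limit.
func countsTowardsLimit(ref uint64, v float64) bool {
	return ref == 0 || !isStaleNaN(v)
}

func main() {
	stale := math.Float64frombits(staleNaNBits)
	fmt.Println(countsTowardsLimit(0, 1.0))    // true: normal sample, new series
	fmt.Println(countsTowardsLimit(42, 1.0))   // true: normal sample, known series
	fmt.Println(countsTowardsLimit(0, stale))  // true: marker for an unknown series still counts
	fmt.Println(countsTowardsLimit(42, stale)) // false: marker for a known series always goes through
}
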