From 1f7a23cced729c1905acfe21c60939e784143725 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Mon, 14 Apr 2025 15:42:19 +0100
Subject: [PATCH 1/7] Add tests for staleness markers appended to TSDB when
 sample_limit is set

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape_test.go | 216 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 216 insertions(+)

diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 47829bf20e..61eace1bab 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -5568,3 +5568,219 @@ func TestScrapeAppendWithParseError(t *testing.T) {
 	}
 	requireEqual(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
 }
+
+// This test covers a case where there's a target with sample_limit set and some of the exported samples
+// change between scrapes.
+func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
+	const sampleLimit = 4
+	resApp := &collectResultAppender{}
+	sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
+		return resApp
+	}, 0)
+	sl.sampleLimit = sampleLimit
+
+	now := time.Now()
+	slApp := sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
+		slApp,
+		// Start with 3 samples, all accepted.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 3, samplesScraped)      // All samples in the scrape.
+	require.Equal(t, 3, samplesAfterRelabel) // Series left after relabeling.
+	require.Equal(t, 3, createdSeries)       // Newly added to TSDB.
+	want := []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+	}
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Start exporting 3 more samples, so we're over the limit now.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\n"),
+		"text/plain",
+		now,
+	)
+	require.ErrorIs(t, err, errSampleLimit)
+	require.NoError(t, slApp.Rollback())
+	require.Equal(t, 6, samplesScraped)
+	require.Equal(t, 6, samplesAfterRelabel)
+	require.Equal(t, 1, createdSeries) // We've added one series before hitting the limit.
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+	sl.cache.iterDone(false)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Remove all samples except the first 2.
+		[]byte("metric_a 1\nmetric_b 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 2, samplesScraped)
+	require.Equal(t, 2, samplesAfterRelabel)
+	require.Equal(t, 0, createdSeries)
+	// This is where the important things happen. We should now see:
+	// - Appends for samples from metric_a & metric_b.
+	// - An append with a stale marker for metric_c - this series was added during the first scrape but disappeared during the last scrape.
+	// - An append with a stale marker for metric_d - this series was added during the second scrape before we hit the sample_limit.
+	// We should NOT see:
+	// - Appends with stale markers for metric_e & metric_f - both were over the limit during the second scrape and so never made it into TSDB.
+	want = append(want, []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+	}...)
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+}
+
+// This test covers a case where there's a target with sample_limit set and each scrape sees a completely
+// different set of samples.
+func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
+	const sampleLimit = 4
+	resApp := &collectResultAppender{}
+	sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
+		return resApp
+	}, 0)
+	sl.sampleLimit = sampleLimit
+
+	now := time.Now()
+	slApp := sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
+		slApp,
+		// Start with 4 samples, all accepted.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 4, samplesScraped)      // All samples in the scrape.
+	require.Equal(t, 4, samplesAfterRelabel) // Series left after relabeling.
+	require.Equal(t, 4, createdSeries)       // Newly added to TSDB.
+	want := []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+	}
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Replace all samples with new time series.
+		[]byte("metric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 4, samplesScraped)
+	require.Equal(t, 4, samplesAfterRelabel)
+	require.Equal(t, 4, createdSeries)
+	// We replaced all samples from the first scrape with a new set of samples.
+	// We expect to see:
+	// - 4 appends for the new samples.
+	// - 4 appends with staleness markers for the old samples.
+	want = append(want, []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_e"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_f"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_g"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_h"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+	}...)
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+}

From 872f03766cbe5ca88b4e5303e7caf01038f6a2e2 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Mon, 14 Apr 2025 15:46:53 +0100
Subject: [PATCH 2/7] Pass the last known ref ID when injecting staleness
 markers

Currently staleness markers are appended for any sample that disappears from
the scrape cache, even if that sample was never appended to TSDB.
When staleness markers are appended they always use ref=0 as the SeriesRef,
so the downstream appender doesn't know whether the sample is for a known
series or not.

This changes the scrape cache so the map used for staleness tracking stores
the cache entry instead of only the label set. Having the cache entry means:
- we can ignore stale samples that didn't end up in TSDB (not in the scrape cache)
- we can append them to TSDB using the correct ref value, so the appender
  knows whether they are for a known or unknown series

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape.go | 39 ++++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 4d23efdbc8..830463982d 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -977,8 +977,8 @@ type scrapeCache struct {
 	// seriesCur and seriesPrev store the labels of series that were seen
 	// in the current and previous scrape.
 	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]labels.Labels
-	seriesPrev map[uint64]labels.Labels
+	seriesCur  map[uint64]*cacheEntry
+	seriesPrev map[uint64]*cacheEntry
 
 	// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
 	// avoid locking (using metadata API can block scraping).
@@ -1005,8 +1005,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 	return &scrapeCache{
 		series:        map[string]*cacheEntry{},
 		droppedSeries: map[string]*uint64{},
-		seriesCur:     map[uint64]labels.Labels{},
-		seriesPrev:    map[uint64]labels.Labels{},
+		seriesCur:     map[uint64]*cacheEntry{},
+		seriesPrev:    map[uint64]*cacheEntry{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
 	}
@@ -1075,11 +1075,13 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
 	return e, true, alreadyScraped
 }
 
-func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
+func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) (ce *cacheEntry) {
 	if ref == 0 {
-		return
+		return nil
 	}
-	c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	ce = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	c.series[string(met)] = ce
+	return ce
 }
 
 func (c *scrapeCache) addDropped(met []byte) {
@@ -1095,14 +1097,17 @@ func (c *scrapeCache) getDropped(met []byte) bool {
 	return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
-	c.seriesCur[hash] = lset
+func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
+	c.seriesCur[hash] = ce
 }
 
-func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
-	for h, lset := range c.seriesPrev {
+func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
+	for h, ce := range c.seriesPrev {
+		if ce == nil {
+			continue
+		}
 		if _, ok := c.seriesCur[h]; !ok {
-			if !f(lset) {
+			if !f(ce.ref, ce.lset) {
 				break
 			}
 		}
@@ -1600,10 +1605,10 @@ type appendErrors struct {
 
 // Update the stale markers.
 func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
-	sl.cache.forEachStale(func(lset labels.Labels) bool {
+	sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
 		// Series no longer exposed, mark it stale.
 		app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
-		_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
+		_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
 		app.SetOptions(nil)
 		switch {
 		case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
@@ -1800,7 +1805,7 @@ loop:
 
 		if err == nil {
 			if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
-				sl.cache.trackStaleness(ce.hash, ce.lset)
+				sl.cache.trackStaleness(ce.hash, ce)
 			}
 		}
 
@@ -1813,11 +1818,11 @@ loop:
 		}
 
 		if !seriesCached {
+			ce = sl.cache.addRef(met, ref, lset, hash)
 			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
 				// Bypass staleness logic if there is an explicit timestamp.
-				sl.cache.trackStaleness(hash, lset)
+				sl.cache.trackStaleness(hash, ce)
 			}
-			sl.cache.addRef(met, ref, lset, hash)
 			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
 				seriesAdded++
 			}

From e2193f634f268785b2a100ab91d14003270228ad Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Mon, 14 Apr 2025 15:48:25 +0100
Subject: [PATCH 3/7] Check the ref value when appending staleness markers

Signed-off-by: Lukasz Mierzwa
---
 scrape/target.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scrape/target.go b/scrape/target.go
index 30b47976a3..73fed40498 100644
--- a/scrape/target.go
+++ b/scrape/target.go
@@ -332,7 +332,9 @@ type limitAppender struct {
 }
 
 func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
-	if !value.IsStaleNaN(v) {
+	// Bypass sample_limit checks only if we have a staleness marker for a known series (the ref value is non-zero).
+	// This ensures that if a series is already in TSDB then we always write the marker.
+	if ref == 0 || !value.IsStaleNaN(v) {
 		app.i++
 		if app.i > app.limit {
 			return 0, errSampleLimit

From 0eedc046f4a64b00be24112f4d3a0c7fced4fa82 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Wed, 16 Apr 2025 13:34:08 +0100
Subject: [PATCH 4/7] Add a test for StaleNaNs after hitting sample_limit

I was confused why there are no StaleNaN markers appended when a scrape hits
sample_limit, but reading the code I see that's expected, so add a test for it.

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 61eace1bab..ee8bfcef67 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -1722,6 +1722,76 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
 }
 
+// If we have a target with sample_limit set and a scrape initially works but then hits the sample_limit error,
+// we don't expect to see any StaleNaNs appended for the series that were rejected because of the sample_limit error.
+func TestScrapeLoopRunCreatesStaleMarkersOnSampleLimit(t *testing.T) {
+	appender := &collectResultAppender{}
+	var (
+		signal     = make(chan struct{}, 1)
+		scraper    = &testScraper{}
+		app        = func(_ context.Context) storage.Appender { return appender }
+		numScrapes = 0
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// Since we're writing samples directly below we need to provide a protocol fallback.
+	sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
+	sl.sampleLimit = 4
+
+	// Succeed three times (the second scrape exceeds sample_limit on append), then cancel and stop.
+	scraper.scrapeFunc = func(_ context.Context, w io.Writer) error {
+		numScrapes++
+		switch numScrapes {
+		case 1:
+			w.Write([]byte("metric_a 10\nmetric_b 10\nmetric_c 10\nmetric_d 10\n"))
+			return nil
+		case 2:
+			w.Write([]byte("metric_a 20\nmetric_b 20\nmetric_c 20\nmetric_d 20\nmetric_e 999\n"))
+			return nil
+		case 3:
+			w.Write([]byte("metric_a 30\nmetric_b 30\nmetric_c 30\nmetric_d 30\n"))
+			return nil
+		case 4:
+			cancel()
+		}
+		return errors.New("scrape failed")
+	}
+
+	go func() {
+		sl.run(nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		require.FailNow(t, "Scrape wasn't stopped.")
+	}
+
+	// 4 scrapes in total:
+	// #1 - success - 4 samples appended + 5 report series
+	// #2 - sample_limit exceeded - no samples appended, only 5 report series
+	// #3 - success - 4 samples appended + 5 report series
+	// #4 - scrape canceled - 4 StaleNaNs appended because of the scrape error + 5 report series
+	require.Len(t, appender.resultFloats, (4+5)+5+(4+5)+(4+5), "Appended samples not as expected:\n%s", appender)
+	// Expect the first 4 samples to be metric_X from the first scrape [0-3].
+	for i := range 4 {
+		require.Equal(t, 10.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
+	}
+	// Next 5 samples are report series [4-8].
+	// Next 5 samples are report series for the second scrape [9-13].
+	// Expect the next 4 samples to be metric_X from the third scrape [14-17].
+	for i := 14; i <= 17; i++ {
+		require.Equal(t, 30.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
+	}
+	// Next 5 samples are report series [18-22].
+	// Next 4 samples are StaleNaN markers for the series from the third scrape [23-26].
+	for i := 23; i <= 26; i++ {
+		require.True(t, value.IsStaleNaN(appender.resultFloats[i].f),
+			"Appended %d sample not as expected. Wanted: stale NaN Got: %x", i, math.Float64bits(appender.resultFloats[i].f))
+	}
+}
+
 func TestScrapeLoopCache(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()

From c75768739a3288218a0f2555e99f3ce00b9c33e2 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Mon, 28 Apr 2025 16:04:18 +0100
Subject: [PATCH 5/7] Sort series by labels in requireEqual()

Tests that look at samples with StaleNaN values will fail because these
samples are generated from map iteration and so the order can be unstable.

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape_test.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index ee8bfcef67..d28bd4ed18 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -36,6 +36,7 @@ import (
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/grafana/regexp"
 	"github.com/prometheus/client_golang/prometheus"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@@ -1992,7 +1993,16 @@ func TestScrapeLoopAppend(t *testing.T) {
 func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
 	t.Helper()
 	testutil.RequireEqualWithOptions(t, expected, actual,
-		[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
+		[]cmp.Option{
+			cmp.Comparer(equalFloatSamples),
+			cmp.AllowUnexported(histogramSample{}),
+			// StaleNaN samples are generated by iterating over a map, which means that the order
+			// of samples might be different on every test run. Sort series by labels to avoid
+			// test failures because of that.
+			cmpopts.SortSlices(func(a, b floatSample) int {
+				return labels.Compare(a.metric, b.metric)
+			}),
+		},
 		msgAndArgs...)
 }

From 6687bf5653da9296430fab9da71bd161b03f385b Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Mon, 28 Apr 2025 16:05:37 +0100
Subject: [PATCH 6/7] Only add series to the scrape cache if they were
 appended to TSDB

The scrape cache is used to emit StaleNaN markers after a series disappears,
so it should only hold entries for series that did end up in TSDB, which is
not always the case due to sample_limit.

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 830463982d..9ba5c6669b 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1103,9 +1103,6 @@ func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
 
 func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
 	for h, ce := range c.seriesPrev {
-		if ce == nil {
-			continue
-		}
 		if _, ok := c.seriesCur[h]; !ok {
 			if !f(ce.ref, ce.lset) {
 				break
@@ -1817,7 +1814,11 @@ loop:
 			break loop
 		}
 
-		if !seriesCached {
+		// If the series wasn't cached (it's new, not seen on the previous scrape) we need to add it to the scrape cache.
+		// But we only do this for series that were appended to TSDB without errors.
+		// If a series was new but we didn't append it due to sample_limit or other errors then we don't need
+		// it in the scrape cache because we don't need to emit StaleNaNs for it when it disappears.
+		if !seriesCached && sampleAdded {
 			ce = sl.cache.addRef(met, ref, lset, hash)
 			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
 				// Bypass staleness logic if there is an explicit timestamp.

From bb690a23b9b8b0eae99a246976a3222dfdced423 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Tue, 29 Apr 2025 09:24:27 +0100
Subject: [PATCH 7/7] Make sure we never call trackStaleness with a nil cache
 entry

Signed-off-by: Lukasz Mierzwa
---
 scrape/scrape.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 9ba5c6669b..cf26d16ae2 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1820,8 +1820,9 @@ loop:
 		// it in the scrape cache because we don't need to emit StaleNaNs for it when it disappears.
 		if !seriesCached && sampleAdded {
 			ce = sl.cache.addRef(met, ref, lset, hash)
-			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
+			if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
 				// Bypass staleness logic if there is an explicit timestamp.
+				// But make sure we only do this if we have a cache entry (ce) for our series.
 				sl.cache.trackStaleness(hash, ce)
 			}
 			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
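Taken together, patches 2, 3 and 6 change how staleness markers interact with sample_limit: markers are emitted only for series that actually made it into TSDB, and they are appended with the last known ref so the limit-enforcing appender can let them through. Below is a minimal, standalone sketch of just that gating idea; the type and function names and the StaleNaN bit pattern are illustrative stand-ins, not the actual Prometheus code.

package main

import (
	"errors"
	"fmt"
	"math"
)

// staleNaN mirrors the bit pattern Prometheus uses for staleness markers
// (value.StaleNaN); treated here as an assumption for the sketch.
const staleNaN = uint64(0x7ff0000000000002)

func isStaleNaN(v float64) bool {
	return math.Float64bits(v) == staleNaN
}

var errSampleLimit = errors.New("sample limit exceeded")

// limitGate is a hypothetical stand-in for the limitAppender changed in patch 3:
// it counts samples against the limit, but lets staleness markers for known
// series (non-zero ref) bypass the limit entirely.
type limitGate struct {
	limit int
	count int
}

func (g *limitGate) allow(ref uint64, v float64) error {
	// Count the sample against the limit unless it is a staleness marker
	// for a series that is already known (ref != 0).
	if ref == 0 || !isStaleNaN(v) {
		g.count++
		if g.count > g.limit {
			return errSampleLimit
		}
	}
	return nil
}

func main() {
	g := &limitGate{limit: 2}
	fmt.Println(g.allow(1, 10))                             // <nil>: regular sample, under the limit
	fmt.Println(g.allow(2, 10))                             // <nil>: regular sample, at the limit
	fmt.Println(g.allow(0, 10))                             // error: a new series over the limit is rejected
	fmt.Println(g.allow(1, math.Float64frombits(staleNaN))) // <nil>: stale marker for a known series bypasses the limit
}

With that behaviour, hitting sample_limit on one scrape no longer produces staleness markers for series that were rejected by the limit, while series already stored keep receiving their markers when they disappear.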