diff --git a/scrape/scrape.go b/scrape/scrape.go
index 5e52528a2f..d3315a1aff 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -983,8 +983,8 @@ type scrapeCache struct {
 	// seriesCur and seriesPrev store the labels of series that were seen
 	// in the current and previous scrape.
 	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]labels.Labels
-	seriesPrev map[uint64]labels.Labels
+	seriesCur  map[uint64]*cacheEntry
+	seriesPrev map[uint64]*cacheEntry
 
 	// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
 	// avoid locking (using metadata API can block scraping).
@@ -1011,8 +1011,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 	return &scrapeCache{
 		series:        map[string]*cacheEntry{},
 		droppedSeries: map[string]*uint64{},
-		seriesCur:     map[uint64]labels.Labels{},
-		seriesPrev:    map[uint64]labels.Labels{},
+		seriesCur:     map[uint64]*cacheEntry{},
+		seriesPrev:    map[uint64]*cacheEntry{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
 	}
@@ -1081,11 +1081,13 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
 	return e, true, alreadyScraped
 }
 
-func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
+func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) (ce *cacheEntry) {
 	if ref == 0 {
-		return
+		return nil
 	}
-	c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	ce = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	c.series[string(met)] = ce
+	return ce
 }
 
 func (c *scrapeCache) addDropped(met []byte) {
@@ -1101,14 +1103,14 @@ func (c *scrapeCache) getDropped(met []byte) bool {
 	return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
-	c.seriesCur[hash] = lset
+func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
+	c.seriesCur[hash] = ce
 }
 
-func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
-	for h, lset := range c.seriesPrev {
+func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
+	for h, ce := range c.seriesPrev {
 		if _, ok := c.seriesCur[h]; !ok {
-			if !f(lset) {
+			if !f(ce.ref, ce.lset) {
 				break
 			}
 		}
@@ -1606,10 +1608,10 @@ type appendErrors struct {
 
 // Update the stale markers.
 func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
-	sl.cache.forEachStale(func(lset labels.Labels) bool {
+	sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
 		// Series no longer exposed, mark it stale.
 		app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
-		_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
+		_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
 		app.SetOptions(nil)
 		switch {
 		case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
@@ -1806,7 +1808,7 @@ loop:
 
 			if err == nil {
 				if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
-					sl.cache.trackStaleness(ce.hash, ce.lset)
+					sl.cache.trackStaleness(ce.hash, ce)
 				}
 			}
 
@@ -1818,12 +1820,17 @@ loop:
 			break loop
 		}
 
-		if !seriesCached {
-			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
+		// If the series wasn't cached (it's new, not seen on the previous scrape) we need to add it to the scrape cache.
+		// But we only do this for series that were appended to TSDB without errors.
+		// If a series was new but we didn't append it due to sample_limit or other errors then we don't need
+		// it in the scrape cache because we don't need to emit StaleNaNs for it when it disappears.
+		if !seriesCached && sampleAdded {
+			ce = sl.cache.addRef(met, ref, lset, hash)
+			if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
 				// Bypass staleness logic if there is an explicit timestamp.
-				sl.cache.trackStaleness(hash, lset)
+				// But make sure we only do this if we have a cache entry (ce) for our series.
+				sl.cache.trackStaleness(hash, ce)
 			}
-			sl.cache.addRef(met, ref, lset, hash)
 			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
 				seriesAdded++
 			}
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 955fbcdace..bd79bb2e85 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -37,6 +37,7 @@ import (
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/grafana/regexp"
 	"github.com/prometheus/client_golang/prometheus"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@@ -1731,6 +1732,76 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
 }
 
+// If we have a target with sample_limit set and the scrape initially works but then we hit the sample_limit error,
+// then we don't expect to see any StaleNaNs appended for the series that disappeared due to the sample_limit error.
+func TestScrapeLoopRunCreatesStaleMarkersOnSampleLimit(t *testing.T) {
+	appender := &collectResultAppender{}
+	var (
+		signal     = make(chan struct{}, 1)
+		scraper    = &testScraper{}
+		app        = func(_ context.Context) storage.Appender { return appender }
+		numScrapes = 0
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	// Since we're writing samples directly below we need to provide a protocol fallback.
+	sl := newBasicScrapeLoopWithFallback(t, ctx, scraper, app, 10*time.Millisecond, "text/plain")
+	sl.sampleLimit = 4
+
+	// Succeed for the first three scrapes (the second one exceeds sample_limit), then cancel and fail.
+	scraper.scrapeFunc = func(_ context.Context, w io.Writer) error {
+		numScrapes++
+		switch numScrapes {
+		case 1:
+			w.Write([]byte("metric_a 10\nmetric_b 10\nmetric_c 10\nmetric_d 10\n"))
+			return nil
+		case 2:
+			w.Write([]byte("metric_a 20\nmetric_b 20\nmetric_c 20\nmetric_d 20\nmetric_e 999\n"))
+			return nil
+		case 3:
+			w.Write([]byte("metric_a 30\nmetric_b 30\nmetric_c 30\nmetric_d 30\n"))
+			return nil
+		case 4:
+			cancel()
+		}
+		return errors.New("scrape failed")
+	}
+
+	go func() {
+		sl.run(nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		require.FailNow(t, "Scrape wasn't stopped.")
+	}
+
+	// 4 scrapes in total:
+	// #1 - success - 4 samples appended + 5 report series
+	// #2 - sample_limit exceeded - no samples appended, only 5 report series
+	// #3 - success - 4 samples appended + 5 report series
+	// #4 - scrape canceled - 4 StaleNaNs appended because of the scrape error + 5 report series
+	require.Len(t, appender.resultFloats, (4+5)+5+(4+5)+(4+5), "Appended samples not as expected:\n%s", appender)
+	// Expect the first 4 samples to be metric_X [0-3].
+	for i := range 4 {
+		require.Equal(t, 10.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
+	}
+	// Next 5 samples are report series [4-8].
+	// Next 5 samples are report series for the second scrape [9-13].
+	// Expect the next 4 samples to be metric_X from the third scrape [14-17].
+	for i := 14; i <= 17; i++ {
+		require.Equal(t, 30.0, appender.resultFloats[i].f, "Appended %d sample not as expected", i)
+	}
+	// Next 5 samples are report series [18-22].
+	// Next 4 samples are the StaleNaN markers appended after the failed scrape [23-26].
+	for i := 23; i <= 26; i++ {
+		require.True(t, value.IsStaleNaN(appender.resultFloats[i].f),
+			"Appended sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[i].f))
+	}
+}
+
 func TestScrapeLoopCache(t *testing.T) {
 	s := teststorage.New(t)
 	defer s.Close()
@@ -1931,7 +2002,16 @@ func TestScrapeLoopAppend(t *testing.T) {
 func requireEqual(t *testing.T, expected, actual any, msgAndArgs ...any) {
 	t.Helper()
 	testutil.RequireEqualWithOptions(t, expected, actual,
-		[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
+		[]cmp.Option{
+			cmp.Comparer(equalFloatSamples),
+			cmp.AllowUnexported(histogramSample{}),
+			// StaleNaN samples are generated by iterating over a map, which means that the order
+			// of samples might be different on every test run. Sort series by label to avoid
+			// test failures because of that.
+			cmpopts.SortSlices(func(a, b floatSample) int {
+				return labels.Compare(a.metric, b.metric)
+			}),
+		},
 		msgAndArgs...)
 }
 
@@ -5641,3 +5721,219 @@ func TestScrapeAppendWithParseError(t *testing.T) {
 	}
 	requireEqual(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
 }
+
+// This test covers a case where there's a target with sample_limit set and some of the exported samples
+// change between scrapes.
+func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
+	const sampleLimit = 4
+	resApp := &collectResultAppender{}
+	sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
+		return resApp
+	}, 0)
+	sl.sampleLimit = sampleLimit
+
+	now := time.Now()
+	slApp := sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
+		slApp,
+		// Start with 3 samples, all accepted.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 3, samplesScraped)      // All samples on the scrape.
+	require.Equal(t, 3, samplesAfterRelabel) // Series left after relabeling.
+	require.Equal(t, 3, createdSeries)       // Newly added to TSDB.
+	want := []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+	}
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Start exporting 3 more samples, so we're over the limit now.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\n"),
+		"text/plain",
+		now,
+	)
+	require.ErrorIs(t, err, errSampleLimit)
+	require.NoError(t, slApp.Rollback())
+	require.Equal(t, 6, samplesScraped)
+	require.Equal(t, 6, samplesAfterRelabel)
+	require.Equal(t, 1, createdSeries) // We've added one series before hitting the limit.
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+	sl.cache.iterDone(false)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Remove all samples except the first 2.
+		[]byte("metric_a 1\nmetric_b 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 2, samplesScraped)
+	require.Equal(t, 2, samplesAfterRelabel)
+	require.Equal(t, 0, createdSeries)
+	// This is where the important things happen. We should now see:
+	// - Appends for samples from metric_a & metric_b.
+	// - An append with a stale marker for metric_c - this series was added during the first scrape but disappeared during the last scrape.
+	// - An append with a stale marker for metric_d - this series was added during the second scrape before we hit the sample_limit.
+	// We should NOT see:
+	// - Appends with stale markers for metric_e & metric_f - both were over the limit during the second scrape and so they never made it into TSDB.
+	want = append(want, []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+	}...)
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+}
+
+// This test covers a case where there's a target with sample_limit set and each scrape sees a completely
+// different set of samples.
+func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
+	const sampleLimit = 4
+	resApp := &collectResultAppender{}
+	sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
+		return resApp
+	}, 0)
+	sl.sampleLimit = sampleLimit
+
+	now := time.Now()
+	slApp := sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
+		slApp,
+		// Start with 4 samples, all accepted.
+		[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 4, samplesScraped)      // All samples on the scrape.
+	require.Equal(t, 4, samplesAfterRelabel) // Series left after relabeling.
+	require.Equal(t, 4, createdSeries)       // Newly added to TSDB.
+	want := []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+	}
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+
+	now = now.Add(time.Minute)
+	slApp = sl.appender(context.Background())
+	samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
+		slApp,
+		// Replace all samples with new time series.
+		[]byte("metric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h 1\n"),
+		"text/plain",
+		now,
+	)
+	require.NoError(t, err)
+	require.NoError(t, slApp.Commit())
+	require.Equal(t, 4, samplesScraped)
+	require.Equal(t, 4, samplesAfterRelabel)
+	require.Equal(t, 4, createdSeries)
+	// We replaced all samples from the first scrape with a new set of samples.
+	// We expect to see:
+	// - 4 appends for the new samples.
+	// - 4 appends with staleness markers for the old samples.
+	want = append(want, []floatSample{
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_e"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_f"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_g"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_h"),
+			t:      timestamp.FromTime(now),
+			f:      1,
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+		{
+			metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
+			t:      timestamp.FromTime(now),
+			f:      math.Float64frombits(value.StaleNaN),
+		},
+	}...)
+	requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
+}
diff --git a/scrape/target.go b/scrape/target.go
index 30b47976a3..73fed40498 100644
--- a/scrape/target.go
+++ b/scrape/target.go
@@ -332,7 +332,9 @@ type limitAppender struct {
 }
 
 func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
-	if !value.IsStaleNaN(v) {
+	// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
+	// This ensures that if a series is already in TSDB then we always write the marker.
+	if ref == 0 || !value.IsStaleNaN(v) {
 		app.i++
 		if app.i > app.limit {
 			return 0, errSampleLimit