diff --git a/scrape/scrape_append_v2.go b/scrape/scrape_append_v2.go index 825e56f9df..51863329a2 100644 --- a/scrape/scrape_append_v2.go +++ b/scrape/scrape_append_v2.go @@ -314,6 +314,11 @@ loop: // Append sample to the storage. ref, err = app.Append(ref, lset, st, t, val, h, fh, appOpts) } + if err == nil { + if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { + sl.cache.trackStaleness(ce.ref, ce) + } + } sampleAdded, err = sl.checkAddError(met, exemplars, err, &sampleLimitErr, &bucketLimitErr, &appErrs) if err != nil { if !errors.Is(err, storage.ErrNotFound) { @@ -321,9 +326,6 @@ loop: } break loop } - if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { - sl.cache.trackStaleness(ce.ref, ce) - } // If series wasn't cached (is new, not seen on previous scrape) we need to add it to the scrape cache. // But we only do this for series that were appended to TSDB without errors. diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 49ee38f65d..5bc47133ad 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -3696,6 +3696,57 @@ func testScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T require.Equal(t, 1, seriesAdded) } +func TestScrapeLoopCreatesStaleMarkersOnOOOAppend(t *testing.T) { + foreachAppendable(t, func(t *testing.T, appV2 bool) { + testScrapeLoopCreatesStaleMarkersOnOOOAppend(t, appV2) + }) +} + +func testScrapeLoopCreatesStaleMarkersOnOOOAppend(t *testing.T, appV2 bool) { + appTest := teststorage.NewAppendable() + sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2)) + + now := time.Unix(1, 0) + app := sl.appender() + _, _, _, err := app.append([]byte("metric1 1\n"), "text/plain", now) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + metric1Calls := 0 + // The test storage doesn't track order, so it cannot generate OOO error, + // we have to emulate it. + appTest.WithErrs(func(ls labels.Labels) error { + switch ls.Get(model.MetricNameLabel) { + case "metric1": + metric1Calls++ + // Only return error on the first append, the second one will be the + // staleness marker. + if metric1Calls == 1 { + return storage.ErrOutOfOrderSample + } + return nil + default: + return nil + } + }, nil, nil) + + now2 := time.Unix(2, 0) + app2 := sl.appender() + _, _, _, err = app2.append([]byte("metric1 2\n"), "text/plain", now2) + require.NoError(t, err) + require.NoError(t, app2.Commit()) + + samples := appTest.ResultSamples() + foundStale := false + for _, s := range samples { + if s.L.Get(model.MetricNameLabel) == "metric1" && value.IsStaleNaN(s.V) { + foundStale = true + break + } + } + require.True(t, foundStale, "Stale NaN marker was expected for metric1 but was not found") +} + func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { foreachAppendable(t, func(t *testing.T, appV2 bool) { testScrapeLoopOutOfBoundsTimeError(t, appV2)