diff --git a/scrape/scrape.go b/scrape/scrape.go index af0679fcb5..61c5a2f8f9 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1552,16 +1552,35 @@ type appendErrors struct { numExemplarOutOfOrder int } +// Update the stale markers. +func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) { + sl.cache.forEachStale(func(lset labels.Labels) bool { + // Series no longer exposed, mark it stale. + app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) + app.SetOptions(nil) + switch { + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + // Do not count these in logging, as this is expected if a target + // goes away and comes back again with a new scrape loop. + err = nil + } + return err == nil + }) + return +} + func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { - var p textparse.Parser + defTime := timestamp.FromTime(ts) if len(b) == 0 { - // An empty scrape (such as from a timeout) should not cause errors to be logged or the append to fail. - // We treat it as if we received "text/plain" and allow the appender to continue to trigger stale markers. - p, err = textparse.New(b, "text/plain", "", sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) - } else { - p, err = textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) + // Empty scrape. Just update the stale makers and swap the cache (but don't flush it). + err = sl.updateStaleMarkers(app, defTime) + sl.cache.iterDone(false) + return } + + p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) if p == nil { sl.l.Error( "Failed to determine correct type of scrape target.", @@ -1583,7 +1602,6 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ) } var ( - defTime = timestamp.FromTime(ts) appErrs = appendErrors{} sampleLimitErr error bucketLimitErr error @@ -1624,9 +1642,8 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, if err != nil { return } - // Only perform cache cleaning if the scrape was not empty. - // An empty scrape (usually) is used to indicate a failed scrape. - sl.cache.iterDone(len(b) > 0) + // Flush and swap the cache as the scrape was non-empty. + sl.cache.iterDone(true) }() loop: @@ -1869,19 +1886,7 @@ loop: sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) } if err == nil { - sl.cache.forEachStale(func(lset labels.Labels) bool { - // Series no longer exposed, mark it stale. - app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) - _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) - app.SetOptions(nil) - switch { - case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): - // Do not count these in logging, as this is expected if a target - // goes away and comes back again with a new scrape loop. - err = nil - } - return err == nil - }) + sl.updateStaleMarkers(app, defTime) } return }