rework from suggestion

Signed-off-by: alexgreenbank <alex.greenbank@grafana.com>
This commit is contained in:
alexgreenbank 2024-11-06 17:06:23 +00:00
parent 2cd268c87b
commit c8eddaecff

View File

@ -1552,16 +1552,35 @@ type appendErrors struct {
numExemplarOutOfOrder int
}
// Update the stale markers.
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
return
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
var p textparse.Parser
defTime := timestamp.FromTime(ts)
if len(b) == 0 {
// An empty scrape (such as from a timeout) should not cause errors to be logged or the append to fail.
// We treat it as if we received "text/plain" and allow the appender to continue to trigger stale markers.
p, err = textparse.New(b, "text/plain", "", sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
} else {
p, err = textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
// Empty scrape. Just update the stale makers and swap the cache (but don't flush it).
err = sl.updateStaleMarkers(app, defTime)
sl.cache.iterDone(false)
return
}
p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
if p == nil {
sl.l.Error(
"Failed to determine correct type of scrape target.",
@ -1583,7 +1602,6 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
)
}
var (
defTime = timestamp.FromTime(ts)
appErrs = appendErrors{}
sampleLimitErr error
bucketLimitErr error
@ -1624,9 +1642,8 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
if err != nil {
return
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
// Flush and swap the cache as the scrape was non-empty.
sl.cache.iterDone(true)
}()
loop:
@ -1869,19 +1886,7 @@ loop:
sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
}
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
sl.updateStaleMarkers(app, defTime)
}
return
}