Merge pull request #17114 from bboreham/scrape-stale-by-ref

Scraping: detect staleness via unique reference
Bryan Boreham 2025-11-14 18:32:26 +01:00, committed by GitHub
commit b7aae06181
2 changed files with 20 additions and 26 deletions

scrape/helpers_test.go

@@ -19,7 +19,6 @@ import (
 	"encoding/binary"
 	"fmt"
 	"math"
-	"math/rand"
 	"strings"
 	"sync"
 	"testing"
@@ -148,10 +147,11 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
 		f: v,
 	})
 
-	if ref == 0 {
-		ref = storage.SeriesRef(rand.Uint64())
-	}
 	if a.next == nil {
+		if ref == 0 {
+			// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
+			ref = storage.SeriesRef(lset.Hash())
+		}
 		return ref, nil
 	}
 
@@ -195,10 +195,10 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
 	a.mtx.Lock()
 	defer a.mtx.Unlock()
 	a.pendingMetadata = append(a.pendingMetadata, metadataEntry{metric: l, m: m})
-	if ref == 0 {
-		ref = storage.SeriesRef(rand.Uint64())
-	}
 	if a.next == nil {
+		if ref == 0 {
+			ref = storage.SeriesRef(l.Hash())
+		}
 		return ref, nil
 	}
 
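Aside (not part of the diff): the hash stand-in works because labels.Labels.Hash is deterministic for a given label set, so repeated appends of the same series yield the same reference, whereas rand.Uint64 yielded a fresh one each time. A runnable sketch using only the real labels API:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Hash is deterministic per label set, so the same series gets the
	// same stand-in SeriesRef on every scrape without the test appender
	// having to remember which series it has already handed refs to.
	a := labels.FromStrings("__name__", "up", "job", "node")
	b := labels.FromStrings("__name__", "up", "job", "node")
	fmt.Println(a.Hash() == b.Hash()) // true
}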

scrape/scrape.go

@@ -996,11 +996,9 @@ type scrapeCache struct {
 	// be a pointer so we can update it.
 	droppedSeries map[string]*uint64
 
-	// seriesCur and seriesPrev store the labels of series that were seen
-	// in the current and previous scrape.
-	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]*cacheEntry
-	seriesPrev map[uint64]*cacheEntry
+	// Series that were seen in the current and previous scrape, for staleness detection.
+	seriesCur  map[storage.SeriesRef]*cacheEntry
+	seriesPrev map[storage.SeriesRef]*cacheEntry
 
 	// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
 	// avoid locking (using metadata API can block scraping).
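Aside (not part of the diff): the new map key type is the appender's opaque series reference, defined in the storage package as a bare 64-bit integer; unlike a labels hash, the TSDB hands out exactly one per head series:

// In github.com/prometheus/prometheus/storage; shown here for context.
type SeriesRef uint64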
@@ -1027,8 +1025,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 	return &scrapeCache{
 		series:        map[string]*cacheEntry{},
 		droppedSeries: map[string]*uint64{},
-		seriesCur:     map[uint64]*cacheEntry{},
-		seriesPrev:    map[uint64]*cacheEntry{},
+		seriesCur:     map[storage.SeriesRef]*cacheEntry{},
+		seriesPrev:    map[storage.SeriesRef]*cacheEntry{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
 	}
@@ -1076,13 +1074,9 @@ func (c *scrapeCache) iterDone(flushCache bool) {
 		c.metaMtx.Unlock()
 	}
 
-	// Swap current and previous series.
+	// Swap current and previous series then clear the new current, to save allocations.
 	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
-
-	// We have to delete every single key in the map.
-	for k := range c.seriesCur {
-		delete(c.seriesCur, k)
-	}
+	clear(c.seriesCur)
 
 	c.iter++
 }
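Aside (not part of the diff): clear is the Go 1.21 builtin. On a map it removes every entry while keeping the allocated storage, the same fast path the compiler already applied to the old delete-range loop, so the swap-and-clear stays allocation-free in steady state. A minimal standalone sketch of the pattern:

package main

import "fmt"

func main() {
	cur := map[uint64]string{1: "a", 2: "b"}
	prev := map[uint64]string{}

	// Swap the maps, then empty the new "current" one; its capacity is
	// retained, so the next scrape can insert without reallocating.
	prev, cur = cur, prev
	clear(cur)

	fmt.Println(len(prev), len(cur)) // 2 0
}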
@@ -1119,13 +1113,13 @@ func (c *scrapeCache) getDropped(met []byte) bool {
 	return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
-	c.seriesCur[hash] = ce
+func (c *scrapeCache) trackStaleness(ref storage.SeriesRef, ce *cacheEntry) {
+	c.seriesCur[ref] = ce
 }
 
 func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
-	for h, ce := range c.seriesPrev {
-		if _, ok := c.seriesCur[h]; !ok {
+	for ref, ce := range c.seriesPrev {
+		if _, ok := c.seriesCur[ref]; !ok {
 			if !f(ce.ref, ce.lset) {
 				break
 			}
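Aside (not part of the diff): forEachStale is a set difference, anything tracked in the previous scrape but absent from the current one is stale, and the scrape loop responds by appending a StaleNaN sample for it. A runnable sketch with plain maps standing in for seriesPrev/seriesCur and a Printf standing in for the appender call; only math.Float64frombits and value.StaleNaN are real APIs here:

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
)

func main() {
	prev := map[uint64]string{1: `up{job="a"}`, 2: `up{job="b"}`}
	cur := map[uint64]string{1: `up{job="a"}`}

	// The canonical staleness marker value written for vanished series.
	staleMarker := math.Float64frombits(value.StaleNaN)
	for ref, series := range prev {
		if _, ok := cur[ref]; !ok {
			fmt.Printf("ref %d: %s = %v (stale)\n", ref, series, staleMarker)
		}
	}
}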
@@ -1833,7 +1827,7 @@ loop:
 
 		if err == nil {
 			if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
-				sl.cache.trackStaleness(ce.hash, ce)
+				sl.cache.trackStaleness(ce.ref, ce)
 			}
 		}
 
@@ -1854,7 +1848,7 @@ loop:
 		if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
 			// Bypass staleness logic if there is an explicit timestamp.
 			// But make sure we only do this if we have a cache entry (ce) for our series.
-			sl.cache.trackStaleness(hash, ce)
+			sl.cache.trackStaleness(ref, ce)
 		}
 		if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
 			seriesAdded++
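Aside (not part of the diff): one consequence of keying by reference rather than by labels hash is that two distinct series can no longer share a slot, since a 64-bit hash can collide while the TSDB hands out unique references. A runnable sketch of the difference, with hypothetical stand-in types and a forced collision:

package main

import "fmt"

// Stand-ins for the real types; illustration only.
type seriesRef uint64
type entry struct{ name string }

func main() {
	// Old keying: a (rare but possible) hash collision makes two series
	// share one map slot, so the second overwrites the first.
	byHash := map[uint64]*entry{}
	const collidingHash = 42
	byHash[collidingHash] = &entry{name: "seriesA"}
	byHash[collidingHash] = &entry{name: "seriesB"} // overwrites seriesA

	// New keying: references are unique per series, so each keeps its slot.
	byRef := map[seriesRef]*entry{}
	byRef[1] = &entry{name: "seriesA"}
	byRef[2] = &entry{name: "seriesB"}

	fmt.Println(len(byHash), len(byRef)) // 1 2
}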