diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go
index f847dbac76..ff7a7bf65a 100644
--- a/scrape/helpers_test.go
+++ b/scrape/helpers_test.go
@@ -19,7 +19,6 @@ import (
 	"encoding/binary"
 	"fmt"
 	"math"
-	"math/rand"
 	"strings"
 	"sync"
 	"testing"
@@ -148,10 +147,11 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
 		f:      v,
 	})
 
-	if ref == 0 {
-		ref = storage.SeriesRef(rand.Uint64())
-	}
 	if a.next == nil {
+		if ref == 0 {
+			// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
+			ref = storage.SeriesRef(lset.Hash())
+		}
 		return ref, nil
 	}
 
@@ -195,10 +195,10 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
 	a.mtx.Lock()
 	defer a.mtx.Unlock()
 	a.pendingMetadata = append(a.pendingMetadata, metadataEntry{metric: l, m: m})
-	if ref == 0 {
-		ref = storage.SeriesRef(rand.Uint64())
-	}
 	if a.next == nil {
+		if ref == 0 {
+			ref = storage.SeriesRef(l.Hash())
+		}
 		return ref, nil
 	}
 
diff --git a/scrape/scrape.go b/scrape/scrape.go
index d10858d8ae..db662cb089 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -996,11 +996,9 @@ type scrapeCache struct {
 	// be a pointer so we can update it.
 	droppedSeries map[string]*uint64
 
-	// seriesCur and seriesPrev store the labels of series that were seen
-	// in the current and previous scrape.
-	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]*cacheEntry
-	seriesPrev map[uint64]*cacheEntry
+	// Series that were seen in the current and previous scrape, for staleness detection.
+	seriesCur  map[storage.SeriesRef]*cacheEntry
+	seriesPrev map[storage.SeriesRef]*cacheEntry
 
 	// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
 	// avoid locking (using metadata API can block scraping).
@@ -1027,8 +1025,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 	return &scrapeCache{
 		series:        map[string]*cacheEntry{},
 		droppedSeries: map[string]*uint64{},
-		seriesCur:     map[uint64]*cacheEntry{},
-		seriesPrev:    map[uint64]*cacheEntry{},
+		seriesCur:     map[storage.SeriesRef]*cacheEntry{},
+		seriesPrev:    map[storage.SeriesRef]*cacheEntry{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
 	}
@@ -1076,13 +1074,9 @@ func (c *scrapeCache) iterDone(flushCache bool) {
 		c.metaMtx.Unlock()
 	}
 
-	// Swap current and previous series.
+	// Swap current and previous series then clear the new current, to save allocations.
 	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
-
-	// We have to delete every single key in the map.
-	for k := range c.seriesCur {
-		delete(c.seriesCur, k)
-	}
+	clear(c.seriesCur)
 
 	c.iter++
 }
@@ -1119,13 +1113,13 @@ func (c *scrapeCache) getDropped(met []byte) bool {
 	return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
-	c.seriesCur[hash] = ce
+func (c *scrapeCache) trackStaleness(ref storage.SeriesRef, ce *cacheEntry) {
+	c.seriesCur[ref] = ce
 }
 
 func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
-	for h, ce := range c.seriesPrev {
-		if _, ok := c.seriesCur[h]; !ok {
+	for ref, ce := range c.seriesPrev {
+		if _, ok := c.seriesCur[ref]; !ok {
 			if !f(ce.ref, ce.lset) {
 				break
 			}
@@ -1833,7 +1827,7 @@ loop:
 
 		if err == nil {
 			if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
-				sl.cache.trackStaleness(ce.hash, ce)
+				sl.cache.trackStaleness(ce.ref, ce)
 			}
 		}
 
@@ -1854,7 +1848,7 @@ loop:
 			if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
 				// Bypass staleness logic if there is an explicit timestamp.
 				// But make sure we only do this if we have a cache entry (ce) for our series.
-				sl.cache.trackStaleness(hash, ce)
+				sl.cache.trackStaleness(ref, ce)
 			}
 			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
 				seriesAdded++
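For reviewers less familiar with this corner of the scrape loop, below is a minimal, standalone sketch of the swap-and-clear staleness pattern this patch touches, now keyed by storage.SeriesRef rather than the labels hash. The staleCache type and its field and method names are simplified stand-ins invented for illustration; they are not the actual scrapeCache API.

package main

import "fmt"

// SeriesRef stands in for storage.SeriesRef: an opaque per-series reference.
type SeriesRef uint64

// staleCache keeps two maps and swaps them between scrapes, so the map
// buckets allocated for one scrape are reused for the next instead of
// being rebuilt every iteration.
type staleCache struct {
	cur  map[SeriesRef]string // series seen in the scrape in progress
	prev map[SeriesRef]string // series seen in the previous scrape
}

func newStaleCache() *staleCache {
	return &staleCache{
		cur:  map[SeriesRef]string{},
		prev: map[SeriesRef]string{},
	}
}

// track records that a series was seen in the current scrape.
func (c *staleCache) track(ref SeriesRef, name string) { c.cur[ref] = name }

// forEachStale calls f for every series that was present in the previous
// scrape but is absent from the current one.
func (c *staleCache) forEachStale(f func(SeriesRef, string)) {
	for ref, name := range c.prev {
		if _, ok := c.cur[ref]; !ok {
			f(ref, name)
		}
	}
}

// iterDone swaps the maps and empties the new current one, mirroring the
// clear(c.seriesCur) call the patch introduces.
func (c *staleCache) iterDone() {
	c.prev, c.cur = c.cur, c.prev
	clear(c.cur)
}

func main() {
	c := newStaleCache()

	// Scrape 1 exposes two series; nothing can be stale yet.
	c.track(1, `up{job="a"}`)
	c.track(2, `up{job="b"}`)
	c.forEachStale(func(ref SeriesRef, name string) {
		fmt.Printf("stale: ref=%d series=%s\n", ref, name)
	})
	c.iterDone()

	// Scrape 2 exposes only series 1, so series 2 is reported stale and
	// a staleness marker would be appended for it.
	c.track(1, `up{job="a"}`)
	c.forEachStale(func(ref SeriesRef, name string) {
		fmt.Printf("stale: ref=%d series=%s\n", ref, name) // prints ref=2
	})
	c.iterDone()
}

On the clear() change itself: clear is a Go 1.21+ builtin that deletes every key from a map while keeping its allocated storage, so it preserves the allocation-saving intent of the two-map swap that the old delete-in-a-loop served.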