From 7a947d362946f1852bf0097249b0f51385f1f800 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 24 Jul 2025 17:38:18 -0700 Subject: [PATCH 1/4] Track stale series in the Head Signed-off-by: Ganesh Vernekar --- tsdb/head.go | 27 ++++++++++++++++++++++++--- tsdb/head_append.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 7763d272b7..574305a287 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -68,6 +69,7 @@ var ( type Head struct { chunkRange atomic.Int64 numSeries atomic.Uint64 + numStaleSeries atomic.Uint64 minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection. minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked. minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. @@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() { type headMetrics struct { activeAppenders prometheus.Gauge series prometheus.GaugeFunc + staleSeries prometheus.GaugeFunc seriesCreated prometheus.Counter seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter @@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { }, func() float64 { return float64(h.NumSeries()) }), + staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_stale_series", + Help: "Total number of stale series in the head block.", + }, func() float64 { + return float64(h.NumStaleSeries()) + }), seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", Help: "Total number of series created in the head", @@ -532,6 +541,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { r.MustRegister( m.activeAppenders, m.series, + m.staleSeries, m.chunks, m.chunksCreated, m.chunksRemoved, @@ -1607,7 +1617,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // Drop old chunks and remember series IDs and hashes if they can be // deleted entirely. - deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) + deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries) seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) @@ -1645,11 +1655,16 @@ func (h *Head) Tombstones() (tombstones.Reader, error) { return h.tombstones, nil } -// NumSeries returns the number of active series in the head. +// NumSeries returns the number of series tracked in the head. func (h *Head) NumSeries() uint64 { return h.numSeries.Load() } +// NumStaleSeries returns the number of stale series in the head. +func (h *Head) NumStaleSeries() uint64 { + return h.numStaleSeries.Load() +} + var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") // Meta returns meta information about the head. 
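The staleness convention this series relies on (visible in the gc() hunk below): a float series is stale when its most recent value is the stale marker NaN (value.StaleNaN), and a histogram or float histogram series is stale when that marker is carried in the Sum field of its last value. A minimal, self-contained sketch of that check, factored into a standalone helper -- the name isStaleSeries is illustrative only and not part of the patch:

    package main

    import (
    	"fmt"
    	"math"

    	"github.com/prometheus/prometheus/model/histogram"
    	"github.com/prometheus/prometheus/model/value"
    )

    // isStaleSeries mirrors the condition used in stripeSeries.gc in this patch:
    // a series counts as stale if its last float value is the stale marker, or,
    // for (float) histograms, if the marker is carried in the Sum field.
    func isStaleSeries(lastValue float64, lastH *histogram.Histogram, lastFH *histogram.FloatHistogram) bool {
    	return value.IsStaleNaN(lastValue) ||
    		(lastH != nil && value.IsStaleNaN(lastH.Sum)) ||
    		(lastFH != nil && value.IsStaleNaN(lastFH.Sum))
    }

    func main() {
    	// The stale marker is a NaN with a specific bit pattern.
    	fmt.Println(isStaleSeries(math.Float64frombits(value.StaleNaN), nil, nil)) // true
    	fmt.Println(isStaleSeries(42.0, nil, nil))                                 // false
    }
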
@@ -1929,7 +1944,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct // and there's no easy way to cast maps. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. -func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) { +func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) { var ( deleted = map[storage.SeriesRef]struct{}{} affected = map[labels.Label]struct{}{} @@ -1987,6 +2002,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( defer s.locks[refShard].Unlock() } + if value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + numStaleSeries.Dec() + } + deleted[storage.SeriesRef(series.ref)] = struct{}{} series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) s.hashes[hashShard].del(hash, series.ref) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 05299f048d..fa44f752f2 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { acc.floatsAppended-- } default: + newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) + staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { // The sample is an exact duplicate, and should be silently dropped. 
acc.floatsAppended-- @@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { acc.histogramsAppended-- } default: + newlyStale := value.IsStaleNaN(s.H.Sum) + staleToNonStale := false + if series.lastHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) + } ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { acc.histogramsAppended-- acc.histoOOORejected++ @@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { acc.histogramsAppended-- } default: + newlyStale := value.IsStaleNaN(s.FH.Sum) + staleToNonStale := false + if series.lastFloatHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) + } ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { acc.histogramsAppended-- acc.histoOOORejected++ From 0c3d3d74668ac3bf21c78ca63d308215278e6af0 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 24 Jul 2025 17:38:50 -0700 Subject: [PATCH 2/4] Test the stale series tracking in Head Signed-off-by: Ganesh Vernekar --- tsdb/head_test.go | 129 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 73f67f4e8a..b1856c0419 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6435,7 +6435,7 @@ func TestStripeSeries_gc(t *testing.T) { s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) hash := ms1.lset.Hash() - s.gc(0, 0) + s.gc(0, 0, nil) // Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series got := s.getByHash(hash, ms1.lset) @@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto wg.Wait() } + +func TestHead_NumStaleSeries(t *testing.T) { + head, _ := newTestHead(t, 1000, compression.None, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + // Initially, no series should be stale. 
+ require.Equal(t, uint64(0), head.NumStaleSeries()) + + appendSample := func(lbls labels.Labels, ts int64, val float64) { + app := head.Appender(context.Background()) + _, err := app.Append(0, lbls, ts, val) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, lbls, ts, val, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, lbls, ts, nil, val) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + verifySeriesCounts := func(numStaleSeries, numSeries int) { + require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries()) + require.Equal(t, uint64(numSeries), head.NumSeries()) + } + + // Create some series with normal samples. + series1 := labels.FromStrings("name", "series1", "label", "value1") + series2 := labels.FromStrings("name", "series2", "label", "value2") + series3 := labels.FromStrings("name", "series3", "label", "value3") + + // Add normal samples to all series. + appendSample(series1, 100, 1) + appendSample(series2, 100, 2) + appendSample(series3, 100, 3) + // Still no stale series. + verifySeriesCounts(0, 3) + + // Make series1 stale by appending a stale sample. Now we should have 1 stale series. + appendSample(series1, 200, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(1, 3) + + // Make series2 stale as well. + appendSample(series2, 200, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(2, 3) + + // Add a non-stale sample to series1. It should not be counted as stale now. + appendSample(series1, 300, 10) + verifySeriesCounts(1, 3) + + // Test that series3 doesn't become stale when we add another normal sample. + appendSample(series3, 200, 10) + verifySeriesCounts(1, 3) + + // Test histogram stale samples as well. + series4 := labels.FromStrings("name", "series4", "type", "histogram") + h := tsdbutil.GenerateTestHistograms(1)[0] + appendHistogram(series4, 100, h) + verifySeriesCounts(1, 4) + + // Make histogram series stale. + staleHist := h.Copy() + staleHist.Sum = math.Float64frombits(value.StaleNaN) + appendHistogram(series4, 200, staleHist) + verifySeriesCounts(2, 4) + + // Test float histogram stale samples. + series5 := labels.FromStrings("name", "series5", "type", "float_histogram") + fh := tsdbutil.GenerateTestFloatHistograms(1)[0] + appendFloatHistogram(series5, 100, fh) + verifySeriesCounts(2, 5) + + // Make float histogram series stale. + staleFH := fh.Copy() + staleFH.Sum = math.Float64frombits(value.StaleNaN) + appendFloatHistogram(series5, 200, staleFH) + verifySeriesCounts(3, 5) + + // Make histogram sample non-stale and stale back again. + appendHistogram(series4, 210, h) + verifySeriesCounts(2, 5) + appendHistogram(series4, 220, staleHist) + verifySeriesCounts(3, 5) + + // Make float histogram sample non-stale and stale back again. + appendFloatHistogram(series5, 210, fh) + verifySeriesCounts(2, 5) + appendFloatHistogram(series5, 220, staleFH) + verifySeriesCounts(3, 5) + + // Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5, + // so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series. 
+ appendSample(series1, 400, 10) + appendFloatHistogram(series5, 400, staleFH) + verifySeriesCounts(3, 5) + + // Test garbage collection behavior - stale series should be decremented when GC'd. + // Force a garbage collection by truncating old data. + require.NoError(t, head.Truncate(300)) + + // After truncation, run GC to collect old chunks/series. + head.gc() + + // series 1 and series 5 are left. + verifySeriesCounts(1, 2) + + // Test creating a new series for each of float, histogram, float histogram that starts as stale. + // This should be counted as stale. + series6 := labels.FromStrings("name", "series6", "direct", "stale") + series7 := labels.FromStrings("name", "series7", "direct", "stale") + series8 := labels.FromStrings("name", "series8", "direct", "stale") + appendSample(series6, 400, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(2, 3) + appendHistogram(series7, 400, staleHist) + verifySeriesCounts(3, 4) + appendFloatHistogram(series8, 400, staleFH) + verifySeriesCounts(4, 5) +} From b29ce3e4891c5aabd03bffaa473f45d6a4100505 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 6 Aug 2025 16:09:21 -0700 Subject: [PATCH 3/4] Restore stale series count on WAL replay Signed-off-by: Ganesh Vernekar --- tsdb/head_test.go | 19 ++++++++++++++++++- tsdb/head_wal.go | 27 ++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b1856c0419..f64a93c7f3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6901,6 +6901,20 @@ func TestHead_NumStaleSeries(t *testing.T) { require.Equal(t, uint64(numSeries), head.NumSeries()) } + restartHeadAndVerifySeriesCounts := func(numStaleSeries, numSeries int) { + verifySeriesCounts(numStaleSeries, numSeries) + + require.NoError(t, head.Close()) + + wal, err := wlog.NewSize(nil, nil, filepath.Join(head.opts.ChunkDirRoot, "wal"), 32768, compression.None) + require.NoError(t, err) + head, err = NewHead(nil, nil, wal, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + verifySeriesCounts(numStaleSeries, numSeries) + } + // Create some series with normal samples. series1 := labels.FromStrings("name", "series1", "label", "value1") series2 := labels.FromStrings("name", "series2", "label", "value2") @@ -6920,10 +6934,12 @@ func TestHead_NumStaleSeries(t *testing.T) { // Make series2 stale as well. appendSample(series2, 200, math.Float64frombits(value.StaleNaN)) verifySeriesCounts(2, 3) + restartHeadAndVerifySeriesCounts(2, 3) // Add a non-stale sample to series1. It should not be counted as stale now. appendSample(series1, 300, 10) verifySeriesCounts(1, 3) + restartHeadAndVerifySeriesCounts(1, 3) // Test that series3 doesn't become stale when we add another normal sample. appendSample(series3, 200, 10) @@ -6946,6 +6962,7 @@ func TestHead_NumStaleSeries(t *testing.T) { fh := tsdbutil.GenerateTestFloatHistograms(1)[0] appendFloatHistogram(series5, 100, fh) verifySeriesCounts(2, 5) + restartHeadAndVerifySeriesCounts(2, 5) // Make float histogram series stale. staleFH := fh.Copy() @@ -6969,7 +6986,7 @@ func TestHead_NumStaleSeries(t *testing.T) { // so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series. appendSample(series1, 400, 10) appendFloatHistogram(series5, 400, staleFH) - verifySeriesCounts(3, 5) + restartHeadAndVerifySeriesCounts(3, 5) // Test garbage collection behavior - stale series should be decremented when GC'd. // Force a garbage collection by truncating old data. 
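The head_wal.go change that follows rebuilds the counter during WAL replay by applying, per replayed sample, the same transition rule the commit path uses: increment when a series goes from non-stale to stale, decrement on the reverse transition, otherwise leave the count unchanged. A compact sketch of that rule -- the helper name staleDelta is illustrative only:

    package main

    import "fmt"

    // staleDelta returns the adjustment to the head's stale-series counter when a
    // sample with staleness curStale is appended to a series whose previous sample
    // had staleness prevStale. It mirrors the newlyStale/staleToNonStale
    // bookkeeping in commitSamples, commitHistograms, commitFloatHistograms and
    // processWALSamples in this patch series.
    func staleDelta(prevStale, curStale bool) int {
    	switch {
    	case !prevStale && curStale:
    		return 1 // series just became stale
    	case prevStale && !curStale:
    		return -1 // series became live again
    	default:
    		return 0 // no transition
    	}
    }

    func main() {
    	fmt.Println(staleDelta(false, true)) // 1: first stale marker seen
    	fmt.Println(staleDelta(true, false)) // -1: fresh sample after a stale marker
    	fmt.Println(staleDelta(true, true))  // 0: still stale
    }
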
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ee6557fdad..41317bbb92 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -16,6 +16,7 @@ package tsdb import ( "errors" "fmt" + "github.com/prometheus/prometheus/model/value" "maps" "math" "os" @@ -627,6 +628,14 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } + + if !value.IsStaleNaN(ms.lastValue) && value.IsStaleNaN(s.V) { + h.numStaleSeries.Inc() + } + if value.IsStaleNaN(ms.lastValue) && !value.IsStaleNaN(s.V) { + h.numStaleSeries.Dec() + } + if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -657,12 +666,28 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.t <= ms.mmMaxTime { continue } - var chunkCreated bool + var chunkCreated, newlyStale, staleToNonStale bool if s.h != nil { + newlyStale = value.IsStaleNaN(s.h.Sum) + if ms.lastHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) + } _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { + newlyStale = value.IsStaleNaN(s.fh.Sum) + if ms.lastFloatHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) + } _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } + if newlyStale { + h.numStaleSeries.Inc() + } + if staleToNonStale { + h.numStaleSeries.Dec() + } if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() From 3904b3cd5f69ba5d308c23bd179f74e0c589e123 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 6 Aug 2025 16:22:58 -0700 Subject: [PATCH 4/4] Restore stale series count from chunk snapshots Signed-off-by: Ganesh Vernekar --- tsdb/head_test.go | 4 ++++ tsdb/head_wal.go | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f64a93c7f3..3c711e19a0 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6988,6 +6988,10 @@ func TestHead_NumStaleSeries(t *testing.T) { appendFloatHistogram(series5, 400, staleFH) restartHeadAndVerifySeriesCounts(3, 5) + // This will test restarting with snapshot. + head.opts.EnableMemorySnapshotOnShutdown = true + restartHeadAndVerifySeriesCounts(3, 5) + // Test garbage collection behavior - stale series should be decremented when GC'd. // Force a garbage collection by truncating old data. 
require.NoError(t, head.Truncate(300)) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 41317bbb92..3e0dadb526 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -16,7 +16,6 @@ package tsdb import ( "errors" "fmt" - "github.com/prometheus/prometheus/model/value" "maps" "math" "os" @@ -33,6 +32,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -1607,6 +1607,12 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie series.lastHistogramValue = csr.lastHistogramValue series.lastFloatHistogramValue = csr.lastFloatHistogramValue + if value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + h.numStaleSeries.Inc() + } + app, err := series.headChunks.chunk.Appender() if err != nil { errChan <- err