From b29ce3e4891c5aabd03bffaa473f45d6a4100505 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 6 Aug 2025 16:09:21 -0700 Subject: [PATCH] Restore stale series count on WAL replay Signed-off-by: Ganesh Vernekar --- tsdb/head_test.go | 19 ++++++++++++++++++- tsdb/head_wal.go | 27 ++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b1856c0419..f64a93c7f3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6901,6 +6901,20 @@ func TestHead_NumStaleSeries(t *testing.T) { require.Equal(t, uint64(numSeries), head.NumSeries()) } + restartHeadAndVerifySeriesCounts := func(numStaleSeries, numSeries int) { + verifySeriesCounts(numStaleSeries, numSeries) + + require.NoError(t, head.Close()) + + wal, err := wlog.NewSize(nil, nil, filepath.Join(head.opts.ChunkDirRoot, "wal"), 32768, compression.None) + require.NoError(t, err) + head, err = NewHead(nil, nil, wal, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + verifySeriesCounts(numStaleSeries, numSeries) + } + // Create some series with normal samples. series1 := labels.FromStrings("name", "series1", "label", "value1") series2 := labels.FromStrings("name", "series2", "label", "value2") @@ -6920,10 +6934,12 @@ func TestHead_NumStaleSeries(t *testing.T) { // Make series2 stale as well. appendSample(series2, 200, math.Float64frombits(value.StaleNaN)) verifySeriesCounts(2, 3) + restartHeadAndVerifySeriesCounts(2, 3) // Add a non-stale sample to series1. It should not be counted as stale now. appendSample(series1, 300, 10) verifySeriesCounts(1, 3) + restartHeadAndVerifySeriesCounts(1, 3) // Test that series3 doesn't become stale when we add another normal sample. appendSample(series3, 200, 10) @@ -6946,6 +6962,7 @@ func TestHead_NumStaleSeries(t *testing.T) { fh := tsdbutil.GenerateTestFloatHistograms(1)[0] appendFloatHistogram(series5, 100, fh) verifySeriesCounts(2, 5) + restartHeadAndVerifySeriesCounts(2, 5) // Make float histogram series stale. staleFH := fh.Copy() @@ -6969,7 +6986,7 @@ func TestHead_NumStaleSeries(t *testing.T) { // so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series. appendSample(series1, 400, 10) appendFloatHistogram(series5, 400, staleFH) - verifySeriesCounts(3, 5) + restartHeadAndVerifySeriesCounts(3, 5) // Test garbage collection behavior - stale series should be decremented when GC'd. // Force a garbage collection by truncating old data. diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ee6557fdad..41317bbb92 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -16,6 +16,7 @@ package tsdb import ( "errors" "fmt" + "github.com/prometheus/prometheus/model/value" "maps" "math" "os" @@ -627,6 +628,14 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } + + if !value.IsStaleNaN(ms.lastValue) && value.IsStaleNaN(s.V) { + h.numStaleSeries.Inc() + } + if value.IsStaleNaN(ms.lastValue) && !value.IsStaleNaN(s.V) { + h.numStaleSeries.Dec() + } + if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -657,12 +666,28 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.t <= ms.mmMaxTime { continue } - var chunkCreated bool + var chunkCreated, newlyStale, staleToNonStale bool if s.h != nil { + newlyStale = value.IsStaleNaN(s.h.Sum) + if ms.lastHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) + } _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { + newlyStale = value.IsStaleNaN(s.fh.Sum) + if ms.lastFloatHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) + } _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } + if newlyStale { + h.numStaleSeries.Inc() + } + if staleToNonStale { + h.numStaleSeries.Dec() + } if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc()