Restore stale series count on WAL replay

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2025-08-06 16:09:21 -07:00
parent 0c3d3d7466
commit b29ce3e489
2 changed files with 44 additions and 2 deletions

View File

@ -6901,6 +6901,20 @@ func TestHead_NumStaleSeries(t *testing.T) {
require.Equal(t, uint64(numSeries), head.NumSeries())
}
// restartHeadAndVerifySeriesCounts asserts the expected stale/total series
// counts on the current head, closes that head, opens a new head on the WAL
// directory under the same ChunkDirRoot with the same options, initialises
// it, and then asserts the same counts again on the reopened head.
restartHeadAndVerifySeriesCounts := func(numStaleSeries, numSeries int) {
	// Counts must already be correct before the restart.
	verifySeriesCounts(numStaleSeries, numSeries)

	require.NoError(t, head.Close())

	// Reuse the closed head's options so the new head reads the same directory.
	opts := head.opts
	walDir := filepath.Join(opts.ChunkDirRoot, "wal")
	reopenedWAL, err := wlog.NewSize(nil, nil, walDir, 32768, compression.None)
	require.NoError(t, err)

	// Replace the shared head variable so later appends in the test target the
	// reopened head.
	head, err = NewHead(nil, nil, reopenedWAL, nil, opts, nil)
	require.NoError(t, err)
	require.NoError(t, head.Init(0))

	// Counts must be unchanged after the head has been re-initialised.
	verifySeriesCounts(numStaleSeries, numSeries)
}
// Create some series with normal samples.
series1 := labels.FromStrings("name", "series1", "label", "value1")
series2 := labels.FromStrings("name", "series2", "label", "value2")
@ -6920,10 +6934,12 @@ func TestHead_NumStaleSeries(t *testing.T) {
// Make series2 stale as well.
appendSample(series2, 200, math.Float64frombits(value.StaleNaN))
verifySeriesCounts(2, 3)
restartHeadAndVerifySeriesCounts(2, 3)
// Add a non-stale sample to series1. It should not be counted as stale now.
appendSample(series1, 300, 10)
verifySeriesCounts(1, 3)
restartHeadAndVerifySeriesCounts(1, 3)
// Test that series3 doesn't become stale when we add another normal sample.
appendSample(series3, 200, 10)
@ -6946,6 +6962,7 @@ func TestHead_NumStaleSeries(t *testing.T) {
fh := tsdbutil.GenerateTestFloatHistograms(1)[0]
appendFloatHistogram(series5, 100, fh)
verifySeriesCounts(2, 5)
restartHeadAndVerifySeriesCounts(2, 5)
// Make float histogram series stale.
staleFH := fh.Copy()
@ -6969,7 +6986,7 @@ func TestHead_NumStaleSeries(t *testing.T) {
// so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series.
appendSample(series1, 400, 10)
appendFloatHistogram(series5, 400, staleFH)
verifySeriesCounts(3, 5)
restartHeadAndVerifySeriesCounts(3, 5)
// Test garbage collection behavior - stale series should be decremented when GC'd.
// Force a garbage collection by truncating old data.

View File

@ -16,6 +16,7 @@ package tsdb
import (
"errors"
"fmt"
"github.com/prometheus/prometheus/model/value"
"maps"
"math"
"os"
@ -627,6 +628,14 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if !value.IsStaleNaN(ms.lastValue) && value.IsStaleNaN(s.V) {
h.numStaleSeries.Inc()
}
if value.IsStaleNaN(ms.lastValue) && !value.IsStaleNaN(s.V) {
h.numStaleSeries.Dec()
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@ -657,12 +666,28 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.t <= ms.mmMaxTime {
continue
}
var chunkCreated bool
var chunkCreated, newlyStale, staleToNonStale bool
if s.h != nil {
newlyStale = value.IsStaleNaN(s.h.Sum)
if ms.lastHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum)
}
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
} else {
newlyStale = value.IsStaleNaN(s.fh.Sum)
if ms.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum)
}
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
}
if newlyStale {
h.numStaleSeries.Inc()
}
if staleToNonStale {
h.numStaleSeries.Dec()
}
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()