This commit is contained in:
Ganesh Vernekar 2025-08-05 10:59:15 -04:00 committed by GitHub
commit e542261a4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 183 additions and 4 deletions

View File

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -68,6 +69,7 @@ var (
type Head struct {
chunkRange atomic.Int64
numSeries atomic.Uint64
numStaleSeries atomic.Uint64
minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() {
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
staleSeries prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}, func() float64 {
return float64(h.NumSeries())
}),
staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_stale_series",
Help: "Total number of stale series in the head block.",
}, func() float64 {
return float64(h.NumStaleSeries())
}),
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
@ -1607,7 +1616,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@ -1645,11 +1654,16 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
}
// NumSeries returns the number of series tracked in the head.
// This counts all series, including stale ones; see NumStaleSeries
// for the stale subset.
func (h *Head) NumSeries() uint64 {
	return h.numSeries.Load()
}
// NumStaleSeries returns the number of stale series in the head.
// A series is stale when its most recent sample is a staleness marker
// (value.IsStaleNaN on the float value, or on the Sum for histograms and
// float histograms). The counter is adjusted on commit when a series
// transitions to or from stale, and decremented when a stale series is
// garbage collected.
func (h *Head) NumStaleSeries() uint64 {
	return h.numStaleSeries.Load()
}
var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
// Meta returns meta information about the head.
@ -1929,7 +1943,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
@ -1987,6 +2001,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
defer s.locks[refShard].Unlock()
}
if value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
numStaleSeries.Dec()
}
deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)

View File

@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
acc.floatsAppended--
@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.H.Sum)
staleToNonStale := false
if series.lastHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.FH.Sum)
staleToNonStale := false
if series.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++

View File

@ -6435,7 +6435,7 @@ func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()
s.gc(0, 0)
s.gc(0, 0, nil)
// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
got := s.getByHash(hash, ms1.lset)
@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto
wg.Wait()
}
// TestHead_NumStaleSeries verifies that the head's stale-series counter
// tracks every transition: a series becoming stale, returning to non-stale,
// flip-flopping, being created directly with a stale sample, and being
// removed by garbage collection — for floats, histograms, and float
// histograms alike.
func TestHead_NumStaleSeries(t *testing.T) {
	head, _ := newTestHead(t, 1000, compression.None, false)
	t.Cleanup(func() {
		require.NoError(t, head.Close())
	})
	require.NoError(t, head.Init(0))

	// Initially, no series should be stale.
	require.Equal(t, uint64(0), head.NumStaleSeries())

	// appendSample commits a single float sample for lbls at ts.
	appendSample := func(lbls labels.Labels, ts int64, val float64) {
		app := head.Appender(context.Background())
		_, err := app.Append(0, lbls, ts, val)
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}
	// appendHistogram commits a single integer histogram sample.
	appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) {
		app := head.Appender(context.Background())
		_, err := app.AppendHistogram(0, lbls, ts, val, nil)
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}
	// appendFloatHistogram commits a single float histogram sample.
	appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) {
		app := head.Appender(context.Background())
		_, err := app.AppendHistogram(0, lbls, ts, nil, val)
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}
	// verifySeriesCounts asserts both the stale-series and total-series
	// gauges so each step below checks the full invariant.
	verifySeriesCounts := func(numStaleSeries, numSeries int) {
		require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
		require.Equal(t, uint64(numSeries), head.NumSeries())
	}

	// Create some series with normal samples.
	series1 := labels.FromStrings("name", "series1", "label", "value1")
	series2 := labels.FromStrings("name", "series2", "label", "value2")
	series3 := labels.FromStrings("name", "series3", "label", "value3")

	// Add normal samples to all series.
	appendSample(series1, 100, 1)
	appendSample(series2, 100, 2)
	appendSample(series3, 100, 3)

	// Still no stale series.
	verifySeriesCounts(0, 3)

	// Make series1 stale by appending a stale sample. Now we should have 1 stale series.
	appendSample(series1, 200, math.Float64frombits(value.StaleNaN))
	verifySeriesCounts(1, 3)

	// Make series2 stale as well.
	appendSample(series2, 200, math.Float64frombits(value.StaleNaN))
	verifySeriesCounts(2, 3)

	// Add a non-stale sample to series1. It should not be counted as stale now.
	appendSample(series1, 300, 10)
	verifySeriesCounts(1, 3)

	// Test that series3 doesn't become stale when we add another normal sample.
	appendSample(series3, 200, 10)
	verifySeriesCounts(1, 3)

	// Test histogram stale samples as well. Staleness for histograms is
	// signalled via a StaleNaN in the Sum field.
	series4 := labels.FromStrings("name", "series4", "type", "histogram")
	h := tsdbutil.GenerateTestHistograms(1)[0]
	appendHistogram(series4, 100, h)
	verifySeriesCounts(1, 4)

	// Make histogram series stale.
	staleHist := h.Copy()
	staleHist.Sum = math.Float64frombits(value.StaleNaN)
	appendHistogram(series4, 200, staleHist)
	verifySeriesCounts(2, 4)

	// Test float histogram stale samples.
	series5 := labels.FromStrings("name", "series5", "type", "float_histogram")
	fh := tsdbutil.GenerateTestFloatHistograms(1)[0]
	appendFloatHistogram(series5, 100, fh)
	verifySeriesCounts(2, 5)

	// Make float histogram series stale.
	staleFH := fh.Copy()
	staleFH.Sum = math.Float64frombits(value.StaleNaN)
	appendFloatHistogram(series5, 200, staleFH)
	verifySeriesCounts(3, 5)

	// Make histogram sample non-stale and stale back again.
	appendHistogram(series4, 210, h)
	verifySeriesCounts(2, 5)
	appendHistogram(series4, 220, staleHist)
	verifySeriesCounts(3, 5)

	// Make float histogram sample non-stale and stale back again.
	appendFloatHistogram(series5, 210, fh)
	verifySeriesCounts(2, 5)
	appendFloatHistogram(series5, 220, staleFH)
	verifySeriesCounts(3, 5)

	// Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5,
	// so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series.
	appendSample(series1, 400, 10)
	appendFloatHistogram(series5, 400, staleFH)
	verifySeriesCounts(3, 5)

	// Test garbage collection behavior - stale series should be decremented when GC'd.
	// Force a garbage collection by truncating old data.
	require.NoError(t, head.Truncate(300))

	// After truncation, run GC to collect old chunks/series.
	head.gc()

	// series 1 and series 5 are left.
	verifySeriesCounts(1, 2)

	// Test creating a new series for each of float, histogram, float histogram that starts as stale.
	// This should be counted as stale.
	series6 := labels.FromStrings("name", "series6", "direct", "stale")
	series7 := labels.FromStrings("name", "series7", "direct", "stale")
	series8 := labels.FromStrings("name", "series8", "direct", "stale")
	appendSample(series6, 400, math.Float64frombits(value.StaleNaN))
	verifySeriesCounts(2, 3)
	appendHistogram(series7, 400, staleHist)
	verifySeriesCounts(3, 4)
	appendFloatHistogram(series8, 400, staleFH)
	verifySeriesCounts(4, 5)
}