From 3b79fb207e44326488455a5ffd594cbdaaf2d91c Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar
Date: Fri, 15 Aug 2025 18:46:12 -0700
Subject: [PATCH] Ignore compacted stale series from WAL

Signed-off-by: Ganesh Vernekar
---
 tsdb/db.go       |  5 +--
 tsdb/db_test.go  | 16 ++++++++-
 tsdb/head.go     | 90 ++++++++++++++++++++++++++++++++++++++++++------
 tsdb/head_wal.go | 29 +++++++++++++---
 4 files changed, 123 insertions(+), 17 deletions(-)

diff --git a/tsdb/db.go b/tsdb/db.go
index 37bfa2ebac..adef3a9efd 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -44,7 +44,6 @@ import (
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
-	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 	"github.com/prometheus/prometheus/util/compression"
@@ -1574,7 +1573,9 @@ func (db *DB) CompactStaleHead() error {
 		}
 	}
 
-	db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt)
+	if err := db.head.truncateStaleSeries(staleSeriesRefs, maxt); err != nil {
+		return fmt.Errorf("head truncate: %w", err)
+	}
 
 	db.head.RebuildSymbolTable(db.logger)
 	db.logger.Info("Ending stale series compaction")
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 274739bae1..431690b824 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -9673,7 +9673,9 @@ func TestStaleSeriesCompaction(t *testing.T) {
 	nonFirstFH.CounterResetHint = histogram.NotCounterReset
 
 	// Verify head block.
-	{
+	verifyHeadBlock := func() {
+		require.Equal(t, uint64(3), db.head.NumSeries())
+		require.Equal(t, uint64(0), db.head.NumStaleSeries())
 
 		expHeadQuery := make(map[string][]chunks.Sample)
 		for i := 0; i < numSeriesPerCategory; i++ {
@@ -9697,6 +9699,8 @@
 		require.Equal(t, expHeadQuery, seriesSet)
 	}
 
+	verifyHeadBlock()
+
 	// Verify blocks from stale series.
 	{
 		expBlockQuery := make(map[string][]chunks.Sample)
@@ -9769,4 +9773,14 @@
 			}
 		}
 	}
+
+	{
+		// Restart DB and verify that stale series were discarded from WAL replay.
+		require.NoError(t, db.Close())
+		var err error
+		db, err = Open(db.Dir(), db.logger, db.registerer, db.opts, nil)
+		require.NoError(t, err)
+
+		verifyHeadBlock()
+	}
 }
diff --git a/tsdb/head.go b/tsdb/head.go
index c43082812e..3fd9285a47 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1198,18 +1198,33 @@ func (h *Head) truncateMemory(mint int64) (err error) {
 	return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
 }
 
-func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) {
+func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error {
 	h.chunkSnapshotMtx.Lock()
 	defer h.chunkSnapshotMtx.Unlock()
 
-	if h.MinTime() >= maxt {
-		return
+	// Record these stale series refs in the WAL so that we can ignore them during replay.
+	if h.wal != nil {
+		stones := make([]tombstones.Stone, 0, len(seriesRefs))
+		for _, ref := range seriesRefs {
+			stones = append(stones, tombstones.Stone{
+				Ref:       ref,
+				Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
+			})
+		}
+		var enc record.Encoder
+		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
+			return err
+		}
+	}
+
+	if h.MinTime() >= maxt {
+		return nil
 	}
 
-	// TODO: this will block all queries. See if we can do better.
 	h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
 
-	h.gcStaleSeries(p, maxt)
+	h.gcStaleSeries(seriesRefs, maxt)
+	return nil
 }
 
 // WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
@@ -1713,10 +1728,10 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 
 // gcStaleSeries removes all the stale series provided given that they are still stale
 // and the series maxt is <= the given max.
-func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
+func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) {
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
+	deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -2122,8 +2137,64 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 	return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
 }
 
+// deleteSeriesByID deletes the series with the given reference.
+// Only used for WAL replay.
+func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
+	var (
+		deleted            = map[storage.SeriesRef]struct{}{}
+		affected           = map[labels.Label]struct{}{}
+		staleSeriesDeleted = 0
+		chunksRemoved      = 0
+	)
+
+	for _, ref := range refs {
+		refShard := int(ref) & (h.series.size - 1)
+		h.series.locks[refShard].Lock()
+
+		// Copying getByID here to avoid locking and unlocking twice.
+		series := h.series.series[refShard][ref]
+		if series == nil {
+			h.series.locks[refShard].Unlock()
+			continue
+		}
+
+		if value.IsStaleNaN(series.lastValue) ||
+			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
+			(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
+			staleSeriesDeleted++
+		}
+
+		hash := series.lset.Hash()
+		hashShard := int(hash) & (h.series.size - 1)
+
+		chunksRemoved += len(series.mmappedChunks)
+		if series.headChunks != nil {
+			chunksRemoved += series.headChunks.len()
+		}
+
+		deleted[storage.SeriesRef(series.ref)] = struct{}{}
+		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
+		h.series.hashes[hashShard].del(hash, series.ref)
+		delete(h.series.series[refShard], series.ref)
+
+		h.series.locks[refShard].Unlock()
+	}
+
+	h.metrics.seriesRemoved.Add(float64(len(deleted)))
+	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
+	h.metrics.chunks.Sub(float64(chunksRemoved))
+	h.numSeries.Sub(uint64(len(deleted)))
+	h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
+
+	// Remove deleted series IDs from the postings lists.
+	h.postings.Delete(deleted, affected)
+
+	// Remove tombstones referring to the deleted series.
+	h.tombstones.DeleteTombstones(deleted)
+}
+
 // TODO: add comments.
-func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
+func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
 	var (
 		deleted  = map[storage.SeriesRef]struct{}{}
 		affected = map[labels.Label]struct{}{}
@@ -2131,8 +2202,7 @@ func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storag
 	)
 
 	staleSeriesMap := map[storage.SeriesRef]struct{}{}
-	for p.Next() {
-		ref := p.At()
+	for _, ref := range seriesRefs {
 		staleSeriesMap[ref] = struct{}{}
 	}
 
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 9b0982423f..ed77f7d3ef 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -308,7 +308,15 @@ Outer:
 			}
 			h.wlReplaySamplesPool.Put(v)
 		case []tombstones.Stone:
+			// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
+			deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
 			for _, s := range v {
+				if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 {
+					// This series was fully deleted at this point.
+					mod := uint64(s.Ref) % uint64(concurrency)
+					deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref))
+					continue
+				}
 				for _, itv := range s.Intervals {
 					if itv.Maxt < h.minValidTime.Load() {
 						continue
@@ -326,6 +334,14 @@ Outer:
 					h.tombstones.AddInterval(s.Ref, itv)
 				}
 			}
+
+			for i := 0; i < concurrency; i++ {
+				if len(deleteSeriesShards[i]) > 0 {
+					processors[i].input <- walSubsetProcessorInputItem{deletedSeriesRefs: deleteSeriesShards[i]}
+					deleteSeriesShards[i] = nil
+				}
+			}
+
 			h.wlReplaytStonesPool.Put(v)
 		case []record.RefExemplar:
 			for _, e := range v {
@@ -558,10 +574,11 @@ type walSubsetProcessor struct {
 }
 
 type walSubsetProcessorInputItem struct {
-	samples          []record.RefSample
-	histogramSamples []histogramRecord
-	existingSeries   *memSeries
-	walSeriesRef     chunks.HeadSeriesRef
+	samples           []record.RefSample
+	histogramSamples  []histogramRecord
+	existingSeries    *memSeries
+	walSeriesRef      chunks.HeadSeriesRef
+	deletedSeriesRefs []chunks.HeadSeriesRef
 }
 
 func (wp *walSubsetProcessor) setup() {
@@ -712,6 +729,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 		case wp.histogramsOutput <- in.histogramSamples:
 		default:
 		}
+
+		if len(in.deletedSeriesRefs) > 0 {
+			h.deleteSeriesByID(in.deletedSeriesRefs)
+		}
 	}
 
 	h.updateMinMaxTime(mint, maxt)
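Note (not part of the patch): the change hinges on one convention. When stale series are compacted out of the head, truncateStaleSeries logs a tombstone per series whose single interval spans the entire int64 time range; during WAL replay, exactly that shape is treated as "series fully deleted" and the ref is routed to a replay processor (ref modulo the replay concurrency) instead of being kept as an interval tombstone. The standalone sketch below restates that convention in isolation; Stone and Interval are simplified stand-ins for the tsdb/tombstones types, and shardForRef mirrors the routing in head_wal.go.

package main

import (
	"fmt"
	"math"
)

// Interval and Stone are simplified stand-ins for tombstones.Interval and
// tombstones.Stone; only the fields the convention needs are kept.
type Interval struct{ Mint, Maxt int64 }

type Stone struct {
	Ref       uint64
	Intervals []Interval
}

// isFullDeletion reports whether a tombstone marks a series as fully deleted:
// a single interval covering (math.MinInt64, math.MaxInt64), which is what
// truncateStaleSeries writes for every compacted stale series.
func isFullDeletion(s Stone) bool {
	return len(s.Intervals) == 1 &&
		s.Intervals[0].Mint == math.MinInt64 &&
		s.Intervals[0].Maxt == math.MaxInt64
}

// shardForRef mirrors how the replay loop picks the WAL subset processor that
// will drop the series: the series ref modulo the replay concurrency.
func shardForRef(ref uint64, concurrency int) int {
	return int(ref % uint64(concurrency))
}

func main() {
	const concurrency = 4
	stones := []Stone{
		{Ref: 7, Intervals: []Interval{{Mint: math.MinInt64, Maxt: math.MaxInt64}}}, // compacted stale series
		{Ref: 9, Intervals: []Interval{{Mint: 1000, Maxt: 2000}}},                   // ordinary deletion tombstone
	}

	deleteShards := make([][]uint64, concurrency)
	for _, s := range stones {
		if isFullDeletion(s) {
			// Full-range tombstone: drop the series on replay.
			shard := shardForRef(s.Ref, concurrency)
			deleteShards[shard] = append(deleteShards[shard], s.Ref)
			continue
		}
		// Ordinary tombstone: the patch keeps it via h.tombstones.AddInterval.
	}
	fmt.Println(deleteShards) // ref 7 lands in shard 3; ref 9 stays on the interval path
}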