From de8ea977a99278dd291f0966915d703943c792d8 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar
Date: Fri, 25 Jul 2025 15:51:14 -0700
Subject: [PATCH] Add support for stale series compaction to TSDB

Signed-off-by: Ganesh Vernekar
---
 storage/series.go   |  12 +++-
 tsdb/db.go          |  42 ++++++++++++
 tsdb/db_test.go     |   2 +-
 tsdb/head.go        | 162 ++++++++++++++++++++++++++++++++++++++++++++
 tsdb/head_append.go |   1 +
 tsdb/head_read.go   | 106 ++++++++++++++++++++++++++++-
 6 files changed, 322 insertions(+), 3 deletions(-)

diff --git a/storage/series.go b/storage/series.go
index 2fff56785a..8d0a4e5494 100644
--- a/storage/series.go
+++ b/storage/series.go
@@ -439,7 +439,17 @@ func (e errChunksIterator) Err() error { return e.err }
 // ExpandSamples iterates over all samples in the iterator, buffering all in slice.
 // Optionally it takes samples constructor, useful when you want to compare sample slices with different
 // sample implementations. if nil, sample type from this package will be used.
+// For float samples, NaN values are replaced with -42.
 func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
+	return expandSamples(iter, true, newSampleFn)
+}
+
+// ExpandSamplesWithoutReplacingNaNs is the same as ExpandSamples but it does not replace float sample NaN values with anything.
+func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
+	return expandSamples(iter, false, newSampleFn)
+}
+
+func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
 	if newSampleFn == nil {
 		newSampleFn = func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
 			switch {
@@ -461,7 +471,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64,
 		case chunkenc.ValFloat:
 			t, f := iter.At()
 			// NaNs can't be compared normally, so substitute for another value.
-			if math.IsNaN(f) {
+			if replaceNaN && math.IsNaN(f) {
 				f = -42
 			}
 			result = append(result, newSampleFn(t, f, nil, nil))
diff --git a/tsdb/db.go b/tsdb/db.go
index 093ec5ab27..3373cf13eb 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"io"
 	"io/fs"
 	"log/slog"
@@ -1488,6 +1489,47 @@ func (db *DB) compactHead(head *RangeHead) error {
 	return nil
 }
 
+// CompactStaleHead compacts all the stale series that do not have out-of-order data into persistent blocks.
+// If a stale series has out-of-order data, it is not possible to tell whether the series has truly stopped receiving data.
+func (db *DB) CompactStaleHead() error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	db.logger.Info("Starting stale series compaction")
+
+	staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
+	if err != nil {
+		return err
+	}
+	meta := &BlockMeta{}
+	meta.Compaction.SetStaleSeries()
+	mint, maxt := db.head.opts.ChunkRange*(db.head.MinTime()/db.head.opts.ChunkRange), db.head.MaxTime()
+	for ; mint < maxt; mint += db.head.chunkRange.Load() {
+		staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load()-1, staleSeriesRefs)
+
+		uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta)
+		if err != nil {
+			return fmt.Errorf("persist stale head: %w", err)
+		}
+
+		if err := db.reloadBlocks(); err != nil {
+			multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
+			for _, uid := range uids {
+				if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
+					multiErr.Add(fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
+				}
+			}
+			return multiErr.Err()
+		}
+	}
+
+	db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt)
+
+	db.head.RebuildSymbolTable(db.logger)
+
+	return nil
+}
+
 // compactBlocks compacts all the eligible on-disk blocks.
 // The db.cmtx should be held before calling this method.
 func (db *DB) compactBlocks() (err error) {
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 87b3619705..00a1e1a0f5 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -83,7 +83,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
 	tmpdir := t.TempDir()
 	var err error
 
-	if opts == nil {
+	if opts == nil {
 		opts = DefaultOptions()
 	}
 	opts.EnableNativeHistograms = true
diff --git a/tsdb/head.go b/tsdb/head.go
index cd0f771f96..5092fd9280 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1198,6 +1198,20 @@ func (h *Head) truncateMemory(mint int64) (err error) {
 	return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
 }
 
+func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) {
+	h.chunkSnapshotMtx.Lock()
+	defer h.chunkSnapshotMtx.Unlock()
+
+	if h.MinTime() >= maxt {
+		return
+	}
+
+	// TODO: this will block all queries. See if we can do better.
+	h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
+
+	h.gcStaleSeries(p, maxt)
+}
+
 // WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
 // The query timeout limits the max wait time of this function implicitly.
 // The mint is inclusive and maxt is the truncation time hence exclusive.
@@ -1550,6 +1564,53 @@ func (h *RangeHead) String() string {
 	return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
 }
 
+// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader.
+// Used only for compactions.
+type StaleHead struct {
+	RangeHead
+	staleSeriesRefs []storage.SeriesRef
+}
+
+// NewStaleHead returns a *StaleHead.
+func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead {
+	return &StaleHead{
+		RangeHead: RangeHead{
+			head: head,
+			mint: mint,
+			maxt: maxt,
+		},
+		staleSeriesRefs: staleSeriesRefs,
+	}
+}
+
+func (h *StaleHead) Index() (_ IndexReader, err error) {
+	return h.head.staleIndex(h.RangeHead.mint, h.RangeHead.maxt, h.staleSeriesRefs)
+}
+
+func (h *StaleHead) NumSeries() uint64 {
+	return h.head.NumStaleSeries()
+}
+
+var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD")
+
+func (h *StaleHead) Meta() BlockMeta {
+	return BlockMeta{
+		MinTime: h.MinTime(),
+		MaxTime: h.MaxTime(),
+		ULID:    staleHeadULID,
+		Stats: BlockStats{
+			NumSeries: h.NumSeries(),
+		},
+	}
+}
+
+// String returns a human-readable representation of the stale head. It's important to
+// keep this function in order to avoid the struct dump when the head is stringified in
+// errors or logs.
+func (h *StaleHead) String() string {
+	return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
+}
+
 // Delete all samples in the range of [mint, maxt] for series that satisfy the given
 // label matchers.
 func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
@@ -1649,6 +1710,43 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 	return actualInOrderMint, minOOOTime, minMmapFile
}
 
+// gcStaleSeries removes the provided stale series, as long as they are still stale
+// and each series' maxt is <= the given maxt.
+func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
+
+	// Drop old chunks and remember series IDs and hashes if they can be
+	// deleted entirely.
+	deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
+	seriesRemoved := len(deleted)
+
+	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
+	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
+	h.metrics.chunks.Sub(float64(chunksRemoved))
+	h.numSeries.Sub(uint64(seriesRemoved))
+	h.numStaleSeries.Sub(uint64(seriesRemoved))
+
+	// Remove deleted series IDs from the postings lists.
+	h.postings.Delete(deleted, affected)
+
+	// Remove tombstones referring to the deleted series.
+	h.tombstones.DeleteTombstones(deleted)
+
+	if h.wal != nil {
+		_, last, _ := wlog.Segments(h.wal.Dir())
+		h.walExpiriesMtx.Lock()
+		// Keep series records until we're past segment 'last'
+		// because the WAL will still have samples records with
+		// this ref ID. If we didn't keep these series records then
+		// on start up when we replay the WAL, or any other code
+		// that reads the WAL, wouldn't be able to use those
+		// samples since we would have no labels for that ref ID.
+		for ref := range deleted {
+			h.walExpiries[chunks.HeadSeriesRef(ref)] = last
+		}
+		h.walExpiriesMtx.Unlock()
+	}
+}
+
 // Tombstones returns a new reader over the head's tombstones.
 func (h *Head) Tombstones() (tombstones.Reader, error) {
 	return h.tombstones, nil
@@ -2023,6 +2121,70 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 	return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
 }
 
+// gcStaleSeries removes the given stale series from the stripeSeries, provided they are still stale and have no samples after maxt. It returns the deleted series refs, the affected labels and the number of chunks removed.
+func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
+	var (
+		deleted  = map[storage.SeriesRef]struct{}{}
+		affected = map[labels.Label]struct{}{}
+		rmChunks = 0
+	)
+
+	staleSeriesMap := map[storage.SeriesRef]struct{}{}
+	for p.Next() {
+		ref := p.At()
+		staleSeriesMap[ref] = struct{}{}
+	}
+
+	check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
+		if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists {
+			// This series is not part of the stale series being compacted. Skip it.
+			return
+		}
+
+		series.Lock()
+		defer series.Unlock()
+
+		if series.maxTime() > maxt {
+			return
+		}
+
+		// Check if the series is still stale.
+		isStale := value.IsStaleNaN(series.lastValue) ||
+			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
+			(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
+
+		if !isStale {
+			return
+		}
+
+		if series.headChunks != nil {
+			rmChunks += series.headChunks.len()
+		}
+		rmChunks += len(series.mmappedChunks)
+
+		// The series is gone entirely. We need to keep the series lock
+		// and make sure we have acquired the stripe locks for hash and ID of the
+		// series alike.
+		// If we don't hold them all, there's a very small chance that a series receives
+		// samples again while we are half-way into deleting it.
+		refShard := int(series.ref) & (s.size - 1)
+		if hashShard != refShard {
+			s.locks[refShard].Lock()
+			defer s.locks[refShard].Unlock()
+		}
+
+		deleted[storage.SeriesRef(series.ref)] = struct{}{}
+		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
+		s.hashes[hashShard].del(hash, series.ref)
+		delete(s.series[refShard], series.ref)
+		deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
+	}
+
+	s.iterForDeletion(check)
+
+	return deleted, affected, rmChunks
+}
+
 // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
 // The checkDeletedFunc takes a map as input and should add to it all series that were deleted and should be included
 // when invoking the PostDeletion hook.
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index a5d6e9127a..e2b9ae4fca 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -1244,6 +1244,7 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
 			if staleToNonStale {
 				a.head.numStaleSeries.Dec()
 			}
+
 		} else {
 			// The sample is an exact duplicate, and should be silently dropped.
 			acc.floatsAppended--
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 26b95880d3..80c115a73b 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/prometheus/prometheus/model/value"
 	"math"
 	"slices"
 	"sync"
@@ -201,9 +202,112 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 	return nil
 }
 
+func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
+	return &headStaleIndexReader{
+		headIndexReader: h.indexRange(mint, maxt),
+		staleSeriesRefs: staleSeriesRefs,
+	}, nil
+}
+
+// headStaleIndexReader gives the stale series that have no out-of-order data.
+type headStaleIndexReader struct {
+	*headIndexReader
+	staleSeriesRefs []storage.SeriesRef
+}
+
+func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
+	// If all postings are requested, return the precalculated list.
+	k, v := index.AllPostingsKey()
+	if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v {
+		return index.NewListPostings(h.staleSeriesRefs), nil
+	}
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
+	if err != nil {
+		return index.ErrPostings(err), err
+	}
+	return index.NewListPostings(seriesRefs), nil
+}
+
+func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	// Unused for compaction, so we don't need to optimise.
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match))
+	if err != nil {
+		return index.ErrPostings(err)
+	}
+	return index.NewListPostings(seriesRefs)
+}
+
+func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
+	// Unused for compaction, so we don't need to optimise.
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name))
+	if err != nil {
+		return index.ErrPostings(err)
+	}
+	return index.NewListPostings(seriesRefs)
+}
+
+// filterStaleSeriesAndSortPostings returns the stale series references from the given postings
+// that also do not have any out-of-order data.
+func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
+	series := make([]*memSeries, 0, 128)
+
+	notFoundSeriesCount := 0
+	for p.Next() {
+		s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
+		if s == nil {
+			notFoundSeriesCount++
+			continue
+		}
+
+		s.Lock()
+		if s.ooo != nil {
+			// Has out-of-order data; skip it.
+			s.Unlock()
+			continue
+		}
+
+		if value.IsStaleNaN(s.lastValue) ||
+			(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
+			(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
+			series = append(series, s)
+		}
+		s.Unlock()
+	}
+	if notFoundSeriesCount > 0 {
+		h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount)
+	}
+	if err := p.Err(); err != nil {
+		return nil, fmt.Errorf("expand postings: %w", err)
+	}
+
+	slices.SortFunc(series, func(a, b *memSeries) int {
+		return labels.Compare(a.labels(), b.labels())
+	})
+
+	// Convert back to list.
+	ep := make([]storage.SeriesRef, 0, len(series))
+	for _, p := range series {
+		ep = append(ep, storage.SeriesRef(p.ref))
+	}
+	return ep, nil
+}
+
+// SortedPostings returns the postings as-is because we expect any postings obtained via
+// headStaleIndexReader to already be sorted.
+func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
+	// All the postings functions above already give a sorted list of postings.
+	return p
+}
+
+// SortedStaleSeriesRefsNoOOOData returns all the series refs of the stale series that do not have any out-of-order data.
+func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) {
+	k, v := index.AllPostingsKey()
+	return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
+}
+
 func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
 	for i, c := range s.mmappedChunks {
-		// Do not expose chunks that are outside of the specified range.
+		// Do not expose chunks that are outside the specified range.
 		if !c.OverlapsClosedInterval(mint, maxt) {
 			continue
 		}
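
Usage sketch (not part of the patch above): a minimal example of how an embedding application might drive the new DB.CompactStaleHead API. The data directory, the nil logger/registerer/stats arguments, and the one-hour ticker cadence are illustrative assumptions; only db.CompactStaleHead() comes from this patch.

package main

import (
	"log"
	"time"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Open a TSDB instance; the path and default options are placeholders
	// for whatever the embedding application already uses.
	db, err := tsdb.Open("./data", nil, nil, tsdb.DefaultOptions(), nil)
	if err != nil {
		log.Fatalf("open tsdb: %v", err)
	}
	defer db.Close()

	// Periodically compact stale series (series whose last sample is a
	// staleness marker and that have no out-of-order data) out of the head.
	// The one-hour cadence is an arbitrary example, not a recommendation.
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()
	for range ticker.C {
		if err := db.CompactStaleHead(); err != nil {
			log.Printf("stale series compaction failed: %v", err)
		}
	}
}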