Add support for stale series compaction to TSDB

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
Ganesh Vernekar 2025-07-25 15:51:14 -07:00
parent 7cf585527f
commit de8ea977a9
6 changed files with 322 additions and 3 deletions

View File

@@ -439,7 +439,17 @@ func (e errChunksIterator) Err() error { return e.err }
// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
// Optionally it takes a sample constructor, useful when you want to compare sample slices with different
// sample implementations. If nil, the sample type from this package will be used.
// For float samples, NaN values are replaced with -42.
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
return expandSamples(iter, true, newSampleFn)
}
// ExpandSamplesWithoutReplacingNaNs is the same as ExpandSamples, but it does not replace float NaN sample values with anything.
func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
return expandSamples(iter, false, newSampleFn)
}
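// Illustrative sketch (not part of this commit): when to reach for each helper.
// The snippet is hypothetical and assumes the iterators it1/it2 and a reflect
// import are available in the caller.
//
//	// Comparing two iterators sample-by-sample: NaN != NaN, so ExpandSamples
//	// rewrites float NaNs to -42 to make the resulting slices comparable.
//	a, _ := ExpandSamples(it1, nil)
//	b, _ := ExpandSamples(it2, nil)
//	equal := reflect.DeepEqual(a, b)
//
//	// When the exact NaN payload matters (e.g. staleness markers), keep the
//	// raw values instead.
//	raw, _ := ExpandSamplesWithoutReplacingNaNs(it1, nil)
//	_, _ = equal, raw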
func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
if newSampleFn == nil {
newSampleFn = func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
switch {
@@ -461,7 +471,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64,
case chunkenc.ValFloat:
t, f := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(f) {
if replaceNaN && math.IsNaN(f) {
f = -42
}
result = append(result, newSampleFn(t, f, nil, nil))

View File

@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/tsdb/index"
"io"
"io/fs"
"log/slog"
@@ -1488,6 +1489,47 @@ func (db *DB) compactHead(head *RangeHead) error {
return nil
}
// CompactStaleHead compacts all the stale series that do not have out-of-order data into persistent blocks.
// If a stale series has out-of-order data, it is not possible to tell whether the series has stopped receiving data entirely.
func (db *DB) CompactStaleHead() error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.logger.Info("Starting stale series compaction")
staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
if err != nil {
return err
}
meta := &BlockMeta{}
meta.Compaction.SetStaleSeries()
mint, maxt := db.head.opts.ChunkRange*(db.head.MinTime()/db.head.opts.ChunkRange), db.head.MaxTime()
for ; mint < maxt; mint += db.head.chunkRange.Load() {
staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load()-1, staleSeriesRefs)
uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), nil)
if err != nil {
return fmt.Errorf("persist stale head: %w", err)
}
if err := db.reloadBlocks(); err != nil {
multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
for _, uid := range uids {
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
multiErr.Add(fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
}
}
return multiErr.Err()
}
}
db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt)
db.head.RebuildSymbolTable(db.logger)
return nil
}
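// Illustrative sketch (not part of this commit): the chunk-range alignment used
// by CompactStaleHead above, with concrete numbers. The helper is hypothetical.
// With the default 2h chunk range (7,200,000 ms) and head.MinTime() = 10,000,000,
// the first window is [7,200,000, 14,399,999] and subsequent windows advance one
// chunk range at a time until head.MaxTime() is reached.
func staleCompactionWindows(headMint, headMaxt, chunkRange int64) [][2]int64 {
	var windows [][2]int64
	// Floor the head's min time down to a chunk-range boundary, then emit
	// [mint, mint+chunkRange-1] windows covering everything up to headMaxt.
	for mint := chunkRange * (headMint / chunkRange); mint < headMaxt; mint += chunkRange {
		windows = append(windows, [2]int64{mint, mint + chunkRange - 1})
	}
	return windows
}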
// compactBlocks compacts all the eligible on-disk blocks.
// The db.cmtx should be held before calling this method.
func (db *DB) compactBlocks() (err error) {

View File

@@ -83,7 +83,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
tmpdir := t.TempDir()
var err error
if opts == nil {
opts = DefaultOptions()
}
opts.EnableNativeHistograms = true

View File

@@ -1198,6 +1198,20 @@ func (h *Head) truncateMemory(mint int64) (err error) {
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
}
func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
if h.MinTime() >= maxt {
return
}
// TODO: this will block all queries. See if we can do better.
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
h.gcStaleSeries(p, maxt)
}
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
// The query timeout limits the max wait time of this function implicitly.
// The mint is inclusive and maxt is the truncation time hence exclusive.
@@ -1550,6 +1564,53 @@ func (h *RangeHead) String() string {
return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
}
// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader.
// Used only for compactions.
type StaleHead struct {
RangeHead
staleSeriesRefs []storage.SeriesRef
}
// NewStaleHead returns a *StaleHead.
func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead {
return &StaleHead{
RangeHead: RangeHead{
head: head,
mint: mint,
maxt: maxt,
},
staleSeriesRefs: staleSeriesRefs,
}
}
func (h *StaleHead) Index() (_ IndexReader, err error) {
return h.head.staleIndex(h.RangeHead.mint, h.RangeHead.maxt, h.staleSeriesRefs)
}
func (h *StaleHead) NumSeries() uint64 {
return h.head.NumStaleSeries()
}
var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD")
func (h *StaleHead) Meta() BlockMeta {
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),
ULID: staleHeadULID,
Stats: BlockStats{
NumSeries: h.NumSeries(),
},
}
}
// String returns a human-readable representation of the stale head. It's important to
// keep this function in order to avoid the struct dump when the head is stringified in
// errors or logs.
func (h *StaleHead) String() string {
return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
}
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers.
func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
@@ -1649,6 +1710,43 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
return actualInOrderMint, minOOOTime, minMmapFile
}
// gcStaleSeries removes the given stale series from the Head, provided they are still stale
// and their maxt is <= the given maxt.
func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
h.numSeries.Sub(uint64(seriesRemoved))
h.numStaleSeries.Sub(uint64(seriesRemoved))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted, affected)
// Remove tombstones referring to the deleted series.
h.tombstones.DeleteTombstones(deleted)
if h.wal != nil {
_, last, _ := wlog.Segments(h.wal.Dir())
h.walExpiriesMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
// on start up when we replay the WAL, or any other code
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.walExpiries[chunks.HeadSeriesRef(ref)] = last
}
h.walExpiriesMtx.Unlock()
}
}
// Tombstones returns a new reader over the head's tombstones.
func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
@@ -2023,6 +2121,70 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
}
// gcStaleSeries removes the series referenced by the given postings from the stripeSeries,
// provided each series is still stale and has no samples newer than maxt.
// It returns the set of deleted series refs, the labels affected by the deletions,
// and the number of head chunks removed.
func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
rmChunks = 0
)
staleSeriesMap := map[storage.SeriesRef]struct{}{}
for p.Next() {
ref := p.At()
staleSeriesMap[ref] = struct{}{}
}
check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists {
// This series was not compacted. Skip it.
return
}
series.Lock()
defer series.Unlock()
if series.maxTime() > maxt {
return
}
// Check if the series is still stale.
isStale := value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
if !isStale {
return
}
if series.headChunks != nil {
rmChunks += series.headChunks.len()
}
rmChunks += len(series.mmappedChunks)
// The series is gone entirely. We need to keep the series lock
// and make sure we have acquired the stripe locks for hash and ID of the
// series alike.
// If we don't hold them all, there's a very small chance that a series receives
// samples again while we are half-way into deleting it.
refShard := int(series.ref) & (s.size - 1)
if hashShard != refShard {
s.locks[refShard].Lock()
defer s.locks[refShard].Unlock()
}
deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
}
s.iterForDeletion(check)
return deleted, affected, rmChunks
}
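// Illustrative sketch (not part of this commit): the shard-locking pattern the
// check function above relies on, shown in isolation. stripeSeries is sharded
// both by label hash and by series ID, so a deletion must hold both shard locks
// to prevent a concurrent writer that resolves the series by either key from
// observing it half-deleted. The helper itself is hypothetical; s.locks and
// s.size are the fields used above.
func (s *stripeSeries) withBothShardLocks(hash uint64, ref chunks.HeadSeriesRef, fn func(hashShard, refShard int)) {
	hashShard := int(hash) & (s.size - 1)
	refShard := int(ref) & (s.size - 1)
	s.locks[hashShard].Lock()
	defer s.locks[hashShard].Unlock()
	if refShard != hashShard {
		s.locks[refShard].Lock()
		defer s.locks[refShard].Unlock()
	}
	fn(hashShard, refShard)
}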
// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
// The checkDeletedFunc takes a map as input and should add to it all series that were deleted and should be included
// when invoking the PostDeletion hook.

View File

@@ -1244,6 +1244,7 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
acc.floatsAppended--

View File

@@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/model/value"
"math"
"slices"
"sync"
@@ -201,9 +202,112 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
return nil
}
func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
return &headStaleIndexReader{
headIndexReader: h.indexRange(mint, maxt),
staleSeriesRefs: staleSeriesRefs,
}, nil
}
// headStaleIndexReader is an IndexReader over only the stale series in the Head that have no out-of-order data.
type headStaleIndexReader struct {
*headIndexReader
staleSeriesRefs []storage.SeriesRef
}
func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
// If all postings are requested, return the precalculated list.
k, v := index.AllPostingsKey()
if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v {
return index.NewListPostings(h.staleSeriesRefs), nil
}
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
if err != nil {
return index.ErrPostings(err), err
}
return index.NewListPostings(seriesRefs), nil
}
func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
// Unused for compaction, so we don't need to optimise.
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match))
if err != nil {
return index.ErrPostings(err)
}
return index.NewListPostings(seriesRefs)
}
func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
// Unused for compaction, so we don't need to optimise.
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name))
if err != nil {
return index.ErrPostings(err)
}
return index.NewListPostings(seriesRefs)
}
// filterStaleSeriesAndSortPostings returns the stale series references from the given postings
// that also do not have any out-of-order data.
func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
series := make([]*memSeries, 0, 128)
notFoundSeriesCount := 0
for p.Next() {
s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
if s == nil {
notFoundSeriesCount++
continue
}
s.Lock()
if s.ooo != nil {
// Has out-of-order data; skip it.
s.Unlock()
continue
}
if value.IsStaleNaN(s.lastValue) ||
(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
series = append(series, s)
}
s.Unlock()
}
if notFoundSeriesCount > 0 {
h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount)
}
if err := p.Err(); err != nil {
return nil, fmt.Errorf("expand postings: %w", err)
}
slices.SortFunc(series, func(a, b *memSeries) int {
return labels.Compare(a.labels(), b.labels())
})
// Convert back to list.
ep := make([]storage.SeriesRef, 0, len(series))
for _, p := range series {
ep = append(ep, storage.SeriesRef(p.ref))
}
return ep, nil
}
// SortedPostings returns the postings as-is because we expect any postings obtained via
// headStaleIndexReader to already be sorted.
func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
// All the postings functions above already return sorted postings.
return p
}
// SortedStaleSeriesRefsNoOOOData returns all the series refs of the stale series that do not have any out-of-order data.
func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) {
k, v := index.AllPostingsKey()
return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
}
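// Illustrative sketch (not part of this commit): the staleness predicate used in
// filterStaleSeriesAndSortPostings above (and again in gcStaleSeries), factored
// out for clarity. A series counts as stale when its most recent sample is a
// staleness marker, for floats and for (float) histograms alike. The helper name
// is hypothetical; callers must hold s.Lock(), as the call sites above do.
func isStaleSeries(s *memSeries) bool {
	return value.IsStaleNaN(s.lastValue) ||
		(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
		(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum))
}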
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
// Do not expose chunks that are outside the specified range.
if !c.OverlapsClosedInterval(mint, maxt) {
continue
}