diff --git a/storage/series.go b/storage/series.go index e61b225937..5339070de1 100644 --- a/storage/series.go +++ b/storage/series.go @@ -439,7 +439,17 @@ func (e errChunksIterator) Err() error { return e.err } // ExpandSamples iterates over all samples in the iterator, buffering all in slice. // Optionally it takes samples constructor, useful when you want to compare sample slices with different // sample implementations. if nil, sample type from this package will be used. +// For float sample, NaN values are replaced with -42. func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) { + return expandSamples(iter, true, newSampleFn) +} + +// ExpandSamplesWithoutReplacingNaNs is same as ExpandSamples but it does not replace float sample NaN values with anything. +func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) { + return expandSamples(iter, false, newSampleFn) +} + +func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) { if newSampleFn == nil { newSampleFn = func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample { switch { @@ -461,7 +471,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, case chunkenc.ValFloat: t, f := iter.At() // NaNs can't be compared normally, so substitute for another value. - if math.IsNaN(f) { + if replaceNaN && math.IsNaN(f) { f = -42 } result = append(result, newSampleFn(t, f, nil, nil)) diff --git a/tsdb/block.go b/tsdb/block.go index 44c6ef5053..41b3037989 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -233,6 +233,18 @@ func (bm *BlockMetaCompaction) FromOutOfOrder() bool { return slices.Contains(bm.Hints, CompactionHintFromOutOfOrder) } +func (bm *BlockMetaCompaction) SetStaleSeries() { + if bm.FromStaleSeries() { + return + } + bm.Hints = append(bm.Hints, CompactionHintFromStaleSeries) + slices.Sort(bm.Hints) +} + +func (bm *BlockMetaCompaction) FromStaleSeries() bool { + return slices.Contains(bm.Hints, CompactionHintFromStaleSeries) +} + const ( indexFilename = "index" metaFilename = "meta.json" @@ -241,6 +253,10 @@ const ( // CompactionHintFromOutOfOrder is a hint noting that the block // was created from out-of-order chunks. CompactionHintFromOutOfOrder = "from-out-of-order" + + // CompactionHintFromStaleSeries is a hint noting that the block + // was created from stale series. + CompactionHintFromStaleSeries = "from-stale-series" ) func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } diff --git a/tsdb/compact.go b/tsdb/compact.go index 7828fd0860..ce15387bab 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -578,6 +578,9 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b if base.Compaction.FromOutOfOrder() { meta.Compaction.SetOutOfOrder() } + if base.Compaction.FromStaleSeries() { + meta.Compaction.SetStaleSeries() + } } err := c.write(dest, meta, DefaultBlockPopulator{}, b) diff --git a/tsdb/db.go b/tsdb/db.go index 4d21d4dc12..878eab34e7 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -44,6 +44,7 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met. + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wlog" "github.com/prometheus/prometheus/util/compression" @@ -222,6 +223,19 @@ type Options struct { // UseUncachedIO allows bypassing the page cache when appropriate. UseUncachedIO bool + + // StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction + // if the number of stale series crosses the given threshold. + StaleSeriesCompactionInterval time.Duration + + // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in + // the in-memory Head block. If the % of stale series crosses this threshold, stale series + // compaction will be run in the next stale series compaction interval. + StaleSeriesCompactionThreshold float64 + + // StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in + // the in-memory Head block. If the % of stale series crosses this threshold, stale series is run immediately. + StaleSeriesImmediateCompactionThreshold float64 } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -818,6 +832,12 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { // configured maximum block duration. rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3) } + + // For testing purposes. TODO: remove before merging the PR. + opts.StaleSeriesCompactionInterval = 15 * time.Minute + opts.StaleSeriesCompactionThreshold = 0.2 + opts.StaleSeriesImmediateCompactionThreshold = 0.35 + return opts, rngs } @@ -1089,6 +1109,17 @@ func (db *DB) run(ctx context.Context) { backoff := time.Duration(0) + nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval) + if nextStaleSeriesCompactionTime.Before(time.Now()) { + nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval) + } + + staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime) + if db.opts.StaleSeriesCompactionInterval <= 0 { + // Long enough interval so that we don't schedule a stale series compaction. + staleSeriesWaitDur = 365 * 24 * time.Hour + } + for { select { case <-db.stopc: @@ -1104,6 +1135,16 @@ func (db *DB) run(ctx context.Context) { } db.cmtx.Unlock() + // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon. + numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() + staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) + if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 && + staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold { + if err := db.CompactStaleHead(); err != nil { + db.logger.Error("immediate stale series compaction failed", "err", err) + } + } + select { case db.compactc <- struct{}{}: default: @@ -1125,6 +1166,20 @@ func (db *DB) run(ctx context.Context) { db.metrics.compactionsSkipped.Inc() } db.autoCompactMtx.Unlock() + case <-time.After(staleSeriesWaitDur): + // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon. + numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() + staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) + if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 && + staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold { + if err := db.CompactStaleHead(); err != nil { + db.logger.Error("scheduled stale series compaction failed", "err", err) + } + } + + nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval) + staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime) + case <-db.stopc: return } @@ -1488,6 +1543,46 @@ func (db *DB) compactHead(head *RangeHead) error { return nil } +func (db *DB) CompactStaleHead() error { + // TODO: compact the OOO data as well. + // TODO: test splitting of data in multiple blocks if stale series crosses block boundary. + db.cmtx.Lock() + defer db.cmtx.Unlock() + + db.logger.Info("Starting stale series compaction") + + staleSeriesRefs := db.head.SortedStaleSeriesRefs(context.Background()) + meta := &BlockMeta{} + meta.Compaction.SetStaleSeries() + mint, maxt := db.head.MinTime(), db.head.MaxTime() + for ; mint < maxt; mint += db.head.chunkRange.Load() { + staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load(), staleSeriesRefs) + + uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta) + if err != nil { + return fmt.Errorf("persist stale head: %w", err) + } + + db.logger.Info("Stale series block created", "ulids", fmt.Sprintf("%v", uids), "min_time", mint, "max_time", maxt) + + if err := db.reloadBlocks(); err != nil { + multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err)) + for _, uid := range uids { + if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { + multiErr.Add(fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll)) + } + } + return multiErr.Err() + } + } + + db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt) + db.head.RebuildSymbolTable(db.logger) + + db.logger.Info("Ending stale series compaction") + return nil +} + // compactBlocks compacts all the eligible on-disk blocks. // The db.cmtx should be held before calling this method. func (db *DB) compactBlocks() (err error) { @@ -1945,7 +2040,7 @@ func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) { maxt, ok = int64(math.MinInt64), false // If blocks are overlapping, last block might not have the max time. So check all blocks. for _, b := range db.Blocks() { - if !b.meta.Compaction.FromOutOfOrder() && b.meta.MaxTime > maxt { + if !b.meta.Compaction.FromOutOfOrder() && !b.meta.Compaction.FromStaleSeries() && b.meta.MaxTime > maxt { ok = true maxt = b.meta.MaxTime } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2c9bfe6640..406ef19952 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -51,6 +51,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" @@ -100,8 +101,8 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) { return db } -// query runs a matcher query against the querier and fully expands its data. -func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { +// queryHelper runs a matcher query against the querier and fully expands its data. +func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample { ss := q.Select(context.Background(), false, nil, matchers...) defer func() { require.NoError(t, q.Close()) @@ -113,7 +114,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str series := ss.At() it = series.Iterator(it) - samples, err := storage.ExpandSamples(it, newSample) + var samples []chunks.Sample + var err error + if withNaNReplacement { + samples, err = storage.ExpandSamples(it, newSample) + } else { + samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample) + } require.NoError(t, err) require.NoError(t, it.Err()) @@ -130,6 +137,16 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str return result } +// query runs a matcher query against the querier and fully expands its data. +func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { + return queryHelper(t, q, true, matchers...) +} + +// queryWithoutReplacingNaNs runs a matcher query against the querier and fully expands its data. +func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { + return queryHelper(t, q, false, matchers...) +} + // queryAndExpandChunks runs a matcher query against the querier and fully expands its data into samples. func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][][]chunks.Sample { s := queryChunks(t, q, matchers...) @@ -9517,3 +9534,158 @@ func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) { case <-blockClosed: } } + +func TestStaleSeriesCompaction(t *testing.T) { + opts := DefaultOptions() + db := openTestDB(t, opts, nil) + db.DisableCompactions() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + var ( + nonStaleSeries, staleSeries, + nonStaleHist, staleHist, + nonStaleFHist, staleFHist []labels.Labels + numSeriesPerCategory = 1 + ) + for i := 0; i < numSeriesPerCategory; i++ { + nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i))) + nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i))) + nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i))) + + staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i))) + staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i))) + staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i))) + } + + var ( + v = 10.0 + staleV = math.Float64frombits(value.StaleNaN) + h = tsdbutil.GenerateTestHistograms(1)[0] + fh = tsdbutil.GenerateTestFloatHistograms(1)[0] + staleH = &histogram.Histogram{Sum: staleV} + staleFH = &histogram.FloatHistogram{Sum: staleV} + ) + + addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) { + app := db.Appender(context.Background()) + for i := 0; i < len(floatSeries); i++ { + _, err := app.Append(0, floatSeries[i], ts, v) + require.NoError(t, err) + _, err = app.AppendHistogram(0, histSeries[i], ts, h, nil) + require.NoError(t, err) + _, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) { + app := db.Appender(context.Background()) + for i := 0; i < len(floatSeries); i++ { + _, err := app.Append(0, floatSeries[i], ts, staleV) + require.NoError(t, err) + _, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil) + require.NoError(t, err) + _, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + // Normal sample for all. + addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist) + addNormalSamples(100, staleSeries, staleHist, staleFHist) + + // Stale sample for the stale series. Normal sample for the non-stale series. + addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist) + addStaleSamples(200, staleSeries, staleHist, staleFHist) + + // Normal samples for the non-stale series later + addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist) + + require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries()) + require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries()) + + require.NoError(t, db.CompactStaleHead()) + + require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries()) + require.Equal(t, uint64(0), db.Head().NumStaleSeries()) + require.Len(t, db.Blocks(), 1) + + nonFirstH := h.Copy() + nonFirstH.CounterResetHint = histogram.NotCounterReset + nonFirstFH := fh.Copy() + nonFirstFH.CounterResetHint = histogram.NotCounterReset + + expHeadQuery := make(map[string][]chunks.Sample) + for i := 0; i < numSeriesPerCategory; i++ { + expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v}, + } + expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH}, + } + expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH}, + } + } + + querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300) + require.NoError(t, err) + t.Cleanup(func() { + querier.Close() + }) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*")) + require.Equal(t, expHeadQuery, seriesSet) + + expBlockQuery := make(map[string][]chunks.Sample) + for i := 0; i < numSeriesPerCategory; i++ { + expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, f: v}, sample{t: 200, f: staleV}, + } + expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, h: h}, sample{t: 200, h: staleH}, + } + expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{ + sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH}, + } + } + + querier, err = NewBlockQuerier(db.Blocks()[0], 0, 300) + require.NoError(t, err) + t.Cleanup(func() { + querier.Close() + }) + seriesSet = queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*")) + require.Len(t, seriesSet, len(expBlockQuery)) + + // Compare all the samples except the stale value that needs special handling. + for _, category := range [][]labels.Labels{staleSeries, staleHist, staleFHist} { + for i := 0; i < numSeriesPerCategory; i++ { + seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name")) + samples := expBlockQuery[seriesKey] + actSamples, exists := seriesSet[seriesKey] + require.Truef(t, exists, "series not found in result %s", seriesKey) + require.Len(t, actSamples, len(samples)) + require.Equal(t, samples[0], actSamples[0]) + require.Equal(t, samples[1].T(), actSamples[1].T()) + } + } + + // Check the NaN values. + for i := 0; i < numSeriesPerCategory; i++ { + seriesKey := fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name")) + require.True(t, value.IsStaleNaN(seriesSet[seriesKey][1].F())) + } + + for i := 0; i < numSeriesPerCategory; i++ { + seriesKey := fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name")) + require.True(t, seriesSet[seriesKey][1].H().Equals(staleH)) + } + + for i := 0; i < numSeriesPerCategory; i++ { + seriesKey := fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name")) + require.True(t, seriesSet[seriesKey][1].FH().Equals(staleFH)) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 7763d272b7..3c1080342e 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -68,6 +69,7 @@ var ( type Head struct { chunkRange atomic.Int64 numSeries atomic.Uint64 + numStaleSeries atomic.Uint64 minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection. minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked. minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. @@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() { type headMetrics struct { activeAppenders prometheus.Gauge series prometheus.GaugeFunc + staleSeries prometheus.GaugeFunc seriesCreated prometheus.Counter seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter @@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { }, func() float64 { return float64(h.NumSeries()) }), + staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_stale_series", + Help: "Total number of stale series in the head block.", + }, func() float64 { + return float64(h.NumStaleSeries()) + }), seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", Help: "Total number of series created in the head", @@ -532,6 +541,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { r.MustRegister( m.activeAppenders, m.series, + m.staleSeries, m.chunks, m.chunksCreated, m.chunksRemoved, @@ -1188,6 +1198,20 @@ func (h *Head) truncateMemory(mint int64) (err error) { return h.truncateSeriesAndChunkDiskMapper("truncateMemory") } +func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) { + h.chunkSnapshotMtx.Lock() + defer h.chunkSnapshotMtx.Unlock() + + if h.MinTime() >= maxt { + return + } + + // TODO: this will block all queries. See if we can do better. + h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt) + + h.gcStaleSeries(p, maxt) +} + // WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. // The query timeout limits the max wait time of this function implicitly. // The mint is inclusive and maxt is the truncation time hence exclusive. @@ -1538,6 +1562,53 @@ func (h *RangeHead) String() string { return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) } +// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader. +// Used only for compactions. +type StaleHead struct { + RangeHead + staleSeriesRefs []storage.SeriesRef +} + +// NewStaleHead returns a *StaleHead. +func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead { + return &StaleHead{ + RangeHead: RangeHead{ + head: head, + mint: mint, + maxt: maxt, + }, + staleSeriesRefs: staleSeriesRefs, + } +} + +func (h *StaleHead) Index() (_ IndexReader, err error) { + return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs) +} + +func (h *StaleHead) NumSeries() uint64 { + return h.head.NumStaleSeries() +} + +var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD") + +func (h *StaleHead) Meta() BlockMeta { + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: staleHeadULID, + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, + } +} + +// String returns an human readable representation of the stake head. It's important to +// keep this function in order to avoid the struct dump when the head is stringified in +// errors or logs. +func (h *StaleHead) String() string { + return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) +} + // Delete all samples in the range of [mint, maxt] for series that satisfy the given // label matchers. func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error { @@ -1607,13 +1678,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // Drop old chunks and remember series IDs and hashes if they can be // deleted entirely. - deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) + deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) h.numSeries.Sub(uint64(seriesRemoved)) + h.numStaleSeries.Sub(uint64(staleSeriesDeleted)) // Remove deleted series IDs from the postings lists. h.postings.Delete(deleted, affected) @@ -1640,16 +1712,57 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { return actualInOrderMint, minOOOTime, minMmapFile } +// gcStaleSeries removes all the stale series provided given that they are still stale +// and the series maxt is <= the given max. +func (h *Head) gcStaleSeries(p index.Postings, maxt int64) { + // Drop old chunks and remember series IDs and hashes if they can be + // deleted entirely. + deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt) + seriesRemoved := len(deleted) + + h.metrics.seriesRemoved.Add(float64(seriesRemoved)) + h.metrics.chunksRemoved.Add(float64(chunksRemoved)) + h.metrics.chunks.Sub(float64(chunksRemoved)) + h.numSeries.Sub(uint64(seriesRemoved)) + h.numStaleSeries.Sub(uint64(seriesRemoved)) + + // Remove deleted series IDs from the postings lists. + h.postings.Delete(deleted, affected) + + // Remove tombstones referring to the deleted series. + h.tombstones.DeleteTombstones(deleted) + + if h.wal != nil { + _, last, _ := wlog.Segments(h.wal.Dir()) + h.walExpiriesMtx.Lock() + // Keep series records until we're past segment 'last' + // because the WAL will still have samples records with + // this ref ID. If we didn't keep these series records then + // on start up when we replay the WAL, or any other code + // that reads the WAL, wouldn't be able to use those + // samples since we would have no labels for that ref ID. + for ref := range deleted { + h.walExpiries[chunks.HeadSeriesRef(ref)] = last + } + h.walExpiriesMtx.Unlock() + } +} + // Tombstones returns a new reader over the head's tombstones. func (h *Head) Tombstones() (tombstones.Reader, error) { return h.tombstones, nil } -// NumSeries returns the number of active series in the head. +// NumSeries returns the number of series tracked in the head. func (h *Head) NumSeries() uint64 { return h.numSeries.Load() } +// NumStaleSeries returns the number of stale series in the head. +func (h *Head) NumStaleSeries() uint64 { + return h.numStaleSeries.Load() +} + var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") // Meta returns meta information about the head. @@ -1929,13 +2042,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct // and there's no easy way to cast maps. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. -func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) { +func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) { var ( - deleted = map[storage.SeriesRef]struct{}{} - affected = map[labels.Label]struct{}{} - rmChunks = 0 - actualMint int64 = math.MaxInt64 - minOOOTime int64 = math.MaxInt64 + deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} + rmChunks = 0 + staleSeriesDeleted = 0 + actualMint int64 = math.MaxInt64 + minOOOTime int64 = math.MaxInt64 ) minMmapFile = math.MaxInt32 @@ -1987,6 +2101,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( defer s.locks[refShard].Unlock() } + if value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + staleSeriesDeleted++ + } + deleted[storage.SeriesRef(series.ref)] = struct{}{} series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) s.hashes[hashShard].del(hash, series.ref) @@ -2000,7 +2120,71 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( actualMint = mint } - return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile + return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile +} + +// TODO: add comments. +func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) { + var ( + deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} + rmChunks = 0 + ) + + staleSeriesMap := map[storage.SeriesRef]struct{}{} + for p.Next() { + ref := p.At() + staleSeriesMap[ref] = struct{}{} + } + + check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) { + if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists { + // This series was not compacted. Skip it. + return + } + + series.Lock() + defer series.Unlock() + + if series.maxTime() > maxt { + return + } + + // Check if the series is still stale. + isStale := value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) + + if !isStale { + return + } + + if series.headChunks != nil { + rmChunks += series.headChunks.len() + } + rmChunks += len(series.mmappedChunks) + + // The series is gone entirely. We need to keep the series lock + // and make sure we have acquired the stripe locks for hash and ID of the + // series alike. + // If we don't hold them all, there's a very small chance that a series receives + // samples again while we are half-way into deleting it. + refShard := int(series.ref) & (s.size - 1) + if hashShard != refShard { + s.locks[refShard].Lock() + defer s.locks[refShard].Unlock() + } + + deleted[storage.SeriesRef(series.ref)] = struct{}{} + series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) + s.hashes[hashShard].del(hash, series.ref) + delete(s.series[refShard], series.ref) + deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function. + } + + s.iterForDeletion(check) + + return deleted, affected, rmChunks } // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each. diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 05299f048d..fa44f752f2 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { acc.floatsAppended-- } default: + newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) + staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { // The sample is an exact duplicate, and should be silently dropped. acc.floatsAppended-- @@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { acc.histogramsAppended-- } default: + newlyStale := value.IsStaleNaN(s.H.Sum) + staleToNonStale := false + if series.lastHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) + } ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { acc.histogramsAppended-- acc.histoOOORejected++ @@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { acc.histogramsAppended-- } default: + newlyStale := value.IsStaleNaN(s.FH.Sum) + staleToNonStale := false + if series.lastFloatHistogramValue != nil { + newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) + staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) + } ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { @@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { if s.T > acc.inOrderMaxt { acc.inOrderMaxt = s.T } + if newlyStale { + a.head.numStaleSeries.Inc() + } + if staleToNonStale { + a.head.numStaleSeries.Dec() + } } else { acc.histogramsAppended-- acc.histoOOORejected++ diff --git a/tsdb/head_read.go b/tsdb/head_read.go index b653b5dc14..4dd2f22f8a 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -201,9 +202,102 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB return nil } +func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) { + return &headStaleIndexReader{ + headIndexReader: h.indexRange(mint, maxt), + staleSeriesRefs: staleSeriesRefs, + }, nil +} + +type headStaleIndexReader struct { + *headIndexReader + staleSeriesRefs []storage.SeriesRef +} + +func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + // If all postings are requested, return the precalculated list. + k, v := index.AllPostingsKey() + if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v { + return index.NewListPostings(h.staleSeriesRefs), nil + } + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...)) + if err != nil { + return index.ErrPostings(err), err + } + return index.NewListPostings(seriesRefs), nil +} + +func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { + // Unused for compaction, so we don't need to optimise. + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match)) + if err != nil { + return index.ErrPostings(err) + } + return index.NewListPostings(seriesRefs) +} + +func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + // Unused for compaction, so we don't need to optimise. + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name)) + if err != nil { + return index.ErrPostings(err) + } + return index.NewListPostings(seriesRefs) +} + +func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) { + series := make([]*memSeries, 0, 128) + + notFoundSeriesCount := 0 + for p.Next() { + s := h.series.getByID(chunks.HeadSeriesRef(p.At())) + if s == nil { + notFoundSeriesCount++ + continue + } + + s.Lock() + if value.IsStaleNaN(s.lastValue) || + (s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) || + (s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) { + series = append(series, s) + } + s.Unlock() + } + if notFoundSeriesCount > 0 { + h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount) + } + if err := p.Err(); err != nil { + return nil, fmt.Errorf("expand postings: %w", err) + } + + slices.SortFunc(series, func(a, b *memSeries) int { + return labels.Compare(a.labels(), b.labels()) + }) + + // Convert back to list. + ep := make([]storage.SeriesRef, 0, len(series)) + for _, p := range series { + ep = append(ep, storage.SeriesRef(p.ref)) + } + return ep, nil +} + +func (h *headStaleIndexReader) SortedPostings(p index.Postings) index.Postings { + // All the postings function above already give the sorted list of postings. + return p +} + +func (h *Head) SortedStaleSeriesRefs(ctx context.Context) []storage.SeriesRef { + k, v := index.AllPostingsKey() + // TODO: handle error + seriesRefs, _ := h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v)) + return seriesRefs +} + func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { - // Do not expose chunks that are outside of the specified range. + // Do not expose chunks that are outside the specified range. if !c.OverlapsClosedInterval(mint, maxt) { continue } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 73f67f4e8a..3fad73297f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto wg.Wait() } + +func TestHead_NumStaleSeries(t *testing.T) { + head, _ := newTestHead(t, 1000, compression.None, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + // Initially, no series should be stale. + require.Equal(t, uint64(0), head.NumStaleSeries()) + + appendSample := func(lbls labels.Labels, ts int64, val float64) { + app := head.Appender(context.Background()) + _, err := app.Append(0, lbls, ts, val) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, lbls, ts, val, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, lbls, ts, nil, val) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + verifySeriesCounts := func(numStaleSeries, numSeries int) { + require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries()) + require.Equal(t, uint64(numSeries), head.NumSeries()) + } + + // Create some series with normal samples. + series1 := labels.FromStrings("name", "series1", "label", "value1") + series2 := labels.FromStrings("name", "series2", "label", "value2") + series3 := labels.FromStrings("name", "series3", "label", "value3") + + // Add normal samples to all series. + appendSample(series1, 100, 1) + appendSample(series2, 100, 2) + appendSample(series3, 100, 3) + // Still no stale series. + verifySeriesCounts(0, 3) + + // Make series1 stale by appending a stale sample. Now we should have 1 stale series. + appendSample(series1, 200, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(1, 3) + + // Make series2 stale as well. + appendSample(series2, 200, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(2, 3) + + // Add a non-stale sample to series1. It should not be counted as stale now. + appendSample(series1, 300, 10) + verifySeriesCounts(1, 3) + + // Test that series3 doesn't become stale when we add another normal sample. + appendSample(series3, 200, 10) + verifySeriesCounts(1, 3) + + // Test histogram stale samples as well. + series4 := labels.FromStrings("name", "series4", "type", "histogram") + h := tsdbutil.GenerateTestHistograms(1)[0] + appendHistogram(series4, 100, h) + verifySeriesCounts(1, 4) + + // Make histogram series stale. + staleHist := h.Copy() + staleHist.Sum = math.Float64frombits(value.StaleNaN) + appendHistogram(series4, 200, staleHist) + verifySeriesCounts(2, 4) + + // Test float histogram stale samples. + series5 := labels.FromStrings("name", "series5", "type", "float_histogram") + fh := tsdbutil.GenerateTestFloatHistograms(1)[0] + appendFloatHistogram(series5, 100, fh) + verifySeriesCounts(2, 5) + + // Make float histogram series stale. + staleFH := fh.Copy() + staleFH.Sum = math.Float64frombits(value.StaleNaN) + appendFloatHistogram(series5, 200, staleFH) + verifySeriesCounts(3, 5) + + // Make histogram sample non-stale and stale back again. + appendHistogram(series4, 210, h) + verifySeriesCounts(2, 5) + appendHistogram(series4, 220, staleHist) + verifySeriesCounts(3, 5) + + // Make float histogram sample non-stale and stale back again. + appendFloatHistogram(series5, 210, fh) + verifySeriesCounts(2, 5) + appendFloatHistogram(series5, 220, staleFH) + verifySeriesCounts(3, 5) + + // Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5, + // so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series. + appendSample(series1, 400, 10) + appendFloatHistogram(series5, 400, staleFH) + verifySeriesCounts(3, 5) + + // Test garbage collection behavior - stale series should be decremented when GC'd. + // Force a garbage collection by truncating old data. + require.NoError(t, head.Truncate(300)) + + // After truncation, run GC to collect old chunks/series. + head.gc() + + // series 1 and series 5 are left. + verifySeriesCounts(1, 2) + + // Test creating a new series for each of float, histogram, float histogram that starts as stale. + // This should be counted as stale. + series6 := labels.FromStrings("name", "series6", "direct", "stale") + series7 := labels.FromStrings("name", "series7", "direct", "stale") + series8 := labels.FromStrings("name", "series8", "direct", "stale") + appendSample(series6, 400, math.Float64frombits(value.StaleNaN)) + verifySeriesCounts(2, 3) + appendHistogram(series7, 400, staleHist) + verifySeriesCounts(3, 4) + appendFloatHistogram(series8, 400, staleFH) + verifySeriesCounts(4, 5) +}