diff --git a/tsdb/db.go b/tsdb/db.go
index 3373cf13eb..d3c023546f 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/prometheus/prometheus/tsdb/index"
 	"io"
 	"io/fs"
 	"log/slog"
@@ -45,6 +44,7 @@ import (
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 	"github.com/prometheus/prometheus/util/compression"
@@ -223,6 +223,19 @@ type Options struct {
 
 	// UseUncachedIO allows bypassing the page cache when appropriate.
 	UseUncachedIO bool
+
+	// StaleSeriesCompactionInterval is the interval at which to attempt stale series compaction
+	// if the fraction of stale series crosses the configured threshold.
+	StaleSeriesCompactionInterval time.Duration
+
+	// StaleSeriesCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale series
+	// in the in-memory Head block. If the fraction of stale series crosses this threshold, stale series
+	// compaction will be run at the next stale series compaction interval.
+	StaleSeriesCompactionThreshold float64
+
+	// StaleSeriesImmediateCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale
+	// series in the in-memory Head block. If it crosses this threshold, stale series compaction is run immediately.
+	StaleSeriesImmediateCompactionThreshold float64
 }
 
 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@@ -819,6 +832,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 		// configured maximum block duration.
 		rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
 	}
+
 	return opts, rngs
 }
 
@@ -1090,6 +1104,17 @@ func (db *DB) run(ctx context.Context) {
 	backoff := time.Duration(0)
 
+	nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval)
+	if nextStaleSeriesCompactionTime.Before(time.Now()) {
+		nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
+	}
+
+	staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
+	if db.opts.StaleSeriesCompactionInterval <= 0 {
+		// An interval long enough that a stale series compaction never gets scheduled.
+		staleSeriesWaitDur = 365 * 24 * time.Hour
+	}
+
 	for {
 		select {
 		case <-db.stopc:
 			return
@@ -1105,6 +1130,19 @@ func (db *DB) run(ctx context.Context) {
 			}
 			db.cmtx.Unlock()
 
+			// TODO: skip stale series compaction if a normal compaction is due soon.
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			db.logger.Debug("stale series ratio", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 &&
+				staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold {
+				db.logger.Info("Starting immediate stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+				if err := db.CompactStaleHead(); err != nil {
+					db.logger.Error("immediate stale series compaction failed", "err", err)
+				}
+				db.logger.Info("Finished immediate stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			}
+
 			select {
 			case db.compactc <- struct{}{}:
 			default:
@@ -1126,6 +1164,23 @@ func (db *DB) run(ctx context.Context) {
 				db.metrics.compactionsSkipped.Inc()
 			}
 			db.autoCompactMtx.Unlock()
+		case <-time.After(staleSeriesWaitDur):
+			// TODO: skip stale series compaction if a normal compaction is due soon.
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			db.logger.Debug("stale series ratio", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 &&
+				staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold {
+				db.logger.Info("Starting scheduled stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+				if err := db.CompactStaleHead(); err != nil {
+					db.logger.Error("scheduled stale series compaction failed", "err", err)
+				}
+				db.logger.Info("Finished scheduled stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			}
+
+			nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
+			staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime)
+
 		case <-db.stopc:
 			return
 		}
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 00a1e1a0f5..ecfe1ab804 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -51,6 +51,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
diff --git a/tsdb/head.go b/tsdb/head.go
index 5092fd9280..c43082812e 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1584,7 +1584,7 @@ func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.Series
 }
 
 func (h *StaleHead) Index() (_ IndexReader, err error) {
-	return h.head.staleIndex(h.RangeHead.mint, h.RangeHead.maxt, h.staleSeriesRefs)
+	return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs)
 }
 
 func (h *StaleHead) NumSeries() uint64 {
@@ -1680,13 +1680,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
+	deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
 	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 	h.metrics.chunks.Sub(float64(chunksRemoved))
 	h.numSeries.Sub(uint64(seriesRemoved))
+	h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
 
 	// Remove deleted series IDs from the postings lists.
 	h.postings.Delete(deleted, affected)
@@ -1713,7 +1714,6 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 // gcStaleSeries removes all the stale series provided given that they are still stale
 // and the series maxt is <= the given max.
 func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
-
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
 	deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
@@ -2041,13 +2041,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
 // and there's no easy way to cast maps.
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
-func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
+func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) {
 	var (
-		deleted          = map[storage.SeriesRef]struct{}{}
-		affected         = map[labels.Label]struct{}{}
-		rmChunks         = 0
-		actualMint int64 = math.MaxInt64
-		minOOOTime int64 = math.MaxInt64
+		deleted                  = map[storage.SeriesRef]struct{}{}
+		affected                 = map[labels.Label]struct{}{}
+		rmChunks                 = 0
+		staleSeriesDeleted       = 0
+		actualMint         int64 = math.MaxInt64
+		minOOOTime         int64 = math.MaxInt64
 	)
 	minMmapFile = math.MaxInt32
 
@@ -2102,7 +2103,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 			if value.IsStaleNaN(series.lastValue) ||
 				(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
 				(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
-				numStaleSeries.Dec()
+				staleSeriesDeleted++
 			}
 
 			deleted[storage.SeriesRef(series.ref)] = struct{}{}
@@ -2118,10 +2119,10 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 		actualMint = mint
 	}
 
-	return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
+	return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
 }
 
-// TODO: add comments
+// gcStaleSeries removes the given stale series, returning the deleted series refs, the affected labels, and the number of chunks removed.
 func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
 	var (
 		deleted  = map[storage.SeriesRef]struct{}{}
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index e2b9ae4fca..a5d6e9127a 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -1244,7 +1244,6 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
 				if staleToNonStale {
 					a.head.numStaleSeries.Dec()
 				}
-
 			} else {
 				// The sample is an exact duplicate, and should be silently dropped.
 				acc.floatsAppended--
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 80c115a73b..b33ee3393a 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -17,12 +17,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/prometheus/prometheus/model/value"
 	"math"
 	"slices"
 	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index e141c1dcfd..8a07b24d8f 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -6688,7 +6688,7 @@ func TestStripeSeries_gc(t *testing.T) {
 	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
 	hash := ms1.lset.Hash()
 
-	s.gc(0, 0, nil)
+	s.gc(0, 0)
 
 	// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
 	got := s.getByHash(hash, ms1.lset)
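
Note for reviewers: a minimal usage sketch of the three new Options fields introduced above. The values are illustrative, not recommended defaults; tsdb.Open and tsdb.DefaultOptions are the existing public API.

package main

import (
	"log"
	"time"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Check the stale series ratio on a fixed schedule...
	opts.StaleSeriesCompactionInterval = 30 * time.Minute
	// ...and compact if at least 20% of in-memory Head series are stale.
	opts.StaleSeriesCompactionThreshold = 0.20
	// Compact right away, without waiting for the next tick, once 50% are stale.
	opts.StaleSeriesImmediateCompactionThreshold = 0.50

	db, err := tsdb.Open("data", nil, nil, opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}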
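
For reference, a standalone sketch of the interval-alignment computation used in (*DB).run to pick the first scheduled stale series compaction time. nextAligned is a hypothetical helper name, not part of the patch.

package main

import (
	"fmt"
	"time"
)

// nextAligned rounds to the nearest multiple of interval, then pushes
// forward one interval if the rounded instant landed in the past.
func nextAligned(now time.Time, interval time.Duration) time.Time {
	next := now.Round(interval)
	if next.Before(now) {
		next = next.Add(interval)
	}
	return next
}

func main() {
	now := time.Date(2024, 1, 1, 10, 17, 0, 0, time.UTC)
	fmt.Println(nextAligned(now, time.Hour)) // 2024-01-01 11:00:00 +0000 UTC

	now = time.Date(2024, 1, 1, 10, 45, 0, 0, time.UTC)
	fmt.Println(nextAligned(now, time.Hour)) // 2024-01-01 11:00:00 +0000 UTC (Round already went up)
}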
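
The staleness check in stripeSeries.gc relies on the stale NaN marker from model/value. A small example of how that marker differs from an ordinary NaN:

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
)

func main() {
	// The stale marker is a specific NaN bit pattern written by Prometheus
	// when a series disappears; IsStaleNaN matches exactly that pattern.
	staleMarker := math.Float64frombits(value.StaleNaN)
	fmt.Println(value.IsStaleNaN(staleMarker)) // true

	// An ordinary NaN is not a stale marker.
	fmt.Println(value.IsStaleNaN(math.NaN())) // false
}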