mirror of https://github.com/prometheus/prometheus.git
Add DB Options to trigger stale series compactions
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent de8ea977a9
commit 5349f2ceaa
tsdb/db.go | 57
@@ -18,7 +18,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/prometheus/prometheus/tsdb/index"
 	"io"
 	"io/fs"
 	"log/slog"
@@ -45,6 +44,7 @@ import (
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 	"github.com/prometheus/prometheus/util/compression"
@@ -223,6 +223,19 @@ type Options struct {
 
 	// UseUncachedIO allows bypassing the page cache when appropriate.
 	UseUncachedIO bool
 
+	// StaleSeriesCompactionInterval is the interval at which to attempt stale series compaction
+	// if the number of stale series crosses the given threshold.
+	StaleSeriesCompactionInterval time.Duration
+
+	// StaleSeriesCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale series in
+	// the in-memory Head block. If the fraction of stale series crosses this threshold, stale series
+	// compaction will be run at the next stale series compaction interval.
+	StaleSeriesCompactionThreshold float64
+
+	// StaleSeriesImmediateCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale
+	// series in the in-memory Head block. If the fraction of stale series crosses this threshold, stale series
+	// compaction is run immediately.
+	StaleSeriesImmediateCompactionThreshold float64
 }
 
 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
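For orientation, here is a minimal sketch of how these options might be wired up when opening a TSDB. The directory, logger setup, and threshold values are illustrative assumptions, not part of this commit; only the three StaleSeries* fields come from the diff above, and the tsdb.Open signature is assumed to match current main.

package main

import (
	"log/slog"
	"os"
	"time"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Check for stale series every 2 hours (illustrative value).
	opts.StaleSeriesCompactionInterval = 2 * time.Hour
	// On that schedule, compact once 30% of Head series are stale.
	opts.StaleSeriesCompactionThreshold = 0.3
	// Skip the schedule and compact immediately at 60% stale.
	opts.StaleSeriesImmediateCompactionThreshold = 0.6

	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	db, err := tsdb.Open("data/", logger, nil, opts, nil)
	if err != nil {
		logger.Error("opening tsdb", "err", err)
		os.Exit(1)
	}
	defer db.Close()
}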
@@ -819,6 +832,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 		// configured maximum block duration.
 		rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
 	}
 
 	return opts, rngs
 }
@@ -1090,6 +1104,17 @@ func (db *DB) run(ctx context.Context) {
 
 	backoff := time.Duration(0)
 
+	nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval)
+	if nextStaleSeriesCompactionTime.Before(time.Now()) {
+		nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
+	}
+
+	staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
+	if db.opts.StaleSeriesCompactionInterval <= 0 {
+		// Long enough interval so that we don't schedule a stale series compaction.
+		staleSeriesWaitDur = 365 * 24 * time.Hour
+	}
+
 	for {
 		select {
 		case <-db.stopc:
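The block above aligns the first stale series compaction to a wall-clock multiple of the interval: time.Time.Round snaps to the nearest interval boundary (measured from Go's zero time), and the Before check pushes a boundary that already passed forward by one interval. A self-contained sketch of the same computation with a fixed clock, purely for illustration:

package main

import (
	"fmt"
	"time"
)

// nextTick mirrors the alignment logic in DB.run: round to the nearest
// interval boundary, then push forward if rounding landed in the past.
func nextTick(now time.Time, interval time.Duration) time.Time {
	next := now.Round(interval)
	if next.Before(now) {
		next = next.Add(interval)
	}
	return next
}

func main() {
	interval := 2 * time.Hour
	now := time.Date(2025, 1, 1, 12, 30, 0, 0, time.UTC)
	// 12:30 rounds down to 12:00 (the nearer 2h boundary), which is in
	// the past, so the first run is scheduled for 14:00.
	fmt.Println(nextTick(now, interval)) // 2025-01-01 14:00:00 +0000 UTC
}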
@@ -1105,6 +1130,19 @@ func (db *DB) run(ctx context.Context) {
 			}
 			db.cmtx.Unlock()
 
+			// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			db.logger.Info("TEMP stale series ratio", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 &&
+				staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold {
+				db.logger.Info("TEMP starting immediate stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+				if err := db.CompactStaleHead(); err != nil {
+					db.logger.Error("immediate stale series compaction failed", "err", err)
+				}
+				db.logger.Info("TEMP ended immediate stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			}
+
 			select {
 			case db.compactc <- struct{}{}:
 			default:
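A detail worth noting in this check: if the Head holds no series, numSeries is 0 and staleSeriesRatio becomes NaN, and since every comparison with NaN is false the compaction simply never triggers, with no explicit zero guard needed. A standalone sketch of the decision (shouldStaleCompact is our invented name, not a function in the commit):

package main

import "fmt"

// shouldStaleCompact mirrors the threshold check in DB.run: a zero
// threshold disables the feature, and an empty head yields a NaN
// ratio, which compares false against any threshold.
func shouldStaleCompact(numStale, numSeries uint64, threshold float64) bool {
	ratio := float64(numStale) / float64(numSeries)
	return threshold > 0 && ratio >= threshold
}

func main() {
	fmt.Println(shouldStaleCompact(600, 1000, 0.5)) // true: 60% stale
	fmt.Println(shouldStaleCompact(100, 1000, 0.5)) // false: 10% stale
	fmt.Println(shouldStaleCompact(0, 0, 0.5))      // false: 0/0 is NaN
}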
@@ -1126,6 +1164,23 @@ func (db *DB) run(ctx context.Context) {
 				db.metrics.compactionsSkipped.Inc()
 			}
 			db.autoCompactMtx.Unlock()
+		case <-time.After(staleSeriesWaitDur):
+			// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			db.logger.Info("TEMP stale series ratio", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 &&
+				staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold {
+				db.logger.Info("TEMP starting timed stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+				if err := db.CompactStaleHead(); err != nil {
+					db.logger.Error("scheduled stale series compaction failed", "err", err)
+				}
+				db.logger.Info("TEMP ended timed stale series compaction", "ratio", staleSeriesRatio, "num_series", numSeries, "num_stale_series", numStaleSeries)
+			}
+
+			nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
+			staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime)
+
 		case <-db.stopc:
 			return
 		}
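One caveat with the select above: time.After allocates a fresh timer on every pass through the loop, so each wakeup from another case restarts the stale series countdown with the unchanged staleSeriesWaitDur; if other cases fire more often than the interval, the stale case may be starved. A long-lived time.Timer that is only reset when its deadline actually fires avoids this. A toy, self-contained sketch of that pattern (intervals shortened to milliseconds so it runs quickly; this is an alternative sketch, not what the commit does):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 150 * time.Millisecond
	next := time.Now().Add(interval)

	// One reusable timer: wakeups on other cases do not re-arm it,
	// unlike a time.After re-evaluated on each loop iteration.
	timer := time.NewTimer(time.Until(next))
	defer timer.Stop()
	ticker := time.NewTicker(40 * time.Millisecond) // stands in for the busier select cases
	defer ticker.Stop()
	stop := time.After(500 * time.Millisecond)

	for {
		select {
		case <-timer.C:
			fmt.Println("stale series compaction tick")
			next = next.Add(interval)
			timer.Reset(time.Until(next))
		case <-ticker.C:
			// Frequent unrelated work. With time.After in the select,
			// each of these wakeups would restart the full countdown.
		case <-stop:
			return
		}
	}
}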
@@ -51,6 +51,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
tsdb/head.go | 25
@@ -1584,7 +1584,7 @@ func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.Series
 }
 
 func (h *StaleHead) Index() (_ IndexReader, err error) {
-	return h.head.staleIndex(h.RangeHead.mint, h.RangeHead.maxt, h.staleSeriesRefs)
+	return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs)
 }
 
 func (h *StaleHead) NumSeries() uint64 {
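This is a cosmetic fix rather than a behaviour change: StaleHead embeds RangeHead, so h.mint is just the promoted spelling of h.RangeHead.mint. A tiny illustration of Go's field promotion rule, with stand-in types:

package main

import "fmt"

type RangeHead struct{ mint, maxt int64 }

// StaleHead embeds RangeHead, so RangeHead's fields are promoted.
type StaleHead struct{ RangeHead }

func main() {
	h := StaleHead{RangeHead{mint: 100, maxt: 200}}
	// Both expressions name the very same field.
	fmt.Println(h.mint == h.RangeHead.mint) // true
}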
@@ -1680,13 +1680,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
+	deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
 	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 	h.metrics.chunks.Sub(float64(chunksRemoved))
 	h.numSeries.Sub(uint64(seriesRemoved))
+	h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
 
 	// Remove deleted series IDs from the postings lists.
 	h.postings.Delete(deleted, affected)
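The shape of this refactor is worth calling out: instead of handing stripeSeries.gc a pointer to the atomic counter and decrementing it mid-scan, gc now returns a plain staleSeriesDeleted count and the caller applies it once. That keeps the counter update at a single site and the inner loop free of shared-state writes. A minimal sketch of the resulting pattern, with invented names:

package main

import (
	"fmt"
	"sync/atomic"
)

// countStaleDeleted stands in for the refactored stripeSeries.gc: it
// returns a plain count instead of decrementing a shared counter from
// inside the scan. (Names here are invented for illustration.)
func countStaleDeleted(stale []bool) (deleted uint64) {
	for _, s := range stale {
		if s {
			deleted++
		}
	}
	return deleted
}

func main() {
	var numStale atomic.Uint64
	numStale.Store(3)

	deleted := countStaleDeleted([]bool{true, false, true})
	// One adjustment at the call site, mirroring
	// h.numStaleSeries.Sub(uint64(staleSeriesDeleted)).
	// (Prometheus uses go.uber.org/atomic, which has Sub; the stdlib
	// equivalent is a two's-complement Add.)
	numStale.Add(^(deleted - 1))

	fmt.Println(numStale.Load()) // 1
}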
@@ -1713,7 +1714,6 @@
 // gcStaleSeries removes all the stale series provided given that they are still stale
 // and the series maxt is <= the given max.
 func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
-
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
 	deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
@@ -2041,13 +2041,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
 // and there's no easy way to cast maps.
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
-func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
+func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) {
 	var (
-		deleted    = map[storage.SeriesRef]struct{}{}
-		affected   = map[labels.Label]struct{}{}
-		rmChunks   = 0
-		actualMint int64 = math.MaxInt64
-		minOOOTime int64 = math.MaxInt64
+		deleted            = map[storage.SeriesRef]struct{}{}
+		affected           = map[labels.Label]struct{}{}
+		rmChunks           = 0
+		staleSeriesDeleted = 0
+		actualMint         int64 = math.MaxInt64
+		minOOOTime         int64 = math.MaxInt64
 	)
 	minMmapFile = math.MaxInt32
 
@@ -2102,7 +2103,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 			if value.IsStaleNaN(series.lastValue) ||
 				(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
 				(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
-				numStaleSeries.Dec()
+				staleSeriesDeleted++
 			}
 
 			deleted[storage.SeriesRef(series.ref)] = struct{}{}
@@ -2118,10 +2119,10 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 		actualMint = mint
 	}
 
-	return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
+	return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
 }
 
-// TODO: add comments
+// TODO: add comments.
 func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
 	var (
 		deleted = map[storage.SeriesRef]struct{}{}
@@ -1244,7 +1244,6 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
 				if staleToNonStale {
 					a.head.numStaleSeries.Dec()
 				}
-
 			} else {
 				// The sample is an exact duplicate, and should be silently dropped.
 				acc.floatsAppended--
@@ -17,12 +17,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/prometheus/prometheus/model/value"
 	"math"
 	"slices"
 	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -6688,7 +6688,7 @@ func TestStripeSeries_gc(t *testing.T) {
 	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
 	hash := ms1.lset.Hash()
 
-	s.gc(0, 0, nil)
+	s.gc(0, 0)
 
 	// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
 	got := s.getByHash(hash, ms1.lset)