Remove some config options

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2025-11-25 18:48:19 -08:00
parent 5b1e6fe398
commit a10a4c797d
4 changed files with 50 additions and 126 deletions

View File

@ -668,9 +668,7 @@ func main() {
} }
if cfgFile.StorageConfig.TSDBConfig != nil { if cfgFile.StorageConfig.TSDBConfig != nil {
cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
cfg.tsdb.StaleSeriesCompactionInterval = time.Duration(cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval)
cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold
cfg.tsdb.StaleSeriesImmediateCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold
} }
// Set Go runtime parameters before we get too far into initialization. // Set Go runtime parameters before we get too far into initialization.
@ -1880,9 +1878,7 @@ type tsdbOptions struct {
CompactionDelayMaxPercent int CompactionDelayMaxPercent int
EnableOverlappingCompaction bool EnableOverlappingCompaction bool
UseUncachedIO bool UseUncachedIO bool
StaleSeriesCompactionInterval time.Duration
StaleSeriesCompactionThreshold float64 StaleSeriesCompactionThreshold float64
StaleSeriesImmediateCompactionThreshold float64
} }
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@ -1907,9 +1903,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction, EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
UseUncachedIO: opts.UseUncachedIO, UseUncachedIO: opts.UseUncachedIO,
StaleSeriesCompactionInterval: opts.StaleSeriesCompactionInterval,
StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold, StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
StaleSeriesImmediateCompactionThreshold: opts.StaleSeriesImmediateCompactionThreshold,
} }
} }

View File

@ -1022,18 +1022,9 @@ type TSDBConfig struct {
// This should not be used directly and must be converted into OutOfOrderTimeWindow. // This should not be used directly and must be converted into OutOfOrderTimeWindow.
OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"` OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
// StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction
// if the number of stale series crosses the given threshold.
StaleSeriesCompactionInterval model.Duration `yaml:"stale_series_compaction_interval,omitempty"`
// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series // the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
// compaction will be run in the next stale series compaction interval.
StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"` StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"`
// StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series is run immediately.
StaleSeriesImmediateCompactionThreshold float64 `yaml:"stale_series_immediate_compaction_threshold,omitempty"`
} }
// UnmarshalYAML implements the yaml.Unmarshaler interface. // UnmarshalYAML implements the yaml.Unmarshaler interface.

View File

@ -98,15 +98,9 @@ func DefaultOptions() *Options {
// Options of the DB storage. // Options of the DB storage.
type Options struct { type Options struct {
// staleSeriesCompactionInterval is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesCompactionInterval atomic.Int64
// staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesCompactionThreshold atomic.Float64
// staleSeriesImmediateCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks. // staleSeriesImmediateCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code. // This is the one that must be used by the code.
staleSeriesImmediateCompactionThreshold atomic.Float64 staleSeriesCompactionThreshold atomic.Float64
// Segments (wal files) max size. // Segments (wal files) max size.
// WALSegmentSize = 0, segment size is default size. // WALSegmentSize = 0, segment size is default size.
@ -233,18 +227,9 @@ type Options struct {
// UseUncachedIO allows bypassing the page cache when appropriate. // UseUncachedIO allows bypassing the page cache when appropriate.
UseUncachedIO bool UseUncachedIO bool
// StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction
// if the number of stale series crosses the given threshold.
StaleSeriesCompactionInterval time.Duration
// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series // the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
// compaction will be run in the next stale series compaction interval.
StaleSeriesCompactionThreshold float64 StaleSeriesCompactionThreshold float64
// StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series is run immediately.
StaleSeriesImmediateCompactionThreshold float64
} }
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@ -842,9 +827,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3) rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
} }
opts.staleSeriesCompactionInterval.Store(int64(opts.StaleSeriesCompactionInterval))
opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold) opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
opts.staleSeriesImmediateCompactionThreshold.Store(opts.StaleSeriesImmediateCompactionThreshold)
return opts, rngs return opts, rngs
} }
@ -1117,15 +1100,6 @@ func (db *DB) run(ctx context.Context) {
backoff := time.Duration(0) backoff := time.Duration(0)
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
nextStaleSeriesCompactionTime := nextStepAlignedTime(staleSeriesCompactionInterval)
timedStaleSeriesCompactionActive := true
if staleSeriesCompactionInterval <= 0 {
// Far enough so that we don't schedule a stale series compaction.
timedStaleSeriesCompactionActive = false
nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
}
for { for {
select { select {
case <-db.stopc: case <-db.stopc:
@ -1133,11 +1107,6 @@ func (db *DB) run(ctx context.Context) {
case <-time.After(backoff): case <-time.After(backoff):
} }
staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
if staleSeriesWaitDur < 0 {
staleSeriesWaitDur = 0
}
select { select {
case <-time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
db.cmtx.Lock() db.cmtx.Lock()
@ -1149,8 +1118,8 @@ func (db *DB) run(ctx context.Context) {
// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon. // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
if db.autoCompact && db.opts.staleSeriesImmediateCompactionThreshold.Load() > 0 && if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
staleSeriesRatio >= db.opts.staleSeriesImmediateCompactionThreshold.Load() { staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
if err := db.CompactStaleHead(); err != nil { if err := db.CompactStaleHead(); err != nil {
db.logger.Error("immediate stale series compaction failed", "err", err) db.logger.Error("immediate stale series compaction failed", "err", err)
} }
@ -1163,13 +1132,6 @@ func (db *DB) run(ctx context.Context) {
// We attempt mmapping of head chunks regularly. // We attempt mmapping of head chunks regularly.
db.head.mmapHeadChunks() db.head.mmapHeadChunks()
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
if !timedStaleSeriesCompactionActive && staleSeriesCompactionInterval > 0 {
// The config was updated in realtime.
timedStaleSeriesCompactionActive = true
nextStaleSeriesCompactionTime = nextStepAlignedTime(staleSeriesCompactionInterval)
}
case <-db.compactc: case <-db.compactc:
db.metrics.compactionsTriggered.Inc() db.metrics.compactionsTriggered.Inc()
@ -1185,27 +1147,6 @@ func (db *DB) run(ctx context.Context) {
db.metrics.compactionsSkipped.Inc() db.metrics.compactionsSkipped.Inc()
} }
db.autoCompactMtx.Unlock() db.autoCompactMtx.Unlock()
case <-time.After(staleSeriesWaitDur):
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
if staleSeriesCompactionInterval <= 0 {
// The config was updated in realtime.
// Far enough so that we don't schedule a stale series compaction.
timedStaleSeriesCompactionActive = false
nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
continue
}
// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
if err := db.CompactStaleHead(); err != nil {
db.logger.Error("scheduled stale series compaction failed", "err", err)
}
}
nextStaleSeriesCompactionTime = nextStepAlignedTime(db.opts.StaleSeriesCompactionInterval)
case <-db.stopc: case <-db.stopc:
return return
@ -1247,13 +1188,9 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
oooTimeWindow := int64(0) oooTimeWindow := int64(0)
if conf.StorageConfig.TSDBConfig != nil { if conf.StorageConfig.TSDBConfig != nil {
oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
db.opts.staleSeriesCompactionInterval.Store(int64(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval))
db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold) db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
db.opts.staleSeriesImmediateCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold)
} else { } else {
db.opts.staleSeriesCompactionInterval.Store(0)
db.opts.staleSeriesCompactionThreshold.Store(0) db.opts.staleSeriesCompactionThreshold.Store(0)
db.opts.staleSeriesImmediateCompactionThreshold.Store(0)
} }
if oooTimeWindow < 0 { if oooTimeWindow < 0 {
oooTimeWindow = 0 oooTimeWindow = 0
@ -1592,6 +1529,7 @@ func (db *DB) CompactStaleHead() error {
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
db.logger.Info("Starting stale series compaction") db.logger.Info("Starting stale series compaction")
start := time.Now()
staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background()) staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
if err != nil { if err != nil {
@ -1626,7 +1564,7 @@ func (db *DB) CompactStaleHead() error {
} }
db.head.RebuildSymbolTable(db.logger) db.head.RebuildSymbolTable(db.logger)
db.logger.Info("Ending stale series compaction") db.logger.Info("Ending stale series compaction", "duration", time.Since(start))
return nil return nil
} }

View File

@ -308,6 +308,7 @@ Outer:
} }
h.wlReplaySamplesPool.Put(v) h.wlReplaySamplesPool.Put(v)
case []tombstones.Stone: case []tombstones.Stone:
// TODO: what if the ref doesnt match a series directly? Check about multiref!
// Tombstone records will be fairly rare, so not trying to optimise the allocations here. // Tombstone records will be fairly rare, so not trying to optimise the allocations here.
deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency) deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
for _, s := range v { for _, s := range v {