From a10a4c797d49b51324bb4effe267d0c922aaa267 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 25 Nov 2025 18:48:19 -0800 Subject: [PATCH] Remove some config options Signed-off-by: Ganesh Vernekar --- cmd/prometheus/main.go | 90 ++++++++++++++++++++---------------------- config/config.go | 11 +----- tsdb/db.go | 74 +++------------------------------- tsdb/head_wal.go | 1 + 4 files changed, 50 insertions(+), 126 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ffede437b1..8cbdb3e8f6 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -668,9 +668,7 @@ func main() { } if cfgFile.StorageConfig.TSDBConfig != nil { cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow - cfg.tsdb.StaleSeriesCompactionInterval = time.Duration(cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval) cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold - cfg.tsdb.StaleSeriesImmediateCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold } // Set Go runtime parameters before we get too far into initialization. @@ -1860,56 +1858,52 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) { // tsdbOptions is tsdb.Option version with defined units. // This is required as tsdb.Option fields are unit agnostic (time). type tsdbOptions struct { - WALSegmentSize units.Base2Bytes - MaxBlockChunkSegmentSize units.Base2Bytes - RetentionDuration model.Duration - MaxBytes units.Base2Bytes - NoLockfile bool - WALCompressionType compression.Type - HeadChunksWriteQueueSize int - SamplesPerChunk int - StripeSize int - MinBlockDuration model.Duration - MaxBlockDuration model.Duration - OutOfOrderTimeWindow int64 - EnableExemplarStorage bool - MaxExemplars int64 - EnableMemorySnapshotOnShutdown bool - EnableNativeHistograms bool - EnableDelayedCompaction bool - CompactionDelayMaxPercent int - EnableOverlappingCompaction bool - UseUncachedIO bool - StaleSeriesCompactionInterval time.Duration - StaleSeriesCompactionThreshold float64 - StaleSeriesImmediateCompactionThreshold float64 + WALSegmentSize units.Base2Bytes + MaxBlockChunkSegmentSize units.Base2Bytes + RetentionDuration model.Duration + MaxBytes units.Base2Bytes + NoLockfile bool + WALCompressionType compression.Type + HeadChunksWriteQueueSize int + SamplesPerChunk int + StripeSize int + MinBlockDuration model.Duration + MaxBlockDuration model.Duration + OutOfOrderTimeWindow int64 + EnableExemplarStorage bool + MaxExemplars int64 + EnableMemorySnapshotOnShutdown bool + EnableNativeHistograms bool + EnableDelayedCompaction bool + CompactionDelayMaxPercent int + EnableOverlappingCompaction bool + UseUncachedIO bool + StaleSeriesCompactionThreshold float64 } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { return tsdb.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), - RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), - MaxBytes: int64(opts.MaxBytes), - NoLockfile: opts.NoLockfile, - WALCompression: opts.WALCompressionType, - HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, - SamplesPerChunk: opts.SamplesPerChunk, - StripeSize: opts.StripeSize, - MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), - MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), - EnableExemplarStorage: opts.EnableExemplarStorage, - MaxExemplars: 
opts.MaxExemplars, - EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, - EnableNativeHistograms: opts.EnableNativeHistograms, - OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, - EnableDelayedCompaction: opts.EnableDelayedCompaction, - CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, - EnableOverlappingCompaction: opts.EnableOverlappingCompaction, - UseUncachedIO: opts.UseUncachedIO, - StaleSeriesCompactionInterval: opts.StaleSeriesCompactionInterval, - StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold, - StaleSeriesImmediateCompactionThreshold: opts.StaleSeriesImmediateCompactionThreshold, + WALSegmentSize: int(opts.WALSegmentSize), + MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), + RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), + MaxBytes: int64(opts.MaxBytes), + NoLockfile: opts.NoLockfile, + WALCompression: opts.WALCompressionType, + HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, + SamplesPerChunk: opts.SamplesPerChunk, + StripeSize: opts.StripeSize, + MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), + MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + EnableExemplarStorage: opts.EnableExemplarStorage, + MaxExemplars: opts.MaxExemplars, + EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, + EnableNativeHistograms: opts.EnableNativeHistograms, + OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, + EnableDelayedCompaction: opts.EnableDelayedCompaction, + CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, + EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + UseUncachedIO: opts.UseUncachedIO, + StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold, } } diff --git a/config/config.go b/config/config.go index b1551c9840..0110a54c7a 100644 --- a/config/config.go +++ b/config/config.go @@ -1022,18 +1022,9 @@ type TSDBConfig struct { // This should not be used directly and must be converted into OutOfOrderTimeWindow. OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"` - // StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction - // if the number of stale series crosses the given threshold. - StaleSeriesCompactionInterval model.Duration `yaml:"stale_series_compaction_interval,omitempty"` - // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in - // the in-memory Head block. If the % of stale series crosses this threshold, stale series - // compaction will be run in the next stale series compaction interval. + // the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately. StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"` - - // StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in - // the in-memory Head block. If the % of stale series crosses this threshold, stale series is run immediately. - StaleSeriesImmediateCompactionThreshold float64 `yaml:"stale_series_immediate_compaction_threshold,omitempty"` } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/tsdb/db.go b/tsdb/db.go index 85d7237daf..e4abcd728f 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -98,15 +98,9 @@ func DefaultOptions() *Options { // Options of the DB storage. 
 type Options struct {
-	// staleSeriesCompactionInterval is same as below option with same name, but is atomic so that we can do live updates without locks.
-	// This is the one that must be used by the code.
-	staleSeriesCompactionInterval atomic.Int64
-	// staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
-	// This is the one that must be used by the code.
-	staleSeriesCompactionThreshold atomic.Float64
-	// staleSeriesImmediateCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
+	// staleSeriesCompactionThreshold is the same as the option with the same name below, but is atomic so that we can do live updates without locks.
 	// This is the one that must be used by the code.
-	staleSeriesImmediateCompactionThreshold atomic.Float64
+	staleSeriesCompactionThreshold atomic.Float64
 
 	// Segments (wal files) max size.
 	// WALSegmentSize = 0, segment size is default size.
@@ -233,18 +227,9 @@ type Options struct {
 	// UseUncachedIO allows bypassing the page cache when appropriate.
 	UseUncachedIO bool
 
-	// StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction
-	// if the number of stale series crosses the given threshold.
-	StaleSeriesCompactionInterval time.Duration
-
 	// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
-	// the in-memory Head block. If the % of stale series crosses this threshold, stale series
-	// compaction will be run in the next stale series compaction interval.
+	// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
 	StaleSeriesCompactionThreshold float64
-
-	// StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
-	// the in-memory Head block. If the % of stale series crosses this threshold, stale series is run immediately.
-	StaleSeriesImmediateCompactionThreshold float64
 }
 
 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@@ -842,9 +827,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 		rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
 	}
 
-	opts.staleSeriesCompactionInterval.Store(int64(opts.StaleSeriesCompactionInterval))
 	opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
-	opts.staleSeriesImmediateCompactionThreshold.Store(opts.StaleSeriesImmediateCompactionThreshold)
 
 	return opts, rngs
 }
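
NOTE: the pattern that survives this removal is a single YAML-facing option
mirrored into an unexported atomic: validateOpts seeds the mirror at startup,
(*DB).ApplyConfig overwrites it on a live reload, and the run loop reads it
without taking a lock. Below is a minimal standalone sketch of that mirror;
the type and method names are illustrative rather than tsdb API, and it
assumes go.uber.org/atomic, the package that supplies the atomic.Float64 used
in this file.

    package main

    import (
    	"fmt"

    	"go.uber.org/atomic"
    )

    type options struct {
    	// Hot-path copy: read lock-free by the run loop.
    	staleSeriesCompactionThreshold atomic.Float64
    	// Config-facing value: only touched when (re)loading configuration.
    	StaleSeriesCompactionThreshold float64
    }

    // validate seeds the atomic mirror once at startup.
    func (o *options) validate() {
    	o.staleSeriesCompactionThreshold.Store(o.StaleSeriesCompactionThreshold)
    }

    // applyConfig updates the mirror during a live reload.
    func (o *options) applyConfig(threshold float64) {
    	o.staleSeriesCompactionThreshold.Store(threshold)
    }

    func main() {
    	o := &options{StaleSeriesCompactionThreshold: 0.5}
    	o.validate()
    	o.applyConfig(0.8) // e.g. after a SIGHUP config reload
    	fmt.Println(o.staleSeriesCompactionThreshold.Load()) // 0.8
    }
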
@@ -1117,15 +1100,6 @@ func (db *DB) run(ctx context.Context) {
 
 	backoff := time.Duration(0)
 
-	staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
-	nextStaleSeriesCompactionTime := nextStepAlignedTime(staleSeriesCompactionInterval)
-	timedStaleSeriesCompactionActive := true
-	if staleSeriesCompactionInterval <= 0 {
-		// Far enough so that we don't schedule a stale series compaction.
-		timedStaleSeriesCompactionActive = false
-		nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
-	}
-
 	for {
 		select {
 		case <-db.stopc:
@@ -1133,11 +1107,6 @@ func (db *DB) run(ctx context.Context) {
 		case <-time.After(backoff):
 		}
 
-		staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
-		if staleSeriesWaitDur < 0 {
-			staleSeriesWaitDur = 0
-		}
-
 		select {
 		case <-time.After(1 * time.Minute):
 			db.cmtx.Lock()
@@ -1149,8 +1118,8 @@ func (db *DB) run(ctx context.Context) {
 			// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
 			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
 			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
-			if db.autoCompact && db.opts.staleSeriesImmediateCompactionThreshold.Load() > 0 &&
-				staleSeriesRatio >= db.opts.staleSeriesImmediateCompactionThreshold.Load() {
+			if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
+				staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
 				if err := db.CompactStaleHead(); err != nil {
 					db.logger.Error("immediate stale series compaction failed", "err", err)
 				}
@@ -1163,13 +1132,6 @@ func (db *DB) run(ctx context.Context) {
 			// We attempt mmapping of head chunks regularly.
 			db.head.mmapHeadChunks()
 
-			staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
-			if !timedStaleSeriesCompactionActive && staleSeriesCompactionInterval > 0 {
-				// The config was updated in realtime.
-				timedStaleSeriesCompactionActive = true
-				nextStaleSeriesCompactionTime = nextStepAlignedTime(staleSeriesCompactionInterval)
-			}
-
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
@@ -1185,27 +1147,6 @@ func (db *DB) run(ctx context.Context) {
 				db.metrics.compactionsSkipped.Inc()
 			}
 			db.autoCompactMtx.Unlock()
-		case <-time.After(staleSeriesWaitDur):
-			staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
-			if staleSeriesCompactionInterval <= 0 {
-				// The config was updated in realtime.
-				// Far enough so that we don't schedule a stale series compaction.
-				timedStaleSeriesCompactionActive = false
-				nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
-				continue
-			}
-
-			// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
-			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
-			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
-			if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
-				staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
-				if err := db.CompactStaleHead(); err != nil {
-					db.logger.Error("scheduled stale series compaction failed", "err", err)
-				}
-			}
-
-			nextStaleSeriesCompactionTime = nextStepAlignedTime(db.opts.StaleSeriesCompactionInterval)
 		case <-db.stopc:
 			return
 		}
@@ -1247,13 +1188,9 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
 	oooTimeWindow := int64(0)
 	if conf.StorageConfig.TSDBConfig != nil {
 		oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
-		db.opts.staleSeriesCompactionInterval.Store(int64(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval))
 		db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
-		db.opts.staleSeriesImmediateCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold)
 	} else {
-		db.opts.staleSeriesCompactionInterval.Store(0)
 		db.opts.staleSeriesCompactionThreshold.Store(0)
-		db.opts.staleSeriesImmediateCompactionThreshold.Store(0)
 	}
 	if oooTimeWindow < 0 {
 		oooTimeWindow = 0
@@ -1592,6 +1529,7 @@ func (db *DB) CompactStaleHead() error {
 	defer db.cmtx.Unlock()
 
 	db.logger.Info("Starting stale series compaction")
+	start := time.Now()
 
 	staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
 	if err != nil {
@@ -1626,7 +1564,7 @@ func (db *DB) CompactStaleHead() error {
 	}
 
 	db.head.RebuildSymbolTable(db.logger)
-	db.logger.Info("Ending stale series compaction")
+	db.logger.Info("Ending stale series compaction", "duration", time.Since(start))
 
 	return nil
 }
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index ed77f7d3ef..a9429f9620 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -308,6 +308,7 @@ Outer:
 			}
 			h.wlReplaySamplesPool.Put(v)
 		case []tombstones.Stone:
+			// TODO: what if the ref doesn't match a series directly? Check about multiref!
 			// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
 			deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
 			for _, s := range v {
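
NOTE: with the interval and the separate immediate threshold gone, the
remaining behaviour is simple to state: on every periodic tick the run loop
compares the live stale-series ratio against the one remaining threshold and,
if it is crossed while auto compaction is enabled, runs CompactStaleHead right
away. A self-contained sketch of that condition follows; the function name and
signature are illustrative and not part of the tsdb package.

    package main

    import "fmt"

    // shouldCompactStaleSeries mirrors the post-patch run-loop check:
    // auto compaction must be on, the threshold must be set (> 0), and the
    // stale ratio must reach it. The numSeries == 0 guard is added here for
    // clarity; in the patched loop 0/0 yields NaN and NaN >= threshold is
    // false, so the observable behaviour is the same.
    func shouldCompactStaleSeries(numStale, numSeries uint64, threshold float64, autoCompact bool) bool {
    	if !autoCompact || threshold <= 0 || numSeries == 0 {
    		return false
    	}
    	return float64(numStale)/float64(numSeries) >= threshold
    }

    func main() {
    	// 60k of 100k head series stale with stale_series_compaction_threshold: 0.5.
    	fmt.Println(shouldCompactStaleSeries(60_000, 100_000, 0.5, true)) // true
    	// Leaving the threshold unset (0) disables stale series compaction.
    	fmt.Println(shouldCompactStaleSeries(60_000, 100_000, 0, true)) // false
    }
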