feat(tsdb/(head|agent)): dereference the pools at the end of the WL replay to

not wait for an extra GC cycle until the built-in cleanup mechanism
kicks in

See https://github.com/prometheus/prometheus/pull/15778

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
machine424 2025-03-26 17:46:43 +01:00 committed by Ayoub Mrini
parent c113475121
commit a825d448da
2 changed files with 24 additions and 2 deletions

View File

@ -236,7 +236,8 @@ type DB struct {
appenderPool sync.Pool appenderPool sync.Pool
bufPool sync.Pool bufPool sync.Pool
// These pools are used during WAL replay. // These pools are only used during WAL replay and are reset at the end.
// NOTE: Adjust resetWALReplayResources() upon changes to the pools.
walReplaySeriesPool zeropool.Pool[[]record.RefSeries] walReplaySeriesPool zeropool.Pool[[]record.RefSeries]
walReplaySamplesPool zeropool.Pool[[]record.RefSample] walReplaySamplesPool zeropool.Pool[[]record.RefSample]
walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample] walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
@ -366,6 +367,7 @@ func validateOptions(opts *Options) *Options {
func (db *DB) replayWAL() error { func (db *DB) replayWAL() error {
db.logger.Info("replaying WAL, this may take a while", "dir", db.wal.Dir()) db.logger.Info("replaying WAL, this may take a while", "dir", db.wal.Dir())
defer db.resetWALReplayResources()
start := time.Now() start := time.Now()
dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir()) dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
@ -425,6 +427,13 @@ func (db *DB) replayWAL() error {
return nil return nil
} }
func (db *DB) resetWALReplayResources() {
db.walReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{}
db.walReplaySamplesPool = zeropool.Pool[[]record.RefSample]{}
db.walReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{}
db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
}
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
var ( var (
syms = labels.NewSymbolTable() // One table for the whole WAL. syms = labels.NewSymbolTable() // One table for the whole WAL.

View File

@ -93,7 +93,8 @@ type Head struct {
bytesPool zeropool.Pool[[]byte] bytesPool zeropool.Pool[[]byte]
memChunkPool sync.Pool memChunkPool sync.Pool
// These pools are used during WAL/WBL replay. // These pools are only used during WAL/WBL replay and are reset at the end.
// NOTE: Adjust resetWLReplayResources() upon changes to the pools.
wlReplaySeriesPool zeropool.Pool[[]record.RefSeries] wlReplaySeriesPool zeropool.Pool[[]record.RefSeries]
wlReplaySamplesPool zeropool.Pool[[]record.RefSample] wlReplaySamplesPool zeropool.Pool[[]record.RefSample]
wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone] wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone]
@ -345,6 +346,17 @@ func (h *Head) resetInMemoryState() error {
return nil return nil
} }
func (h *Head) resetWLReplayResources() {
h.wlReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{}
h.wlReplaySamplesPool = zeropool.Pool[[]record.RefSample]{}
h.wlReplaytStonesPool = zeropool.Pool[[]tombstones.Stone]{}
h.wlReplayExemplarsPool = zeropool.Pool[[]record.RefExemplar]{}
h.wlReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{}
h.wlReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
h.wlReplayMetadataPool = zeropool.Pool[[]record.RefMetadata]{}
h.wlReplayMmapMarkersPool = zeropool.Pool[[]record.RefMmapMarker]{}
}
type headMetrics struct { type headMetrics struct {
activeAppenders prometheus.Gauge activeAppenders prometheus.Gauge
series prometheus.GaugeFunc series prometheus.GaugeFunc
@ -629,6 +641,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// limits the ingested samples to the head min valid time. // limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error { func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime) h.minValidTime.Store(minValidTime)
defer h.resetWLReplayResources()
defer func() { defer func() {
h.postings.EnsureOrder(h.opts.WALReplayConcurrency) h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
}() }()