From a825d448daf16b4683fe0f97a67f758ddafc3b70 Mon Sep 17 00:00:00 2001
From: machine424
Date: Wed, 26 Mar 2025 17:46:43 +0100
Subject: [PATCH] feat(tsdb/(head|agent)): dereference the pools at the end of
 the WL replay to not wait for an extra GC cycle until the built-in cleanup
 mechanism kicks in

See https://github.com/prometheus/prometheus/pull/15778

Signed-off-by: machine424
---
 tsdb/agent/db.go | 11 ++++++++++-
 tsdb/head.go     | 15 ++++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index 99126d16f1..cd5f531870 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -236,7 +236,8 @@ type DB struct {
 	appenderPool sync.Pool
 	bufPool      sync.Pool
 
-	// These pools are used during WAL replay.
+	// These pools are only used during WAL replay and are reset at the end.
+	// NOTE: Adjust resetWALReplayResources() upon changes to the pools.
 	walReplaySeriesPool     zeropool.Pool[[]record.RefSeries]
 	walReplaySamplesPool    zeropool.Pool[[]record.RefSample]
 	walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
@@ -366,6 +367,7 @@ func validateOptions(opts *Options) *Options {
 
 func (db *DB) replayWAL() error {
 	db.logger.Info("replaying WAL, this may take a while", "dir", db.wal.Dir())
+	defer db.resetWALReplayResources()
 	start := time.Now()
 
 	dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
@@ -425,6 +427,13 @@ func (db *DB) replayWAL() error {
 	return nil
 }
 
+func (db *DB) resetWALReplayResources() {
+	db.walReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{}
+	db.walReplaySamplesPool = zeropool.Pool[[]record.RefSample]{}
+	db.walReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{}
+	db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
+}
+
 func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
 	var (
 		syms = labels.NewSymbolTable() // One table for the whole WAL.
diff --git a/tsdb/head.go b/tsdb/head.go
index 3835970ca3..10fbfd0950 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -93,7 +93,8 @@ type Head struct {
 	bytesPool    zeropool.Pool[[]byte]
 	memChunkPool sync.Pool
 
-	// These pools are used during WAL/WBL replay.
+	// These pools are only used during WAL/WBL replay and are reset at the end.
+	// NOTE: Adjust resetWLReplayResources() upon changes to the pools.
 	wlReplaySeriesPool  zeropool.Pool[[]record.RefSeries]
 	wlReplaySamplesPool zeropool.Pool[[]record.RefSample]
 	wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone]
@@ -345,6 +346,17 @@ func (h *Head) resetInMemoryState() error {
 	return nil
 }
 
+func (h *Head) resetWLReplayResources() {
+	h.wlReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{}
+	h.wlReplaySamplesPool = zeropool.Pool[[]record.RefSample]{}
+	h.wlReplaytStonesPool = zeropool.Pool[[]tombstones.Stone]{}
+	h.wlReplayExemplarsPool = zeropool.Pool[[]record.RefExemplar]{}
+	h.wlReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{}
+	h.wlReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
+	h.wlReplayMetadataPool = zeropool.Pool[[]record.RefMetadata]{}
+	h.wlReplayMmapMarkersPool = zeropool.Pool[[]record.RefMmapMarker]{}
+}
+
 type headMetrics struct {
 	activeAppenders prometheus.Gauge
 	series          prometheus.GaugeFunc
@@ -629,6 +641,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
 // limits the ingested samples to the head min valid time.
 func (h *Head) Init(minValidTime int64) error {
 	h.minValidTime.Store(minValidTime)
+	defer h.resetWLReplayResources()
 	defer func() {
 		h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
 	}()