From 218558f5431980750a969e7acf0cc7a9fcd0e879 Mon Sep 17 00:00:00 2001
From: Patryk Prus
Date: Thu, 7 Aug 2025 16:11:32 -0400
Subject: [PATCH 1/7] Store mint rather than the last WAL segment in head.walExpiries during head GC

Signed-off-by: Patryk Prus
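The decision rule this moves to, distilled (a minimal sketch; demoKeep and its
parameters are illustrative names, not part of the diff below):

    // A series record survives the checkpoint if the series is still in the
    // head, or if its recorded expiry timestamp has not fallen behind the
    // truncation mint (i.e. the WAL may still hold data that references it).
    func demoKeep(inHead bool, keepUntil int64, hasExpiry bool, mint int64) bool {
        if inHead {
            return true
        }
        return hasExpiry && keepUntil >= mint
    }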
--- tsdb/agent/db.go | 6 +++-- tsdb/db_test.go | 4 ++-- tsdb/head.go | 34 +++++++++++++--------------- tsdb/head_test.go | 43 +++++++++++++++++------------------- tsdb/head_wal.go | 2 +- tsdb/wlog/checkpoint.go | 6 ++--- tsdb/wlog/checkpoint_test.go | 2 +- tsdb/wlog/watcher_test.go | 6 ++--- 8 files changed, 49 insertions(+), 54 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 158bedc31b..f757e42132 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -633,7 +633,7 @@ Loop: // keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint // last is the last WAL segment that was considered for checkpointing. -func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { +func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int64) bool { // Keep the record if the series exists in the db. if db.series.GetByID(id) != nil { return true @@ -641,9 +641,11 @@ func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool // Keep the record if the series was recently deleted. seg, ok := db.deleted[id] - return ok && seg > last + return ok && int(seg) > int(last) } +// !!!! TODO -- this likely breaks?? + func (db *DB) truncate(mint int64) error { db.mtx.RLock() defer db.mtx.RUnlock() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3a31c70a37..b4acf6afa0 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1552,7 +1552,7 @@ func TestSizeRetention(t *testing.T) { // Create a WAL checkpoint, and compare sizes. first, last, err := wlog.Segments(db.Head().wal.Dir()) require.NoError(t, err) - _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef, int) bool { return false }, 0) + _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef, int64) bool { return false }, 0) require.NoError(t, err) blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() @@ -4721,7 +4721,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { // Let's create a checkpoint. first, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) - keep := func(id chunks.HeadSeriesRef, _ int) bool { + keep := func(id chunks.HeadSeriesRef, _ int64) bool { return id != 3 } _, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0) diff --git a/tsdb/head.go b/tsdb/head.go index 8834fd31a6..c710ae8e71 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -108,7 +108,7 @@ type Head struct { series *stripeSeries walExpiriesMtx sync.Mutex - walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until. + walExpiries map[chunks.HeadSeriesRef]int64 // Series no longer in the head, and what time they must be kept until. // TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings. postings *index.MemPostings // Postings lists for terms. 
@@ -335,7 +335,7 @@ func (h *Head) resetInMemoryState() error { h.exemplars = es h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.walExpiries = map[chunks.HeadSeriesRef]int{} + h.walExpiries = map[chunks.HeadSeriesRef]int64{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) @@ -1272,7 +1272,7 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) return false, false, 0 } -func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { +func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() @@ -1280,7 +1280,7 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { return keepUntil, ok } -func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { +func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() @@ -1288,8 +1288,8 @@ func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { } // keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint -// last is the last WAL segment that was considered for checkpointing. -func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { +// mint is the time before which data in the WAL is being truncated. +func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, mint int64) bool { // Keep the record if the series exists in the head. if h.series.getByID(id) != nil { return true @@ -1297,7 +1297,7 @@ func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool // Keep the record if the series has an expiry set. keepUntil, ok := h.getWALExpiry(id) - return ok && keepUntil > last + return ok && keepUntil >= mint } // truncateWAL removes old data before mint from the WAL. @@ -1349,11 +1349,10 @@ func (h *Head) truncateWAL(mint int64) error { h.logger.Error("truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so stop - // tracking expired series. + // The checkpoint is written and data before mint is truncated, so stop tracking expired series. h.walExpiriesMtx.Lock() - for ref, segment := range h.walExpiries { - if segment <= last { + for ref, keepUntil := range h.walExpiries { + if keepUntil < mint { delete(h.walExpiries, ref) } } @@ -1623,16 +1622,13 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.tombstones.TruncateBefore(mint) if h.wal != nil { - _, last, _ := wlog.Segments(h.wal.Dir()) h.walExpiriesMtx.Lock() - // Keep series records until we're past segment 'last' - // because the WAL will still have samples records with - // this ref ID. If we didn't keep these series records then - // on start up when we replay the WAL, or any other code - // that reads the WAL, wouldn't be able to use those - // samples since we would have no labels for that ref ID. + // Samples for deleted series are likely still in the WAL, so flag that the deleted series records should be kept during + // WAL checkpointing while the WAL contains data through actualInOrderMint. + // If we didn't keep these series records then on start up when we replay the WAL, or any other code that reads the WAL, + // wouldn't be able to use those samples since we would have no labels for that ref ID. 
for ref := range deleted { - h.walExpiries[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = actualInOrderMint } h.walExpiriesMtx.Unlock() } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a44eadc365..d3e544b69e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -889,14 +889,13 @@ func TestHead_WALMultiRef(t *testing.T) { func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { existingRef := 1 existingLbls := labels.FromStrings("foo", "bar") - deletedKeepUntil := 10 + keepUntil := int64(10) cases := []struct { - name string - prepare func(t *testing.T, h *Head) - seriesRef chunks.HeadSeriesRef - last int - expected bool + name string + prepare func(t *testing.T, h *Head) + mint int64 + expected bool }{ { name: "keep series still in the head", @@ -904,26 +903,22 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) require.NoError(t, err) }, - seriesRef: chunks.HeadSeriesRef(existingRef), - expected: true, + expected: true, }, { - name: "keep deleted series with keepUntil > last", - prepare: func(_ *testing.T, h *Head) { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) - }, - seriesRef: chunks.HeadSeriesRef(existingRef), - last: deletedKeepUntil - 1, - expected: true, + name: "keep series with keepUntil > mint", + mint: keepUntil - 1, + expected: true, }, { - name: "drop deleted series with keepUntil <= last", - prepare: func(_ *testing.T, h *Head) { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) - }, - seriesRef: chunks.HeadSeriesRef(existingRef), - last: deletedKeepUntil, - expected: false, + name: "keep series with keepUntil = mint", + mint: keepUntil, + expected: true, + }, + { + name: "drop series with keepUntil < mint", + mint: keepUntil + 1, + expected: false, }, } @@ -936,9 +931,11 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { if tc.prepare != nil { tc.prepare(t, h) + } else { + h.setWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) } - kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last) + kept := h.keepSeriesInWALCheckpoint(chunks.HeadSeriesRef(existingRef), tc.mint) require.Equal(t, tc.expected, kept) }) } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ee6557fdad..9d96882b57 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -266,7 +266,7 @@ Outer: if !created { multiRef[walSeries.Ref] = mSeries.ref // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. - h.setWALExpiry(walSeries.Ref, lastSegment) + h.setWALExpiry(walSeries.Ref, int64(lastSegment)) } idx := uint64(mSeries.ref) % uint64(concurrency) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 2c1b0c0534..775096f1ce 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. 
-func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, mint int64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser @@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Drop irrelevant series in place. repl := series[:0] for _, s := range series { - if keep(s.Ref, to) { + if keep(s.Ref, mint) { repl = append(repl, s) } } @@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Only keep reference to the latest found metadata for each refID. repl := 0 for _, m := range metadata { - if keep(m.Ref, to) { + if keep(m.Ref, mint) { if _, ok := latestMetadataMap[m.Ref]; !ok { repl++ } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 7a0687aedc..839673343a 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -291,7 +291,7 @@ func TestCheckpoint(t *testing.T) { } require.NoError(t, w.Close()) - stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool { + stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int64) bool { return x%2 == 0 }, last/2) require.NoError(t, err) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 5ea4ee151d..21265fe309 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -400,7 +400,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) w.Truncate(1) // Write more records after checkpointing. @@ -491,7 +491,7 @@ func TestReadCheckpoint(t *testing.T) { } _, err = w.NextSegmentSync() require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) require.NoError(t, err) require.NoError(t, w.Truncate(32)) @@ -654,7 +654,7 @@ func TestCheckpointSeriesReset(t *testing.T) { return wt.checkNumSeries() == seriesCount }, 10*time.Second, 1*time.Second) - _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) require.NoError(t, err) err = w.Truncate(5) From 687502287389791c8b2f9ebdf20fe8150aaa6a3d Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 8 Aug 2025 12:52:34 -0400
Subject: [PATCH 2/7] Update head.walExpiries with record timestamps during WAL replay

Signed-off-by: Patryk Prus
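Replay-side bookkeeping, roughly (sketch with illustrative names; the real
updateWALExpiry below does this under walExpiriesMtx, keyed by
chunks.HeadSeriesRef):

    // Keep the largest timestamp seen for a duplicate series ref: its series
    // record must outlive the newest WAL data that still points at it.
    func demoUpdateExpiry(expiries map[uint64]int64, ref uint64, recordT int64) {
        expiries[ref] = max(expiries[ref], recordT)
    }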
--- tsdb/head.go | 5 +- tsdb/head_test.go | 254 +++++++++++++++++++++++++++++++++++++++++++++- tsdb/head_wal.go | 12 ++- 3 files changed, 266 insertions(+), 5 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index c710ae8e71..a533b529db 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1280,11 +1280,12 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) { return keepUntil, ok } -func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { +// updateWALExpiry updates the WAL expiry for a series, keeping the higher of the current value and keepUntil. +func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() - h.walExpiries[id] = keepUntil + h.walExpiries[id] = max(keepUntil, h.walExpiries[id]) } // keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d3e544b69e..934378f844 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -162,6 +162,10 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}, buf []byte) [] buf = enc.Tombstones(v, buf) case []record.RefExemplar: buf = enc.Exemplars(v, buf) + case []record.RefHistogramSample: + buf, _ = enc.HistogramSamples(v, buf) + case []record.RefFloatHistogramSample: + buf, _ = enc.FloatHistogramSamples(v, buf) case []record.RefMmapMarker: buf = enc.MmapMarkers(v, buf) case []record.RefMetadata: @@ -886,6 +890,254 @@ func TestHead_WALMultiRef(t *testing.T) { }}, series) } +func TestHead_WALCheckpointMultiRef(t *testing.T) { + + cases := []struct { + name string + walEntries []interface{} + expectedWalExpiry int64 + walTruncateMinT int64 + expectedWalEntries []interface{} + }{ + { + name: "Samples only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 100, V: 1}, + {Ref: 2, T: 200, V: 2}, + {Ref: 2, T: 500, V: 3}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + }, + }, + { + name: "Tombstones only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 1, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 200}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + }, + { + name: "Exemplars only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 1, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, + {Ref: 2, T: 200, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, 
+ {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "Histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: 100, H: &histogram.Histogram{}}, + {Ref: 2, T: 200, H: &histogram.Histogram{}}, + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + }, + { + name: "Float histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: 100, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 200, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + }, + { + name: "All record types; keep needed duplicate series record until last record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 700, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "All record types; drop expired duplicate series record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 2}, + {Ref: 1, T: 900, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 600, H: &histogram.Histogram{}}, + 
}, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 700, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 900, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 900, V: 3}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + h, w := newTestHead(t, 1000, compression.None, false) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + populateTestWL(t, w, tc.walEntries, nil) + first, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + + require.NoError(t, h.Init(0)) + + keepUntil, ok := h.getWALExpiry(2) + require.True(t, ok) + require.Equal(t, tc.expectedWalExpiry, keepUntil) + + // Each truncation creates a new segment, so attempt truncations until a checkpoint is created + for { + h.lastWALTruncationTime.Store(0) // Reset so that it's always time to truncate the WAL + err := h.truncateWAL(tc.walTruncateMinT) + require.NoError(t, err) + f, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + if f > first { + break + } + } + + // Read test WAL , checkpoint first + checkpointDir, _, err := wlog.LastCheckpoint(w.Dir()) + require.NoError(t, err) + cprecs := readTestWAL(t, checkpointDir) + recs := readTestWAL(t, w.Dir()) + recs = append(cprecs, recs...) + require.Equal(t, tc.expectedWalEntries, recs) + }) + } +} + func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { existingRef := 1 existingLbls := labels.FromStrings("foo", "bar") @@ -932,7 +1184,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { if tc.prepare != nil { tc.prepare(t, h) } else { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) + h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) } kept := h.keepSeriesInWALCheckpoint(chunks.HeadSeriesRef(existingRef), tc.mint) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 9d96882b57..f648abe7ea 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -265,8 +265,6 @@ Outer: } if !created { multiRef[walSeries.Ref] = mSeries.ref - // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. - h.setWALExpiry(walSeries.Ref, int64(lastSegment)) } idx := uint64(mSeries.ref) % uint64(concurrency) @@ -292,6 +290,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -313,6 +313,8 @@ Outer: continue } if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + // This is a tombstone for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(chunks.HeadSeriesRef(s.Ref), itv.Maxt) s.Ref = storage.SeriesRef(r) } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { @@ -330,6 +332,8 @@ Outer: continue } if r, ok := multiRef[e.Ref]; ok { + // This is an exemplar for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(e.Ref, e.T) e.Ref = r } exemplarsInput <- e @@ -354,6 +358,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. 
+ h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -387,6 +393,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a float histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) From 0fea41ed53901e79ac260bb6222af607d95373e6 Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 8 Aug 2025 13:06:37 -0400
Subject: [PATCH 3/7] Refactor keep function to work for both agent and non-agent implementations

Signed-off-by: Patryk Prus
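The head's threshold is a timestamp (mint) while the agent's is a segment
number, so the shared callback shrinks to func(chunks.HeadSeriesRef) bool and
each caller binds its own threshold in a closure. The shape is roughly
(illustrative sketch with simplified types, not the code in the diff):

    // Build a keep-predicate with the threshold captured up front, so
    // wlog.Checkpoint no longer needs to know what the threshold means.
    func demoKeepFn(mint int64, stillInHead func(uint64) bool, expiries map[uint64]int64) func(uint64) bool {
        return func(id uint64) bool {
            if stillInHead(id) {
                return true
            }
            keepUntil, ok := expiries[id]
            return ok && keepUntil >= mint
        }
    }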
--- tsdb/agent/db.go | 24 ++++++++++++------------ tsdb/db_test.go | 4 ++-- tsdb/head.go | 22 ++++++++++++---------- tsdb/head_test.go | 4 ++-- tsdb/wlog/checkpoint.go | 6 +++--- tsdb/wlog/checkpoint_test.go | 2 +- tsdb/wlog/watcher_test.go | 6 +++--- 7 files changed, 35 insertions(+), 33 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index f757e42132..1df0d508ef 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -631,21 +631,21 @@ Loop: } } -// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. // last is the last WAL segment that was considered for checkpointing. -func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int64) bool { - // Keep the record if the series exists in the db. - if db.series.GetByID(id) != nil { - return true +func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool { + return func(id chunks.HeadSeriesRef) bool { + // Keep the record if the series exists in the db. + if db.series.GetByID(id) != nil { + return true + } + + // Keep the record if the series was recently deleted. + seg, ok := db.deleted[id] + return ok && seg > last } - - // Keep the record if the series was recently deleted. - seg, ok := db.deleted[id] - return ok && int(seg) > int(last) } -// !!!! TODO -- this likely breaks?? - func (db *DB) truncate(mint int64) error { db.mtx.RLock() defer db.mtx.RUnlock() @@ -680,7 +680,7 @@ func (db *DB) truncate(mint int64) error { db.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil { + if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint); err != nil { db.metrics.checkpointCreationFail.Inc() var cerr *wlog.CorruptionErr if errors.As(err, &cerr) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b4acf6afa0..1267700946 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1552,7 +1552,7 @@ func TestSizeRetention(t *testing.T) { // Create a WAL checkpoint, and compare sizes. first, last, err := wlog.Segments(db.Head().wal.Dir()) require.NoError(t, err) - _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef, int64) bool { return false }, 0) + _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0) require.NoError(t, err) blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() @@ -4721,7 +4721,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { // Let's create a checkpoint. 
first, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) - keep := func(id chunks.HeadSeriesRef, _ int64) bool { + keep := func(id chunks.HeadSeriesRef) bool { return id != 3 } _, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0) diff --git a/tsdb/head.go b/tsdb/head.go index a533b529db..1377bb83be 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1288,17 +1288,19 @@ func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { h.walExpiries[id] = max(keepUntil, h.walExpiries[id]) } -// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. // mint is the time before which data in the WAL is being truncated. -func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, mint int64) bool { - // Keep the record if the series exists in the head. - if h.series.getByID(id) != nil { - return true - } +func (h *Head) keepSeriesInWALCheckpointFn(mint int64) func(id chunks.HeadSeriesRef) bool { + return func(id chunks.HeadSeriesRef) bool { + // Keep the record if the series exists in the head. + if h.series.getByID(id) != nil { + return true + } - // Keep the record if the series has an expiry set. - keepUntil, ok := h.getWALExpiry(id) - return ok && keepUntil >= mint + // Keep the record if the series has an expiry set. + keepUntil, ok := h.getWALExpiry(id) + return ok && keepUntil >= mint + } } // truncateWAL removes old data before mint from the WAL. @@ -1335,7 +1337,7 @@ func (h *Head) truncateWAL(mint int64) error { } h.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint); err != nil { h.metrics.checkpointCreationFail.Inc() var cerr *chunks.CorruptionErr if errors.As(err, &cerr) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 934378f844..97a5a0cce8 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1187,8 +1187,8 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) } - kept := h.keepSeriesInWALCheckpoint(chunks.HeadSeriesRef(existingRef), tc.mint) - require.Equal(t, tc.expected, kept) + keep := h.keepSeriesInWALCheckpointFn(tc.mint) + require.Equal(t, tc.expected, keep(chunks.HeadSeriesRef(existingRef))) }) } } diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 775096f1ce..5c607d7030 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, mint int64) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser @@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Drop irrelevant series in place. 
repl := series[:0] for _, s := range series { - if keep(s.Ref, mint) { + if keep(s.Ref) { repl = append(repl, s) } } @@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Only keep reference to the latest found metadata for each refID. repl := 0 for _, m := range metadata { - if keep(m.Ref, mint) { + if keep(m.Ref) { if _, ok := latestMetadataMap[m.Ref]; !ok { repl++ } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 839673343a..1c1b44da29 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -291,7 +291,7 @@ func TestCheckpoint(t *testing.T) { } require.NoError(t, w.Close()) - stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int64) bool { + stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool { return x%2 == 0 }, last/2) require.NoError(t, err) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 21265fe309..947e297e2a 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -400,7 +400,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) + Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef) bool { return true }, 0) w.Truncate(1) // Write more records after checkpointing. @@ -491,7 +491,7 @@ func TestReadCheckpoint(t *testing.T) { } _, err = w.NextSegmentSync() require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef) bool { return true }, 0) require.NoError(t, err) require.NoError(t, w.Truncate(32)) @@ -654,7 +654,7 @@ func TestCheckpointSeriesReset(t *testing.T) { return wt.checkNumSeries() == seriesCount }, 10*time.Second, 1*time.Second) - _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef, int64) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef) bool { return true }, 0) require.NoError(t, err) err = w.Truncate(5) From 5cb0192626f750044cce179b40e279eab12eeb26 Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 8 Aug 2025 14:25:14 -0400
Subject: [PATCH 4/7] Address linter errors

Signed-off-by: Patryk Prus
--- tsdb/head.go | 4 ++-- tsdb/head_test.go | 1 - tsdb/head_wal.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 1377bb83be..cd2972dda1 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -781,7 +781,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } h.updateWALReplayStatusRead(startFrom) @@ -815,7 +815,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return fmt.Errorf("segment reader (offset=%d): %w", offset, err) } - err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt) + err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks) if err := sr.Close(); err != nil { h.logger.Warn("Error while closing the wal segments reader", "err", err) } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 97a5a0cce8..ffe62de059 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -891,7 +891,6 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_WALCheckpointMultiRef(t *testing.T) { - cases := []struct { name string walEntries []interface{} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index f648abe7ea..1d1fd89b15 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -75,7 +75,7 @@ func counterAddNonZero(v *prometheus.CounterVec, value float64, lvs ...string) { } } -func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { // Track number of missing series records that were referenced by other records. unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} // Track number of different records that referenced a series we don't know about From ead6dc32b9c8bc56fc2114e919b379f40e20bb63 Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 8 Aug 2025 14:34:56 -0400
Subject: [PATCH 5/7] Fix test

Signed-off-by: Patryk Prus
--- tsdb/head_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ffe62de059..56808b9807 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -770,9 +770,7 @@ func TestHead_ReadWAL(t *testing.T) { // But it should have a WAL expiry set. keepUntil, ok := head.getWALExpiry(101) require.True(t, ok) - _, last, err := wlog.Segments(w.Dir()) - require.NoError(t, err) - require.Equal(t, last, keepUntil) + require.Equal(t, int64(101), keepUntil) // Only the duplicate series record should have a WAL expiry set. _, ok = head.getWALExpiry(50) require.False(t, ok) From 676f7665fa9cd0030ea8fd40a97a9f84a024ad7d Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Fri, 8 Aug 2025 14:52:03 -0400
Subject: [PATCH 6/7] Use testutil.RequireEqual to handle dedupelabels in test

Signed-off-by: Patryk Prus
--- tsdb/head_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 56808b9807..c050190330 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1130,7 +1130,9 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { cprecs := readTestWAL(t, checkpointDir) recs := readTestWAL(t, w.Dir()) recs = append(cprecs, recs...) - require.Equal(t, tc.expectedWalEntries, recs) + + // Use testutil.RequireEqual which handles labels properly with dedupelabels + testutil.RequireEqual(t, tc.expectedWalEntries, recs) }) } } From bbc9e47e42be783d673a7fa282ffde76cb25893f Mon Sep 17 00:00:00 2001 From: Patryk Prus
Date: Tue, 19 Aug 2025 18:33:52 -0400
Subject: [PATCH 7/7] Add comment about differences between agent mode and regular Prometheus

Signed-off-by: Patryk Prus
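The two expiry schemes now read, side by side (hypothetical helpers for
contrast only, not code in this change):

    // Agent: a recently deleted series is kept while its deletion segment is
    // newer than the last segment considered for checkpointing.
    func agentKeep(deletedInSeg, lastSeg int) bool { return deletedInSeg > lastSeg }

    // Head: a series is kept while its expiry timestamp is at or after the
    // truncation mint.
    func headKeep(keepUntil, mint int64) bool { return keepUntil >= mint }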
--- tsdb/agent/db.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 1df0d508ef..4e53b6168b 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -633,6 +633,7 @@ Loop: // keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. // last is the last WAL segment that was considered for checkpointing. +// NOTE: the agent implementation here is different from the Prometheus implementation, in that it uses WAL segment numbers instead of timestamps. func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool { return func(id chunks.HeadSeriesRef) bool { // Keep the record if the series exists in the db.