diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 158bedc31b..4e53b6168b 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -631,17 +631,20 @@ Loop: } } -// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. // last is the last WAL segment that was considered for checkpointing. -func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { - // Keep the record if the series exists in the db. - if db.series.GetByID(id) != nil { - return true - } +// NOTE: the agent implementation here is different from the Prometheus implementation, in that it uses WAL segment numbers instead of timestamps. +func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool { + return func(id chunks.HeadSeriesRef) bool { + // Keep the record if the series exists in the db. + if db.series.GetByID(id) != nil { + return true + } - // Keep the record if the series was recently deleted. - seg, ok := db.deleted[id] - return ok && seg > last + // Keep the record if the series was recently deleted. + seg, ok := db.deleted[id] + return ok && seg > last + } } func (db *DB) truncate(mint int64) error { @@ -678,7 +681,7 @@ func (db *DB) truncate(mint int64) error { db.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil { + if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint); err != nil { db.metrics.checkpointCreationFail.Inc() var cerr *wlog.CorruptionErr if errors.As(err, &cerr) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 1b492a695e..87b3619705 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1557,7 +1557,7 @@ func TestSizeRetention(t *testing.T) { // Create a WAL checkpoint, and compare sizes. first, last, err := wlog.Segments(db.Head().wal.Dir()) require.NoError(t, err) - _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef, int) bool { return false }, 0) + _, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0) require.NoError(t, err) blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err = db.Head().wal.Size() @@ -4738,7 +4738,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) { // Let's create a checkpoint. first, last, err := wlog.Segments(w.Dir()) require.NoError(t, err) - keep := func(id chunks.HeadSeriesRef, _ int) bool { + keep := func(id chunks.HeadSeriesRef) bool { return id != 3 } _, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0) diff --git a/tsdb/head.go b/tsdb/head.go index 20ab6e7731..cd0f771f96 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -110,7 +110,7 @@ type Head struct { series *stripeSeries walExpiriesMtx sync.Mutex - walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until. + walExpiries map[chunks.HeadSeriesRef]int64 // Series no longer in the head, and what time they must be kept until. // TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings. postings *index.MemPostings // Postings lists for terms. @@ -337,7 +337,7 @@ func (h *Head) resetInMemoryState() error { h.exemplars = es h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.walExpiries = map[chunks.HeadSeriesRef]int{} + h.walExpiries = map[chunks.HeadSeriesRef]int64{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) @@ -791,7 +791,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } h.updateWALReplayStatusRead(startFrom) @@ -825,7 +825,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return fmt.Errorf("segment reader (offset=%d): %w", offset, err) } - err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt) + err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks) if err := sr.Close(); err != nil { h.logger.Warn("Error while closing the wal segments reader", "err", err) } @@ -1282,7 +1282,7 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) return false, false, 0 } -func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { +func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() @@ -1290,24 +1290,27 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { return keepUntil, ok } -func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { +// updateWALExpiry updates the WAL expiry for a series, keeping the higher of the current value and keepUntil. +func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) { h.walExpiriesMtx.Lock() defer h.walExpiriesMtx.Unlock() - h.walExpiries[id] = keepUntil + h.walExpiries[id] = max(keepUntil, h.walExpiries[id]) } -// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint -// last is the last WAL segment that was considered for checkpointing. -func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { - // Keep the record if the series exists in the head. - if h.series.getByID(id) != nil { - return true - } +// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. +// mint is the time before which data in the WAL is being truncated. +func (h *Head) keepSeriesInWALCheckpointFn(mint int64) func(id chunks.HeadSeriesRef) bool { + return func(id chunks.HeadSeriesRef) bool { + // Keep the record if the series exists in the head. + if h.series.getByID(id) != nil { + return true + } - // Keep the record if the series has an expiry set. - keepUntil, ok := h.getWALExpiry(id) - return ok && keepUntil > last + // Keep the record if the series has an expiry set. + keepUntil, ok := h.getWALExpiry(id) + return ok && keepUntil >= mint + } } // truncateWAL removes old data before mint from the WAL. @@ -1344,7 +1347,7 @@ func (h *Head) truncateWAL(mint int64) error { } h.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint); err != nil { h.metrics.checkpointCreationFail.Inc() var cerr *chunks.CorruptionErr if errors.As(err, &cerr) { @@ -1359,11 +1362,10 @@ func (h *Head) truncateWAL(mint int64) error { h.logger.Error("truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so stop - // tracking expired series. + // The checkpoint is written and data before mint is truncated, so stop tracking expired series. h.walExpiriesMtx.Lock() - for ref, segment := range h.walExpiries { - if segment <= last { + for ref, keepUntil := range h.walExpiries { + if keepUntil < mint { delete(h.walExpiries, ref) } } @@ -1633,16 +1635,13 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { h.tombstones.TruncateBefore(mint) if h.wal != nil { - _, last, _ := wlog.Segments(h.wal.Dir()) h.walExpiriesMtx.Lock() - // Keep series records until we're past segment 'last' - // because the WAL will still have samples records with - // this ref ID. If we didn't keep these series records then - // on start up when we replay the WAL, or any other code - // that reads the WAL, wouldn't be able to use those - // samples since we would have no labels for that ref ID. + // Samples for deleted series are likely still in the WAL, so flag that the deleted series records should be kept during + // WAL checkpointing while the WAL contains data through actualInOrderMint. + // If we didn't keep these series records then on start up when we replay the WAL, or any other code that reads the WAL, + // wouldn't be able to use those samples since we would have no labels for that ref ID. for ref := range deleted { - h.walExpiries[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = actualInOrderMint } h.walExpiriesMtx.Unlock() } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b4a238f685..e141c1dcfd 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -162,6 +162,10 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}, buf []byte) [] buf = enc.Tombstones(v, buf) case []record.RefExemplar: buf = enc.Exemplars(v, buf) + case []record.RefHistogramSample: + buf, _ = enc.HistogramSamples(v, buf) + case []record.RefFloatHistogramSample: + buf, _ = enc.FloatHistogramSamples(v, buf) case []record.RefMmapMarker: buf = enc.MmapMarkers(v, buf) case []record.RefMetadata: @@ -766,9 +770,7 @@ func TestHead_ReadWAL(t *testing.T) { // But it should have a WAL expiry set. keepUntil, ok := head.getWALExpiry(101) require.True(t, ok) - _, last, err := wlog.Segments(w.Dir()) - require.NoError(t, err) - require.Equal(t, last, keepUntil) + require.Equal(t, int64(101), keepUntil) // Only the duplicate series record should have a WAL expiry set. _, ok = head.getWALExpiry(50) require.False(t, ok) @@ -886,17 +888,265 @@ func TestHead_WALMultiRef(t *testing.T) { }}, series) } +func TestHead_WALCheckpointMultiRef(t *testing.T) { + cases := []struct { + name string + walEntries []interface{} + expectedWalExpiry int64 + walTruncateMinT int64 + expectedWalEntries []interface{} + }{ + { + name: "Samples only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 100, V: 1}, + {Ref: 2, T: 200, V: 2}, + {Ref: 2, T: 500, V: 3}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + }, + }, + { + name: "Tombstones only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 1, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 200}}}, + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + }, + }, + { + name: "Exemplars only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 1, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, + {Ref: 2, T: 200, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "Histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: 100, H: &histogram.Histogram{}}, + {Ref: 2, T: 200, H: &histogram.Histogram{}}, + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + }, + }, + { + name: "Float histograms only; keep needed duplicate series record", + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: 100, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 200, FH: &histogram.FloatHistogram{}}, + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 500, + walTruncateMinT: 500, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + }, + { + name: "All record types; keep needed duplicate series record until last record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 500, H: &histogram.Histogram{}}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 500, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 700, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + }, + }, + { + name: "All record types; drop expired duplicate series record", + // Series with 2 refs and samples for both + walEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + {Ref: 2, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 2, T: 500, V: 2}, + {Ref: 1, T: 900, V: 3}, + }, + []tombstones.Stone{ + {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}}, + }, + []record.RefExemplar{ + {Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: 600, H: &histogram.Histogram{}}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: 700, FH: &histogram.FloatHistogram{}}, + }, + }, + expectedWalExpiry: 800, + walTruncateMinT: 900, + expectedWalEntries: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: labels.FromStrings("a", "1")}, + }, + []record.RefSample{ + {Ref: 1, T: 900, V: 3}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + h, w := newTestHead(t, 1000, compression.None, false) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + + populateTestWL(t, w, tc.walEntries, nil) + first, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + + require.NoError(t, h.Init(0)) + + keepUntil, ok := h.getWALExpiry(2) + require.True(t, ok) + require.Equal(t, tc.expectedWalExpiry, keepUntil) + + // Each truncation creates a new segment, so attempt truncations until a checkpoint is created + for { + h.lastWALTruncationTime.Store(0) // Reset so that it's always time to truncate the WAL + err := h.truncateWAL(tc.walTruncateMinT) + require.NoError(t, err) + f, _, err := wlog.Segments(w.Dir()) + require.NoError(t, err) + if f > first { + break + } + } + + // Read test WAL , checkpoint first + checkpointDir, _, err := wlog.LastCheckpoint(w.Dir()) + require.NoError(t, err) + cprecs := readTestWAL(t, checkpointDir) + recs := readTestWAL(t, w.Dir()) + recs = append(cprecs, recs...) + + // Use testutil.RequireEqual which handles labels properly with dedupelabels + testutil.RequireEqual(t, tc.expectedWalEntries, recs) + }) + } +} + func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { existingRef := 1 existingLbls := labels.FromStrings("foo", "bar") - deletedKeepUntil := 10 + keepUntil := int64(10) cases := []struct { - name string - prepare func(t *testing.T, h *Head) - seriesRef chunks.HeadSeriesRef - last int - expected bool + name string + prepare func(t *testing.T, h *Head) + mint int64 + expected bool }{ { name: "keep series still in the head", @@ -904,26 +1154,22 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) require.NoError(t, err) }, - seriesRef: chunks.HeadSeriesRef(existingRef), - expected: true, + expected: true, }, { - name: "keep deleted series with keepUntil > last", - prepare: func(_ *testing.T, h *Head) { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) - }, - seriesRef: chunks.HeadSeriesRef(existingRef), - last: deletedKeepUntil - 1, - expected: true, + name: "keep series with keepUntil > mint", + mint: keepUntil - 1, + expected: true, }, { - name: "drop deleted series with keepUntil <= last", - prepare: func(_ *testing.T, h *Head) { - h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil) - }, - seriesRef: chunks.HeadSeriesRef(existingRef), - last: deletedKeepUntil, - expected: false, + name: "keep series with keepUntil = mint", + mint: keepUntil, + expected: true, + }, + { + name: "drop series with keepUntil < mint", + mint: keepUntil + 1, + expected: false, }, } @@ -936,10 +1182,12 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { if tc.prepare != nil { tc.prepare(t, h) + } else { + h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil) } - kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last) - require.Equal(t, tc.expected, kept) + keep := h.keepSeriesInWALCheckpointFn(tc.mint) + require.Equal(t, tc.expected, keep(chunks.HeadSeriesRef(existingRef))) }) } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 3e0dadb526..9b0982423f 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -76,7 +76,7 @@ func counterAddNonZero(v *prometheus.CounterVec, value float64, lvs ...string) { } } -func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { // Track number of missing series records that were referenced by other records. unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} // Track number of different records that referenced a series we don't know about @@ -266,8 +266,6 @@ Outer: } if !created { multiRef[walSeries.Ref] = mSeries.ref - // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. - h.setWALExpiry(walSeries.Ref, lastSegment) } idx := uint64(mSeries.ref) % uint64(concurrency) @@ -293,6 +291,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -314,6 +314,8 @@ Outer: continue } if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + // This is a tombstone for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(chunks.HeadSeriesRef(s.Ref), itv.Maxt) s.Ref = storage.SeriesRef(r) } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { @@ -331,6 +333,8 @@ Outer: continue } if r, ok := multiRef[e.Ref]; ok { + // This is an exemplar for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(e.Ref, e.T) e.Ref = r } exemplarsInput <- e @@ -355,6 +359,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) @@ -388,6 +394,8 @@ Outer: continue // Before minValidTime: discard. } if r, ok := multiRef[sam.Ref]; ok { + // This is a float histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp. + h.updateWALExpiry(sam.Ref, sam.T) sam.Ref = r } mod := uint64(sam.Ref) % uint64(concurrency) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 2c1b0c0534..5c607d7030 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser @@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Drop irrelevant series in place. repl := series[:0] for _, s := range series { - if keep(s.Ref, to) { + if keep(s.Ref) { repl = append(repl, s) } } @@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Only keep reference to the latest found metadata for each refID. repl := 0 for _, m := range metadata { - if keep(m.Ref, to) { + if keep(m.Ref) { if _, ok := latestMetadataMap[m.Ref]; !ok { repl++ } diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 90c1035229..0b0d11ac45 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -292,7 +292,7 @@ func TestCheckpoint(t *testing.T) { } require.NoError(t, w.Close()) - stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool { + stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool { return x%2 == 0 }, last/2) require.NoError(t, err) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 4b8b408edc..57a4bbe401 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -400,7 +400,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef) bool { return true }, 0) w.Truncate(1) // Write more records after checkpointing. @@ -492,7 +492,7 @@ func TestReadCheckpoint(t *testing.T) { } _, err = w.NextSegmentSync() require.NoError(t, err) - _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef) bool { return true }, 0) require.NoError(t, err) require.NoError(t, w.Truncate(32)) @@ -655,7 +655,7 @@ func TestCheckpointSeriesReset(t *testing.T) { return wt.checkNumSeries() == seriesCount }, 10*time.Second, 1*time.Second) - _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef, int) bool { return true }, 0) + _, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef) bool { return true }, 0) require.NoError(t, err) err = w.Truncate(5)