TSDB: Handle metadata/tombstones/exemplars for duplicate series during WAL replay

Signed-off-by: Patryk Prus <p@trykpr.us>
This commit is contained in:
Patryk Prus 2025-03-18 11:34:42 -04:00
parent 7b4f093641
commit e4e1b515bc
No known key found for this signature in database
GPG Key ID: 795650115CA6A58F
2 changed files with 46 additions and 3 deletions

View File

@ -164,6 +164,8 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}) {
require.NoError(t, w.Log(enc.Exemplars(v, nil)))
case []record.RefMmapMarker:
require.NoError(t, w.Log(enc.MmapMarkers(v, nil)))
case []record.RefMetadata:
require.NoError(t, w.Log(enc.Metadata(v, nil)))
}
}
}
@ -714,13 +716,22 @@ func TestHead_ReadWAL(t *testing.T) {
[]record.RefSample{
{Ref: 10, T: 101, V: 5},
{Ref: 50, T: 101, V: 6},
// Sample for duplicate series record.
{Ref: 101, T: 101, V: 7},
},
[]tombstones.Stone{
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
// Tombstone for duplicate series record.
{Ref: 101, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}},
},
[]record.RefExemplar{
{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
// Exemplar for duplicate series record.
{Ref: 101, T: 101, V: 7, Labels: labels.FromStrings("trace_id", "zxcv")},
},
[]record.RefMetadata{
// Metadata for duplicate series record.
{Ref: 101, Type: uint8(record.Counter), Unit: "foo", Help: "total foo"},
},
}
@ -766,23 +777,46 @@ func TestHead_ReadWAL(t *testing.T) {
return x
}
// Verify samples and exemplar for series 10.
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
require.NoError(t, err)
require.NotEmpty(t, e)
require.NotEmpty(t, e[0].Exemplars)
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
// Verify samples for series 50
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
// Verify records for series 100 and its duplicate, series 101.
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
q, err := head.ExemplarQuerier(context.Background())
q, err = head.ExemplarQuerier(context.Background())
require.NoError(t, err)
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
e, err = q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")})
require.NoError(t, err)
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
require.NotEmpty(t, e)
require.NotEmpty(t, e[0].Exemplars)
require.True(t, exemplar.Exemplar{Ts: 101, Value: 7, Labels: labels.FromStrings("trace_id", "zxcv")}.Equals(e[0].Exemplars[0]))
require.NotNil(t, s100.meta)
require.Equal(t, "foo", s100.meta.Unit)
require.Equal(t, "total foo", s100.meta.Help)
intervals, err := head.tombstones.Get(storage.SeriesRef(s100.ref))
require.NoError(t, err)
require.Equal(t, tombstones.Intervals{{Mint: 0, Maxt: 100}}, intervals)
})
}
}

View File

@ -320,6 +320,9 @@ Outer:
if itv.Maxt < h.minValidTime.Load() {
continue
}
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
s.Ref = storage.SeriesRef(r)
}
if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
unknownTombstoneRefs.Inc()
missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{}
@ -331,6 +334,9 @@ Outer:
h.wlReplaytStonesPool.Put(v)
case []record.RefExemplar:
for _, e := range v {
if r, ok := multiRef[e.Ref]; ok {
e.Ref = r
}
exemplarsInput <- e
}
h.wlReplayExemplarsPool.Put(v)
@ -408,6 +414,9 @@ Outer:
h.wlReplayFloatHistogramsPool.Put(v)
case []record.RefMetadata:
for _, m := range v {
if r, ok := multiRef[m.Ref]; ok {
m.Ref = r
}
s := h.series.getByID(m.Ref)
if s == nil {
unknownMetadataRefs.Inc()