mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-06 06:07:11 +02:00
Merge pull request #16231 from pr00se/multiref-improvements
TSDB: Handle metadata/tombstones/exemplars for duplicate series during WAL replay
This commit is contained in:
commit
bc595263c1
@ -164,6 +164,8 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}) {
|
||||
require.NoError(t, w.Log(enc.Exemplars(v, nil)))
|
||||
case []record.RefMmapMarker:
|
||||
require.NoError(t, w.Log(enc.MmapMarkers(v, nil)))
|
||||
case []record.RefMetadata:
|
||||
require.NoError(t, w.Log(enc.Metadata(v, nil)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -714,13 +716,22 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||
[]record.RefSample{
|
||||
{Ref: 10, T: 101, V: 5},
|
||||
{Ref: 50, T: 101, V: 6},
|
||||
// Sample for duplicate series record.
|
||||
{Ref: 101, T: 101, V: 7},
|
||||
},
|
||||
[]tombstones.Stone{
|
||||
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
|
||||
// Tombstone for duplicate series record.
|
||||
{Ref: 101, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}},
|
||||
},
|
||||
[]record.RefExemplar{
|
||||
{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
|
||||
// Exemplar for duplicate series record.
|
||||
{Ref: 101, T: 101, V: 7, Labels: labels.FromStrings("trace_id", "zxcv")},
|
||||
},
|
||||
[]record.RefMetadata{
|
||||
// Metadata for duplicate series record.
|
||||
{Ref: 101, Type: uint8(record.Counter), Unit: "foo", Help: "total foo"},
|
||||
},
|
||||
}
|
||||
|
||||
@ -766,23 +777,46 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||
return x
|
||||
}
|
||||
|
||||
// Verify samples and exemplar for series 10.
|
||||
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
q, err := head.ExemplarQuerier(context.Background())
|
||||
require.NoError(t, err)
|
||||
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, e)
|
||||
require.NotEmpty(t, e[0].Exemplars)
|
||||
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
|
||||
|
||||
// Verify samples for series 50
|
||||
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
// Verify records for series 100 and its duplicate, series 101.
|
||||
// The samples before the new series record should be discarded since a duplicate record
|
||||
// is only possible when old samples were compacted.
|
||||
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
q, err := head.ExemplarQuerier(context.Background())
|
||||
q, err = head.ExemplarQuerier(context.Background())
|
||||
require.NoError(t, err)
|
||||
e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
|
||||
e, err = q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")})
|
||||
require.NoError(t, err)
|
||||
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
|
||||
require.NotEmpty(t, e)
|
||||
require.NotEmpty(t, e[0].Exemplars)
|
||||
require.True(t, exemplar.Exemplar{Ts: 101, Value: 7, Labels: labels.FromStrings("trace_id", "zxcv")}.Equals(e[0].Exemplars[0]))
|
||||
|
||||
require.NotNil(t, s100.meta)
|
||||
require.Equal(t, "foo", s100.meta.Unit)
|
||||
require.Equal(t, "total foo", s100.meta.Help)
|
||||
|
||||
intervals, err := head.tombstones.Get(storage.SeriesRef(s100.ref))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tombstones.Intervals{{Mint: 0, Maxt: 100}}, intervals)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -320,6 +320,9 @@ Outer:
|
||||
if itv.Maxt < h.minValidTime.Load() {
|
||||
continue
|
||||
}
|
||||
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
|
||||
s.Ref = storage.SeriesRef(r)
|
||||
}
|
||||
if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
|
||||
unknownTombstoneRefs.Inc()
|
||||
missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{}
|
||||
@ -331,6 +334,9 @@ Outer:
|
||||
h.wlReplaytStonesPool.Put(v)
|
||||
case []record.RefExemplar:
|
||||
for _, e := range v {
|
||||
if r, ok := multiRef[e.Ref]; ok {
|
||||
e.Ref = r
|
||||
}
|
||||
exemplarsInput <- e
|
||||
}
|
||||
h.wlReplayExemplarsPool.Put(v)
|
||||
@ -408,6 +414,9 @@ Outer:
|
||||
h.wlReplayFloatHistogramsPool.Put(v)
|
||||
case []record.RefMetadata:
|
||||
for _, m := range v {
|
||||
if r, ok := multiRef[m.Ref]; ok {
|
||||
m.Ref = r
|
||||
}
|
||||
s := h.series.getByID(m.Ref)
|
||||
if s == nil {
|
||||
unknownMetadataRefs.Inc()
|
||||
|
Loading…
Reference in New Issue
Block a user