diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 164de7b73e..8791707273 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -164,6 +164,8 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}) { require.NoError(t, w.Log(enc.Exemplars(v, nil))) case []record.RefMmapMarker: require.NoError(t, w.Log(enc.MmapMarkers(v, nil))) + case []record.RefMetadata: + require.NoError(t, w.Log(enc.Metadata(v, nil))) } } } @@ -714,13 +716,22 @@ func TestHead_ReadWAL(t *testing.T) { []record.RefSample{ {Ref: 10, T: 101, V: 5}, {Ref: 50, T: 101, V: 6}, + // Sample for duplicate series record. {Ref: 101, T: 101, V: 7}, }, []tombstones.Stone{ {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, + // Tombstone for duplicate series record. + {Ref: 101, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}}, }, []record.RefExemplar{ {Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")}, + // Exemplar for duplicate series record. + {Ref: 101, T: 101, V: 7, Labels: labels.FromStrings("trace_id", "zxcv")}, + }, + []record.RefMetadata{ + // Metadata for duplicate series record. + {Ref: 101, Type: uint8(record.Counter), Unit: "foo", Help: "total foo"}, }, } @@ -766,23 +777,46 @@ func TestHead_ReadWAL(t *testing.T) { return x } + // Verify samples and exemplar for series 10. c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) + + q, err := head.ExemplarQuerier(context.Background()) + require.NoError(t, err) + e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) + require.NoError(t, err) + require.NotEmpty(t, e) + require.NotEmpty(t, e[0].Exemplars) + require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) + + // Verify samples for series 50 c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) + + // Verify records for series 100 and its duplicate, series 101. // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) - q, err := head.ExemplarQuerier(context.Background()) + q, err = head.ExemplarQuerier(context.Background()) require.NoError(t, err) - e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) + e, err = q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")}) require.NoError(t, err) - require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) + require.NotEmpty(t, e) + require.NotEmpty(t, e[0].Exemplars) + require.True(t, exemplar.Exemplar{Ts: 101, Value: 7, Labels: labels.FromStrings("trace_id", "zxcv")}.Equals(e[0].Exemplars[0])) + + require.NotNil(t, s100.meta) + require.Equal(t, "foo", s100.meta.Unit) + require.Equal(t, "total foo", s100.meta.Help) + + intervals, err := head.tombstones.Get(storage.SeriesRef(s100.ref)) + require.NoError(t, err) + require.Equal(t, tombstones.Intervals{{Mint: 0, Maxt: 100}}, intervals) }) } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 4de091c291..e97f06a2ce 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -320,6 +320,9 @@ Outer: if itv.Maxt < h.minValidTime.Load() { continue } + if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + s.Ref = storage.SeriesRef(r) + } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { unknownTombstoneRefs.Inc() missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{} @@ -331,6 +334,9 @@ Outer: h.wlReplaytStonesPool.Put(v) case []record.RefExemplar: for _, e := range v { + if r, ok := multiRef[e.Ref]; ok { + e.Ref = r + } exemplarsInput <- e } h.wlReplayExemplarsPool.Put(v) @@ -408,6 +414,9 @@ Outer: h.wlReplayFloatHistogramsPool.Put(v) case []record.RefMetadata: for _, m := range v { + if r, ok := multiRef[m.Ref]; ok { + m.Ref = r + } s := h.series.getByID(m.Ref) if s == nil { unknownMetadataRefs.Inc()