diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 26c9658c49..60f39e5d56 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -26,6 +26,7 @@ import ( "time" "unsafe" + "github.com/go-kit/kit/log" "github.com/pkg/errors" promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" @@ -88,7 +89,10 @@ func (b *writeBenchmark) run() { dir := filepath.Join(b.outPath, "storage") - st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ + l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + + st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), diff --git a/head.go b/head.go index 647a3d3d76..a5ce94e455 100644 --- a/head.go +++ b/head.go @@ -192,7 +192,11 @@ func (h *Head) ReadWAL() error { seriesFunc := func(series []RefSeries) error { for _, s := range series { - h.getOrCreate(s.Labels.Hash(), s.Labels) + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } } return nil } @@ -203,7 +207,8 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) + h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref) + continue } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { @@ -211,7 +216,6 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil } deletesFunc := func(stones []Stone) error { @@ -223,7 +227,6 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } @@ -846,7 +849,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { // Optimistically assume that we are the first one to create the series. id := atomic.AddUint64(&h.lastSeriesID, 1) - s = newMemSeries(lset, id, h.chunkRange) + + return h.getOrCreateWithID(id, hash, lset) +} + +func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { + s := newMemSeries(lset, id, h.chunkRange) s, created := h.series.getOrSet(hash, s) if !created { diff --git a/head_test.go b/head_test.go index b4901b7cd5..b7603e78ee 100644 --- a/head_test.go +++ b/head_test.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" promlabels "github.com/prometheus/prometheus/pkg/labels" @@ -83,6 +84,82 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { return mets, nil } +type memoryWAL struct { + nopWAL + entries []interface{} +} + +func (w *memoryWAL) Reader() WALReader { + return w +} + +func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { + for _, e := range w.entries { + switch v := e.(type) { + case []RefSeries: + series(v) + case []RefSample: + samples(v) + case []Stone: + deletes(v) + } + } + return nil +} + +func TestHead_ReadWAL(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + {Ref: 11, Labels: labels.FromStrings("a", "2")}, + {Ref: 100, Labels: labels.FromStrings("a", "3")}, + }, + []RefSample{ + {Ref: 0, T: 99, V: 1}, + {Ref: 10, T: 100, V: 2}, + {Ref: 100, T: 100, V: 3}, + }, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "4")}, + }, + []RefSample{ + {Ref: 10, T: 101, V: 5}, + {Ref: 50, T: 101, V: 6}, + }, + } + wal := &memoryWAL{entries: entries} + + head, err := NewHead(nil, nil, wal, 1000) + require.NoError(t, err) + + require.NoError(t, head.ReadWAL()) + require.Equal(t, uint64(100), head.lastSeriesID) + + s10 := head.series.getByID(10) + s11 := head.series.getByID(11) + s50 := head.series.getByID(50) + s100 := head.series.getByID(100) + + require.Equal(t, labels.FromStrings("a", "1"), s10.lset) + require.Equal(t, labels.FromStrings("a", "2"), s11.lset) + require.Equal(t, labels.FromStrings("a", "4"), s50.lset) + require.Equal(t, labels.FromStrings("a", "3"), s100.lset) + + expandChunk := func(c chunks.Iterator) (x []sample) { + for c.Next() { + t, v := c.At() + x = append(x, sample{t: t, v: v}) + } + require.NoError(t, c.Err()) + return x + } + + require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) + require.Equal(t, 0, len(s11.chunks)) + require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) + require.Equal(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) +} + func TestHead_Truncate(t *testing.T) { h, err := NewHead(nil, nil, nil, 1000) require.NoError(t, err) diff --git a/wal.go b/wal.go index 695e8d31b5..27984ea0ce 100644 --- a/wal.go +++ b/wal.go @@ -824,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode series entry") } - seriesf(series) + if err := seriesf(series); err != nil { + return err + } cf := r.current() @@ -839,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode samples entry") } - samplesf(samples) + if err := samplesf(samples); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current() @@ -855,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC if err != nil { return errors.Wrap(err, "decode delete entry") } - deletesf(stones) + if err := deletesf(stones); err != nil { + return err + } // Update the times for the WAL segment file. cf := r.current()