diff --git a/head.go b/head.go index db9e97ec73..33bcb7088e 100644 --- a/head.go +++ b/head.go @@ -98,25 +98,26 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { meta: *meta, } - // Replay contents of the write ahead log. - if err = wal.ReadAll(&walHandler{ - series: func(lset labels.Labels) error { + r := wal.Reader() + + for r.Next() { + series, samples := r.At() + + for _, lset := range series { h.create(lset.Hash(), lset) h.meta.Stats.NumSeries++ - return nil - }, - sample: func(s refdSample) error { + } + for _, s := range samples { h.series[s.ref].append(s.t, s.v) if !h.inBounds(s.t) { - return ErrOutOfBounds + return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") } - h.meta.Stats.NumSamples++ - return nil - }, - }); err != nil { - return nil, err + } + } + if err := r.Err(); err != nil { + return nil, errors.Wrap(err, "consume WAL") } h.updateMapping() diff --git a/head_test.go b/head_test.go index 83c142c4f7..6f33ab0d1b 100644 --- a/head_test.go +++ b/head_test.go @@ -1,7 +1,6 @@ package tsdb import ( - "io" "io/ioutil" "os" "sort" @@ -43,11 +42,7 @@ func TestPositionMapper(t *testing.T) { } func BenchmarkCreateSeries(b *testing.B) { - f, err := os.Open("cmd/tsdb/testdata.1m") - require.NoError(b, err) - defer f.Close() - - lbls, err := readPrometheusLabels(f, 1e6) + lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) require.NoError(b, err) b.Run("", func(b *testing.B) { @@ -67,8 +62,14 @@ func BenchmarkCreateSeries(b *testing.B) { }) } -func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { - b, err := ioutil.ReadAll(r) +func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { + f, err := os.Open(fn) + if err != nil { + return nil, err + } + defer f.Close() + + b, err := ioutil.ReadAll(f) if err != nil { return nil, err } diff --git a/cmd/tsdb/testdata.20k b/testdata/20k.series similarity index 100% rename from cmd/tsdb/testdata.20k rename to testdata/20k.series diff --git a/wal.go b/wal.go 
index 89de63182a..a930252483 100644 --- a/wal.go +++ b/wal.go @@ -83,46 +83,20 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error if err := w.initSegments(); err != nil { return nil, err } - // If there are no existing segments yet, create the initial one. - if len(w.files) == 0 { - if err := w.cut(); err != nil { - return nil, err - } - } go w.run(flushInterval) return w, nil } -type walHandler struct { - sample func(refdSample) error - series func(labels.Labels) error -} - -// ReadAll consumes all entries in the WAL and triggers the registered handlers. -func (w *WAL) ReadAll(h *walHandler) error { - for i, f := range w.files { - dec := newWALDecoder(f, h) - - for { - if err := dec.next(); err != nil { - if err == io.EOF { - // If file end was reached, move on to the next segment. - break - } - return err - } - } - - // Close completed file after we are done reading it. - if i < len(w.files)-1 { - if err := f.Close(); err != nil { - return err - } - } +// Reader returns a new reader over the write ahead log data. +// It must be completely consumed before writing to the WAL. +func (w *WAL) Reader() *WALReader { + var rs []io.ReadCloser + for _, f := range w.files { + rs = append(rs, f) } - return nil + return &WALReader{rs: rs} } // Log writes a batch of new series labels and samples to the log. @@ -194,6 +168,13 @@ func (w *WAL) cut() error { if err := w.sync(); err != nil { return err } + off, err := tf.Seek(0, os.SEEK_CUR) + if err != nil { + return err + } + if err := tf.Truncate(off); err != nil { + return err + } if err := tf.Close(); err != nil { return err } @@ -245,6 +226,9 @@ func (w *WAL) Sync() error { } func (w *WAL) sync() error { + if w.cur == nil { + return nil + } if err := w.cur.Flush(); err != nil { return err } @@ -286,7 +270,10 @@ func (w *WAL) Close() error { } // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. 
- return w.tail().Close() + if tf := w.tail(); tf != nil { + return tf.Close() + } + return nil } const ( @@ -302,16 +289,20 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() - sz := int64(6 + 4 + len(buf)) - - if w.curN+sz > w.segmentSize { + // Cut to the next segment if the entry exceeds the file size unless it would also + // exceed the size of a new segment. + var ( + sz = int64(6 + 4 + len(buf)) + newsz = w.curN + sz + ) + if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { if err := w.cut(); err != nil { return err } } h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.cur, os.Stdout) + wr := io.MultiWriter(h, w.cur) b := make([]byte, 6) b[0] = byte(et) @@ -414,38 +405,122 @@ func (w *WAL) encodeSamples(samples []refdSample) error { return w.entry(WALEntrySamples, walSamplesSimple, buf) } -type walDecoder struct { - r io.Reader - handler *walHandler - +// WALReader decodes and emits write ahead log entries. +type WALReader struct { + rs []io.ReadCloser + cur int buf []byte + + err error + labels []labels.Labels + samples []refdSample } -// newWALDecoder returns a new decoder for the default WAL format. The meta -// headers of a segment must already have been consumed. -func newWALDecoder(r io.Reader, h *walHandler) *walDecoder { - return &walDecoder{ - r: r, - handler: h, - buf: make([]byte, 0, 1024*1024), +// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. +func NewWALReader(rs ...io.ReadCloser) *WALReader { + return &WALReader{ + rs: rs, + buf: make([]byte, 0, 1024*1024), } } -func (d *walDecoder) next() error { - t, flag, b, err := d.entry() +// At returns the last decoded entry of labels or samples. +func (r *WALReader) At() ([]labels.Labels, []refdSample) { + return r.labels, r.samples +} + +// Err returns the last error the reader encountered. +func (r *WALReader) Err() error { + return r.err +} + +// nextEntry retrieves the next entry. 
It is also used as a testing hook. +func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { + if r.cur >= len(r.rs) { + return 0, 0, nil, io.EOF + } + cr := r.rs[r.cur] + + et, flag, b, err := r.entry(cr) + if err == io.EOF { + // Current reader completed, close and move to the next one. + if err := cr.Close(); err != nil { + return 0, 0, nil, err + } + r.cur++ + return r.nextEntry() + } + return et, flag, b, err +} + +// Next decodes the next entry pair and returns true +// if it was successful. +func (r *WALReader) Next() bool { + r.labels = r.labels[:0] + r.samples = r.samples[:0] + + et, flag, b, err := r.nextEntry() if err != nil { - return err + if err != io.EOF { + r.err = err + } + return false } - switch t { + + switch et { case WALEntrySamples: - return d.decodeSamples(flag, b) + if err := r.decodeSamples(flag, b); err != nil { + r.err = err + } case WALEntrySeries: - return d.decodeSamples(flag, b) + if err := r.decodeSeries(flag, b); err != nil { + r.err = err + } + default: + r.err = errors.Errorf("unknown WAL entry type %d", et) } - return errors.Errorf("unknown WAL entry type %q", t) + return r.err == nil } -func (d *walDecoder) decodeSeries(flag byte, b []byte) error { +func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { + cw := crc32.NewIEEE() + tr := io.TeeReader(cr, cw) + + b := make([]byte, 6) + if _, err := tr.Read(b); err != nil { + return 0, 0, nil, err + } + + var ( + etype = WALEntryType(b[0]) + flag = b[1] + length = int(binary.BigEndian.Uint32(b[2:])) + ) + // Exit if we reached pre-allocated space. 
+ if etype == 0 { + return 0, 0, nil, io.EOF + } + + if length > len(r.buf) { + r.buf = make([]byte, length) + } + buf := r.buf[:length] + + if _, err := tr.Read(buf); err != nil { + return 0, 0, nil, err + } + _, err := cr.Read(b[:4]) + if err != nil { + return 0, 0, nil, err + } + if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp { + return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) + } + + return etype, flag, buf, nil +} + +func (r *WALReader) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { @@ -470,14 +545,12 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error { b = b[n+int(vl):] } - if err := d.handler.series(lset); err != nil { - return err - } + r.labels = append(r.labels, lset) } return nil } -func (d *walDecoder) decodeSamples(flag byte, b []byte) error { +func (r *WALReader) decodeSamples(flag byte, b []byte) error { if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -511,47 +584,7 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error { smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] - if err := d.handler.sample(smpl); err != nil { - return err - } + r.samples = append(r.samples, smpl) } return nil } - -func (d *walDecoder) entry() (WALEntryType, byte, []byte, error) { - cw := crc32.NewIEEE() - tr := io.TeeReader(d.r, cw) - - b := make([]byte, 6) - if _, err := tr.Read(b); err != nil { - return 0, 0, nil, err - } - - var ( - etype = WALEntryType(b[0]) - flag = b[1] - length = int(binary.BigEndian.Uint32(b[2:])) - ) - // Exit if we reached pre-allocated space. 
- if etype == 0 { - return 0, 0, nil, io.EOF - } - - if length > len(d.buf) { - d.buf = make([]byte, length) - } - buf := d.buf[:length] - - if _, err := tr.Read(buf); err != nil { - return 0, 0, nil, err - } - _, err := d.r.Read(b[:4]) - if err != nil { - return 0, 0, nil, err - } - if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp { - return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) - } - - return etype, flag, buf, nil -} diff --git a/wal_test.go b/wal_test.go index 5d48197728..773fc4e085 100644 --- a/wal_test.go +++ b/wal_test.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "io/ioutil" + "math/rand" "os" "testing" @@ -106,10 +107,62 @@ func TestWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := newWALDecoder(f, nil).entry() + et, flag, b, err := NewWALReader(f).nextEntry() require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) require.Equal(t, []byte("Hello World!!"), b) } } + +// Symmetrical test of reading and writing to the WAL via its main interface. +func TestWAL_Log_Restore(t *testing.T) { + // Generate testing data. It does not make semantical sense but + // for the purpose of this test. + series, err := readPrometheusLabels("testdata/20k.series", 10000) + require.NoError(t, err) + + var samples []refdSample + for i := 0; i < 200000; i++ { + samples = append(samples, refdSample{ + ref: uint64(i % 10000), + t: int64(i * 2), + v: rand.Float64(), + }) + } + + dir, err := ioutil.TempDir("", "test_wal_log_restore") + require.NoError(t, err) + defer os.RemoveAll(dir) + + w, err := OpenWAL(dir, nil, 0) + require.NoError(t, err) + + // Set smaller segment size so we can actually write several files. 
+ w.segmentSize = 300 * 1000 + + for i := 0; i < len(series); i += 100 { + require.NoError(t, w.Log(series[i:i+100], samples[i*10:(i+100)*10])) + } + + require.NoError(t, w.Close()) + + w, err = OpenWAL(dir, nil, 0) + r := w.Reader() + + var i, j int + + for r.Next() { + lsets, smpls := r.At() + + if l := len(lsets); l > 0 { + require.Equal(t, series[i:i+l], lsets) + i += l + } + if l := len(smpls); l > 0 { + require.Equal(t, samples[j:j+l], smpls) + j += l + } + } + require.NoError(t, r.Err()) +}