diff --git a/head_test.go b/head_test.go index b707d63c38..83c142c4f7 100644 --- a/head_test.go +++ b/head_test.go @@ -1,11 +1,17 @@ package tsdb import ( + "io" "io/ioutil" "os" "sort" "testing" + "unsafe" + "github.com/fabxc/tsdb/labels" + + promlabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/stretchr/testify/require" ) @@ -60,3 +66,29 @@ func BenchmarkCreateSeries(b *testing.B) { } }) } + +func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { + b, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + p := textparse.New(b) + i := 0 + var mets []labels.Labels + hashes := map[uint64]struct{}{} + + for p.Next() && i < n { + m := make(labels.Labels, 0, 10) + p.Metric((*promlabels.Labels)(unsafe.Pointer(&m))) + + h := m.Hash() + if _, ok := hashes[h]; ok { + continue + } + mets = append(mets, m) + hashes[h] = struct{}{} + i++ + } + return mets, p.Err() +} diff --git a/wal.go b/wal.go index f8e480464a..89de63182a 100644 --- a/wal.go +++ b/wal.go @@ -21,15 +21,18 @@ import ( type WALEntryType byte const ( - WALMagic = 0x43AF00EF + // WALMagic is a 4 byte number every WAL segment file starts with. + WALMagic = uint32(0x43AF00EF) - // Format versioning flag of a WAL segment file. - WALFormatDefault byte = 1 + // WALFormatDefault is the version flag for the default outer segment file format. + WALFormatDefault = byte(1) +) - // Entry types in a segment file. - WALEntrySymbols = 1 - WALEntrySeries = 2 - WALEntrySamples = 3 +// Entry types in a segment file. +const ( + WALEntrySymbols WALEntryType = 1 + WALEntrySeries WALEntryType = 2 + WALEntrySamples WALEntryType = 3 ) // WAL is a write ahead log for series data. It can only be written to. @@ -42,9 +45,10 @@ type WAL struct { logger log.Logger flushInterval time.Duration + segmentSize int64 cur *bufio.Writer - curN int + curN int64 stopc chan struct{} donec chan struct{} @@ -74,6 +78,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), + segmentSize: walSegmentSizeBytes, } if err := w.initSegments(); err != nil { return nil, err @@ -101,7 +106,7 @@ func (w *WAL) ReadAll(h *walHandler) error { dec := newWALDecoder(f, h) for { - if err := dec.entry(); err != nil { + if err := dec.next(); err != nil { if err == io.EOF { // If file end was reached, move on to the next segment. break @@ -145,7 +150,7 @@ func (w *WAL) initSegments() error { return nil } if len(fns) > 1 { - for _, fn := range fns[:len(fns)-2] { + for _, fn := range fns[:len(fns)-1] { lf, err := fileutil.TryLockFile(fn, os.O_RDONLY, 0666) if err != nil { return err @@ -184,19 +189,14 @@ func (w *WAL) initSegments() error { // cut finishes the currently active segments and open the next one. // The encoder is reset to point to the new segment. func (w *WAL) cut() error { - // If there's a previous segment, truncate it to its final size - // and sync everything to disc. + // Sync current tail to disc and close. if tf := w.tail(); tf != nil { - off, err := tf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := tf.Truncate(off); err != nil { - return err - } if err := w.sync(); err != nil { return err } + if err := tf.Close(); err != nil { + return err + } } p, _, err := nextSequenceFile(w.dirFile.Name(), "") @@ -207,10 +207,7 @@ func (w *WAL) cut() error { if err != nil { return err } - if _, err = f.Seek(0, os.SEEK_SET); err != nil { - return err - } - if err = fileutil.Preallocate(f.File, walSegmentSizeBytes, true); err != nil { + if err = fileutil.Preallocate(f.File, w.segmentSize, true); err != nil { return err } if err = w.dirFile.Sync(); err != nil { @@ -228,7 +225,7 @@ func (w *WAL) cut() error { w.files = append(w.files, f) w.cur = bufio.NewWriterSize(f, 4*1024*1024) - w.curN = len(metab) + w.curN = 8 return nil } @@ -284,15 +281,12 @@ func (w *WAL) Close() error { w.mtx.Lock() defer w.mtx.Unlock() - var merr MultiError - if err := w.sync(); err != nil { return err } - for _, f := range w.files { - merr.Add(f.Close()) - } - return merr.Err() + // On opening, a WAL must be fully consumed once. Afterwards + // only the current segment will still be open. + return w.tail().Close() } const ( @@ -308,16 +302,16 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() - sz := 6 + 4 + len(buf) + sz := int64(6 + 4 + len(buf)) - if w.curN+sz > walSegmentSizeBytes { + if w.curN+sz > w.segmentSize { if err := w.cut(); err != nil { return err } } h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.cur) + wr := io.MultiWriter(h, w.cur, os.Stdout) b := make([]byte, 6) b[0] = byte(et) @@ -437,6 +431,20 @@ func newWALDecoder(r io.Reader, h *walHandler) *walDecoder { } } +func (d *walDecoder) next() error { + t, flag, b, err := d.entry() + if err != nil { + return err + } + switch t { + case WALEntrySamples: + return d.decodeSamples(flag, b) + case WALEntrySeries: + return d.decodeSamples(flag, b) + } + return errors.Errorf("unknown WAL entry type %q", t) +} + func (d *walDecoder) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) @@ -510,10 +518,13 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error { return nil } -func (d *walDecoder) entry() error { +func (d *walDecoder) entry() (WALEntryType, byte, []byte, error) { + cw := crc32.NewIEEE() + tr := io.TeeReader(d.r, cw) + b := make([]byte, 6) - if _, err := d.r.Read(b); err != nil { - return err + if _, err := tr.Read(b); err != nil { + return 0, 0, nil, err } var ( @@ -523,7 +534,7 @@ func (d *walDecoder) entry() error { ) // Exit if we reached pre-allocated space. if etype == 0 { - return io.EOF + return 0, 0, nil, io.EOF } if length > len(d.buf) { @@ -531,26 +542,16 @@ func (d *walDecoder) entry() error { } buf := d.buf[:length] - cw := crc32.NewIEEE() - tr := io.TeeReader(d.r, cw) - if _, err := tr.Read(buf); err != nil { - return err + return 0, 0, nil, err } _, err := d.r.Read(b[:4]) if err != nil { - return err + return 0, 0, nil, err } if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp { - return errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) + return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) } - switch etype { - case WALEntrySeries: - return d.decodeSeries(flag, buf) - case WALEntrySamples: - return d.decodeSamples(flag, buf) - } - - return errors.Errorf("unknown WAL entry type %q", etype) + return etype, flag, buf, nil } diff --git a/wal_test.go b/wal_test.go index 35ea053132..5d48197728 100644 --- a/wal_test.go +++ b/wal_test.go @@ -1,240 +1,115 @@ package tsdb import ( - "io" + "encoding/binary" + "fmt" "io/ioutil" "os" "testing" - "time" - "github.com/fabxc/tsdb/labels" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/model" + "github.com/coreos/etcd/pkg/fileutil" "github.com/stretchr/testify/require" ) -func BenchmarkWALWrite(b *testing.B) { - d, err := ioutil.TempDir("", "wal_read_test") - require.NoError(b, err) +func TestWAL_initSegments(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test_wal_open") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) - defer func() { - require.NoError(b, os.RemoveAll(d)) - }() + df, err := fileutil.OpenDir(tmpdir) + require.NoError(t, err) - wal, err := OpenWAL(d, nil, 500*time.Millisecond) - require.NoError(b, err) + w := &WAL{dirFile: df} - f, err := os.Open("cmd/tsdb/testdata.1m") - require.NoError(b, err) + // Create segment files with an appropriate header. + for i := 1; i <= 5; i++ { + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], WALMagic) + metab[4] = WALFormatDefault - series, err := readPrometheusLabels(f, b.N/300) - require.NoError(b, err) - - var ( - samples [][]refdSample - ts int64 - ) - for i := 0; i < 300; i++ { - ts += int64(30000) - scrape := make([]refdSample, 0, len(series)) - - for ref := range series { - scrape = append(scrape, refdSample{ - ref: uint64(ref), - t: ts, - v: 12345788, - }) - } - samples = append(samples, scrape) + f, err := os.Create(fmt.Sprintf("%s/000%d", tmpdir, i)) + require.NoError(t, err) + _, err = f.Write(metab) + require.NoError(t, err) + require.NoError(t, f.Close()) } - b.ResetTimer() + // Initialize 5 correct segment files. + require.NoError(t, w.initSegments()) - err = wal.Log(series, samples[0]) - require.NoError(b, err) + require.Equal(t, 5, len(w.files), "unexpected number of segments loaded") - for _, s := range samples[1:] { - err = wal.Log(nil, s) - require.NoError(b, err) + // Validate that files are locked properly. + for _, of := range w.files { + f, err := os.Open(of.Name()) + require.NoError(t, err, "open locked segment %s", f.Name()) + + _, err = f.Read([]byte{0}) + require.NoError(t, err, "read locked segment %s", f.Name()) + + _, err = f.Write([]byte{0}) + require.Error(t, err, "write to tail segment file %s", f.Name()) + + require.NoError(t, f.Close()) } - require.NoError(b, wal.Close()) -} - -func BenchmarkWALRead(b *testing.B) { - f, err := os.Open("cmd/tsdb/testdata.1m") - require.NoError(b, err) - - series, err := readPrometheusLabels(f, 1000000) - require.NoError(b, err) - - b.Run("test", func(b *testing.B) { - bseries := series[:b.N/300] - - d, err := ioutil.TempDir("", "wal_read_test") - require.NoError(b, err) - - defer func() { - require.NoError(b, os.RemoveAll(d)) - }() - - wal, err := OpenWAL(d, nil, 500*time.Millisecond) - require.NoError(b, err) - - var ( - samples [][]refdSample - ts int64 - ) - for i := 0; i < 300; i++ { - ts += int64(30000) - scrape := make([]refdSample, 0, len(bseries)) - - for ref := range bseries { - scrape = append(scrape, refdSample{ - ref: uint64(ref), - t: ts, - v: 12345788, - }) - } - samples = append(samples, scrape) - } - - err = wal.Log(bseries, samples[0]) - require.NoError(b, err) - - for _, s := range samples[1:] { - err = wal.Log(nil, s) - require.NoError(b, err) - } - - require.NoError(b, wal.Close()) - - b.ResetTimer() - - wal, err = OpenWAL(d, nil, 500*time.Millisecond) - require.NoError(b, err) - - var numSeries, numSamples int - - err = wal.ReadAll(&walHandler{ - series: func(lset labels.Labels) error { - numSeries++ - return nil - }, - sample: func(smpl refdSample) error { - numSamples++ - return nil - }, - }) - require.NoError(b, err) - - // stat, _ := wal.f.Stat() - // fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024)) - }) -} - -func BenchmarkWALReadIntoHead(b *testing.B) { - f, err := os.Open("cmd/tsdb/testdata.1m") - require.NoError(b, err) - - series, err := readPrometheusLabels(f, 1000000) - require.NoError(b, err) - - b.Run("test", func(b *testing.B) { - bseries := series[:b.N/300] - - d, err := ioutil.TempDir("", "wal_read_test") - require.NoError(b, err) - - defer func() { - require.NoError(b, os.RemoveAll(d)) - }() - - wal, err := OpenWAL(d, nil, 500*time.Millisecond) - require.NoError(b, err) - - var ( - samples [][]refdSample - ts int64 - ) - for i := 0; i < 300; i++ { - ts += int64(30000) - scrape := make([]refdSample, 0, len(bseries)) - - for ref := range bseries { - scrape = append(scrape, refdSample{ - ref: uint64(ref), - t: ts, - v: 12345788, - }) - } - samples = append(samples, scrape) - } - - err = wal.Log(bseries, samples[0]) - require.NoError(b, err) - - for _, s := range samples[1:] { - err = wal.Log(nil, s) - require.NoError(b, err) - } - - require.NoError(b, wal.Close()) - - b.ResetTimer() - - _, err = OpenWAL(d, nil, 500*time.Millisecond) - require.NoError(b, err) - - // stat, _ := head.wal.f.Stat() - // fmt.Println("head block initialized from WAL") - // fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024)) - }) -} - -func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { - dec := expfmt.NewDecoder(r, expfmt.FmtProtoText) - - var mets []model.Metric - fps := map[model.Fingerprint]struct{}{} - var mf dto.MetricFamily - var dups int - - for i := 0; i < n; { - if err := dec.Decode(&mf); err != nil { - if err == io.EOF { - break - } - return nil, err - } - - for _, m := range mf.GetMetric() { - met := make(model.Metric, len(m.GetLabel())+1) - met["__name__"] = model.LabelValue(mf.GetName()) - - for _, l := range m.GetLabel() { - met[model.LabelName(l.GetName())] = model.LabelValue(l.GetValue()) - } - if _, ok := fps[met.Fingerprint()]; ok { - dups++ - } else { - mets = append(mets, met) - fps[met.Fingerprint()] = struct{}{} - } - i++ - } + for _, f := range w.files { + require.NoError(t, f.Close()) } - lbls := make([]labels.Labels, 0, n) + // Make initialization fail by corrupting the header of one file. + f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) - for _, m := range mets[:n] { - lset := make(labels.Labels, 0, len(m)) - for k, v := range m { - lset = append(lset, labels.Label{Name: string(k), Value: string(v)}) - } - lbls = append(lbls, lset) + _, err = f.WriteAt([]byte{0}, 4) + require.NoError(t, err) + + w = &WAL{dirFile: df} + require.Error(t, w.initSegments(), "init corrupted segments") + + for _, f := range w.files { + require.NoError(t, f.Close()) + } +} + +func TestWAL_cut(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test_wal_cut") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + // This calls cut() implicitly the first time without a previous tail. + w, err := OpenWAL(tmpdir, nil, 0) + require.NoError(t, err) + + require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!"))) + + require.NoError(t, w.cut(), "cut failed") + + // Cutting creates a new file and close the previous tail file. + require.Equal(t, 2, len(w.files)) + require.Equal(t, os.ErrInvalid.Error(), w.files[0].Close().Error()) + + require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!"))) + + require.NoError(t, w.Close()) + + for _, of := range w.files { + f, err := os.Open(of.Name()) + require.NoError(t, err) + + // Verify header data. + metab := make([]byte, 8) + _, err = f.Read(metab) + require.NoError(t, err, "read meta data %s", f.Name()) + require.Equal(t, WALMagic, binary.BigEndian.Uint32(metab[:4]), "verify magic") + require.Equal(t, WALFormatDefault, metab[4], "verify format flag") + + // We cannot actually check for correct pre-allocation as it is + // optional per filesystem and handled transparently. + et, flag, b, err := newWALDecoder(f, nil).entry() + require.NoError(t, err) + require.Equal(t, WALEntrySeries, et) + require.Equal(t, flag, byte(walSeriesSimple)) + require.Equal(t, []byte("Hello World!!"), b) } - - return lbls, nil }