From 1a5573b4ce832c47fbe9eba0512de2cf4edb5ae6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 27 May 2018 13:05:11 -0400 Subject: [PATCH 1/4] Migrate write ahead log On startup, rewrite the old write ahead log into the new format once. Signed-off-by: Fabian Reinartz --- db.go | 4 +++ head.go | 2 ++ wal.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++ wal_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 193 insertions(+) diff --git a/db.go b/db.go index 1b508e0666..e6a0a74b4b 100644 --- a/db.go +++ b/db.go @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. + if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { + return nil, errors.Wrap(err, "migrate WAL") + } db = &DB{ dir: dir, diff --git a/head.go b/head.go index 61457911f1..f52fea726b 100644 --- a/head.go +++ b/head.go @@ -392,6 +392,8 @@ func (h *Head) Init() error { if err == nil { return nil } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } diff --git a/wal.go b/wal.go index c1b9f6b062..773b642825 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. +// +// DEPRECATED: use wal pkg combined with the record coders instead. 
type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { } // SegmentWAL is a write ahead log for series data. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1206,3 +1211,86 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } return nil } + +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) error { + // Detect whether we still have the old WAL. + fns, err := sequenceFiles(dir) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "list sequence files") + } + if len(fns) == 0 { + return nil // No WAL at all yet. + } + // Check header of first segment. + f, err := os.Open(fns[0]) + if err != nil { + return errors.Wrap(err, "check first existing segment") + } + var hdr [4]byte + if n, err := f.Read(hdr[:]); err != nil { + return errors.Wrap(err, "read header from first segment") + } else if n != 4 { + return errors.New("could not read full header from segment") + } + if binary.BigEndian.Uint32(hdr[:]) != WALMagic { + return nil // Not the old WAL anymore. 
+ } + + level.Info(logger).Log("msg", "migrating WAL format") + + tmpdir := dir + ".tmp" + if err := os.RemoveAll(tmpdir); err != nil { + return errors.Wrap(err, "cleanup replacement dir") + } + repl, err := wal.New(logger, nil, tmpdir) + if err != nil { + return errors.Wrap(err, "open new WAL") + } + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) + if err != nil { + return errors.Wrap(err, "open old WAL") + } + rdr := w.Reader() + + var ( + enc RecordEncoder + b []byte + ) + decErr := rdr.Read( + func(s []RefSeries) { + if err != nil { + return + } + err = repl.Log(enc.Series(s, b[:0])) + }, + func(s []RefSample) { + if err != nil { + return + } + err = repl.Log(enc.Samples(s, b[:0])) + }, + func(s []Stone) { + if err != nil { + return + } + err = repl.Log(enc.Tombstones(s, b[:0])) + }, + ) + if decErr != nil { + return errors.Wrap(err, "decode old entries") + } + if err != nil { + return errors.Wrap(err, "write new entries") + } + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } + if err := repl.Close(); err != nil { + return errors.Wrap(err, "close new WAL") + } + if err := fileutil.Rename(tmpdir, dir); err != nil { + return errors.Wrap(err, "replace old WAL") + } + return nil +} diff --git a/wal_test.go b/wal_test.go index 6d559f5d9d..680ebe06ab 100644 --- a/wal_test.go +++ b/wal_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "path" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func TestSegmentWAL_cut(t *testing.T) { @@ -431,3 +433,100 @@ func TestWALRestoreCorrupted(t *testing.T) { }) } } + +func TestMigrateWAL_Fuzz(t *testing.T) { + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Should pass if no WAL exists yet. 
+ testutil.Ok(t, MigrateWAL(nil, wdir)) + + oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) + testutil.Ok(t, err) + + // Write some data. + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 1, T: 100, V: 200}, + {Ref: 2, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 3, T: 100, V: 200}, + {Ref: 4, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogDeletes([]Stone{ + {ref: 1, intervals: []Interval{{100, 200}}}, + })) + + testutil.Ok(t, oldWAL.Close()) + + // Perform migration. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + + // We can properly write some new data after migration. + var enc RecordEncoder + testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + {Ref: 500, T: 1, V: 1}, + }, nil))) + + testutil.Ok(t, w.Close()) + + // Read back all data. 
+ sr, err := wal.NewSegmentsReader(wdir) + testutil.Ok(t, err) + + r := wal.NewReader(sr) + var res []interface{} + var dec RecordDecoder + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + s, err := dec.Series(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordSamples: + s, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordTombstones: + s, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + default: + t.Fatalf("unknown record type %d", dec.Type(rec)) + } + } + testutil.Ok(t, r.Err()) + + testutil.Equals(t, []interface{}{ + []RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + }, + []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + }, + []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, + []RefSample{{Ref: 500, T: 1, V: 1}}, + }, res) + + // Migrating an already migrated WAL shouldn't do anything. + testutil.Ok(t, MigrateWAL(nil, wdir)) +} From 92e1b209575e98c8fe8edc3a70ea437beaae2f22 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jun 2018 04:21:27 -0400 Subject: [PATCH 2/4] Fix close handling Signed-off-by: Fabian Reinartz --- wal.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/wal.go b/wal.go index 773b642825..0fcadb1fc1 100644 --- a/wal.go +++ b/wal.go @@ -1213,7 +1213,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } // MigrateWAL rewrites the deprecated write ahead log into the new format. -func MigrateWAL(logger log.Logger, dir string) error { +func MigrateWAL(logger log.Logger, dir string) (err error) { // Detect whether we still have the old WAL. 
fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { @@ -1227,6 +1227,8 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "check first existing segment") } + defer f.Close() + var hdr [4]byte if n, err := f.Read(hdr[:]); err != nil { return errors.Wrap(err, "read header from first segment") @@ -1247,10 +1249,20 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "open new WAL") } + // We close it once already before as part of finalization. + // Do it once again in case of prior errors. + defer func() { + if err != nil { + repl.Close() + } + }() + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) if err != nil { return errors.Wrap(err, "open old WAL") } + defer w.Close() + rdr := w.Reader() var ( @@ -1283,9 +1295,6 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "write new entries") } - if err := w.Close(); err != nil { - return errors.Wrap(err, "close old WAL") - } if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } From 22fd3ef24e3bf9450fbeec574e46049d0b70e458 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jun 2018 04:50:20 -0400 Subject: [PATCH 3/4] Deal with zero-length segments Signed-off-by: Fabian Reinartz --- repair_test.go | 2 +- wal.go | 15 ++++++++++----- wal_test.go | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/repair_test.go b/repair_test.go index f4c9d20874..c80976002a 100644 --- a/repair_test.go +++ b/repair_test.go @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) { } // On DB opening all blocks in the base dir should be repaired. 
- db, _ := Open("testdata/repair_index_version", nil, nil, nil) + db, err := Open("testdata/repair_index_version", nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/wal.go b/wal.go index 0fcadb1fc1..6685dbd063 100644 --- a/wal.go +++ b/wal.go @@ -1214,6 +1214,9 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { // MigrateWAL rewrites the deprecated write ahead log into the new format. func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } // Detect whether we still have the old WAL. fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { @@ -1222,7 +1225,8 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if len(fns) == 0 { return nil // No WAL at all yet. } - // Check header of first segment. + // Check header of first segment to see whether we are still dealing with an + // old WAL. f, err := os.Open(fns[0]) if err != nil { return errors.Wrap(err, "check first existing segment") @@ -1230,13 +1234,14 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { defer f.Close() var hdr [4]byte - if n, err := f.Read(hdr[:]); err != nil { + if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { return errors.Wrap(err, "read header from first segment") - } else if n != 4 { - return errors.New("could not read full header from segment") } + // If we cannot read the magic header for segments of the old WAL, abort. + // Either it's migrated already or there's a corruption issue with which + // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. if binary.BigEndian.Uint32(hdr[:]) != WALMagic { - return nil // Not the old WAL anymore. 
+ return nil } level.Info(logger).Log("msg", "migrating WAL format") diff --git a/wal_test.go b/wal_test.go index 680ebe06ab..b16680a994 100644 --- a/wal_test.go +++ b/wal_test.go @@ -434,6 +434,23 @@ func TestWALRestoreCorrupted(t *testing.T) { } } +func TestMigrateWAL_Empty(t *testing.T) { + // The migration procedure must properly deal with a zero-length segment, + // which is valid in the new format. + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Initialize empty WAL. + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + testutil.Ok(t, MigrateWAL(nil, wdir)) +} + func TestMigrateWAL_Fuzz(t *testing.T) { dir, err := ioutil.TempDir("", "walmigrate") testutil.Ok(t, err) From ee7ee059efb573a1c3f850a1c2f7a43556e98ea8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Aug 2018 17:57:34 -0400 Subject: [PATCH 4/4] Fix doc comments Signed-off-by: Fabian Reinartz --- wal.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wal.go b/wal.go index 6685dbd063..26857a9dc0 100644 --- a/wal.go +++ b/wal.go @@ -84,7 +84,7 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. // -// DEPRECATED: use wal pkg combined with the record coders instead. +// DEPRECATED: use wal pkg combined with the record codex instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -1254,7 +1254,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "open new WAL") } - // We close it once already before as part of finalization. + // It should've already been closed as part of the previous finalization. // Do it once again in case of prior errors. defer func() { if err != nil {