diff --git a/checkpoint.go b/checkpoint.go
new file mode 100644
index 0000000000..2ab5f8d95c
--- /dev/null
+++ b/checkpoint.go
@@ -0,0 +1,313 @@
+// Copyright 2018 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/fileutil"
+	"github.com/prometheus/tsdb/wal"
+)
+
+// CheckpointStats holds stats about a created checkpoint.
+type CheckpointStats struct {
+	DroppedSeries     int
+	DroppedSamples    int
+	DroppedTombstones int
+	TotalSeries       int
+	TotalSamples      int
+	TotalTombstones   int
+}
+
+// LastCheckpoint returns the directory name and index of the most recent checkpoint.
+// The most recent checkpoint is the one with the numerically highest index.
+// An entry with the checkpoint prefix that is not a directory is an error.
+// If dir does not contain any checkpoints, ErrNotFound is returned.
+func LastCheckpoint(dir string) (string, int, error) {
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return "", 0, err
+	}
+	var (
+		name  string
+		index int
+		found bool
+	)
+	// ioutil.ReadDir sorts by file name, which differs from the numeric order
+	// as soon as indices differ in width (checkpoint.9 sorts after
+	// checkpoint.10). Compare the parsed indices instead.
+	for _, fi := range files {
+		if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
+			continue
+		}
+		if !fi.IsDir() {
+			return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
+		}
+		k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
+		if err != nil {
+			continue
+		}
+		// On equal indices the lexically last name wins.
+		if !found || k >= index {
+			name, index, found = fi.Name(), k, true
+		}
+	}
+	if !found {
+		return "", 0, ErrNotFound
+	}
+	return name, index, nil
+}
+
+// DeleteCheckpoints deletes all checkpoints in dir that have an index
+// below n.
+func DeleteCheckpoints(dir string, n int) error {
+	var errs MultiError
+
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+	for _, fi := range files {
+		if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
+			continue
+		}
+		k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
+		if err != nil || k >= n {
+			continue
+		}
+		if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
+			errs.Add(err)
+		}
+	}
+	return errs.Err()
+}
+
+const checkpointPrefix = "checkpoint."
+
+// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
+// It includes the most recent checkpoint if it exists.
+// All series not satisfying keep and samples below mint are dropped.
+// Tombstones are dropped if none of their intervals reaches mint.
+//
+// The checkpoint is stored in a directory named checkpoint.N in the same
+// segmented format as the original WAL itself.
+// This makes it easy to read it through the WAL package and concatenate
+// it with the original WAL.
+//
+// The checkpoint is first written to a temporary directory that is renamed
+// once complete. If an error is returned, the temporary directory is removed.
+//
+// Non-critical errors are logged and not returned.
+func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+	stats := &CheckpointStats{}
+
+	var sr io.Reader
+	{
+		lastFn, k, err := LastCheckpoint(w.Dir())
+		if err != nil && err != ErrNotFound {
+			return nil, errors.Wrap(err, "find last checkpoint")
+		}
+		if err == nil {
+			if m > k+1 {
+				return nil, errors.New("unexpected gap to last checkpoint")
+			}
+			// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
+			m = k + 1
+
+			last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
+			if err != nil {
+				return nil, errors.Wrap(err, "open last checkpoint")
+			}
+			defer last.Close()
+			sr = last
+		}
+
+		segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
+		if err != nil {
+			return nil, errors.Wrap(err, "create segment reader")
+		}
+		defer segs.Close()
+
+		if sr != nil {
+			sr = io.MultiReader(sr, segs)
+		} else {
+			sr = segs
+		}
+	}
+
+	cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
+	cpdirtmp := cpdir + ".tmp"
+
+	// Start from a clean temporary directory so that leftovers of an
+	// interrupted run cannot leak into the new checkpoint.
+	if err := os.RemoveAll(cpdirtmp); err != nil {
+		return nil, errors.Wrap(err, "remove previous temporary checkpoint dir")
+	}
+	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
+		return nil, errors.Wrap(err, "create checkpoint dir")
+	}
+	cp, err := wal.New(nil, nil, cpdirtmp)
+	if err != nil {
+		return nil, errors.Wrap(err, "open checkpoint")
+	}
+	// Make sure an early return neither leaks the open checkpoint WAL nor
+	// leaves the temporary directory behind. After a successful rename
+	// cpdirtmp no longer exists, which makes the removal a no-op.
+	cpClosed := false
+	defer func() {
+		if !cpClosed {
+			cp.Close()
+		}
+		os.RemoveAll(cpdirtmp)
+	}()
+
+	r := wal.NewReader(sr)
+
+	var (
+		series  []RefSeries
+		samples []RefSample
+		tstones []Stone
+		dec     RecordDecoder
+		enc     RecordEncoder
+		buf     []byte
+		recs    [][]byte
+	)
+	for r.Next() {
+		series, samples, tstones = series[:0], samples[:0], tstones[:0]
+
+		// We don't reset the buffer since we batch up multiple records
+		// before writing them to the checkpoint.
+		// Remember where the record for this iteration starts.
+		start := len(buf)
+		rec := r.Record()
+
+		switch dec.Type(rec) {
+		case RecordSeries:
+			series, err = dec.Series(rec, series)
+			if err != nil {
+				return nil, errors.Wrap(err, "decode series")
+			}
+			// Drop irrelevant series in place.
+			repl := series[:0]
+			for _, s := range series {
+				if keep(s.Ref) {
+					repl = append(repl, s)
+				}
+			}
+			if len(repl) > 0 {
+				buf = enc.Series(repl, buf)
+			}
+			stats.TotalSeries += len(series)
+			stats.DroppedSeries += len(series) - len(repl)
+
+		case RecordSamples:
+			samples, err = dec.Samples(rec, samples)
+			if err != nil {
+				return nil, errors.Wrap(err, "decode samples")
+			}
+			// Drop irrelevant samples in place.
+			repl := samples[:0]
+			for _, s := range samples {
+				if s.T >= mint {
+					repl = append(repl, s)
+				}
+			}
+			if len(repl) > 0 {
+				buf = enc.Samples(repl, buf)
+			}
+			stats.TotalSamples += len(samples)
+			stats.DroppedSamples += len(samples) - len(repl)
+
+		case RecordTombstones:
+			tstones, err = dec.Tombstones(rec, tstones)
+			if err != nil {
+				return nil, errors.Wrap(err, "decode deletes")
+			}
+			// Drop irrelevant tombstones in place.
+			repl := tstones[:0]
+			for _, s := range tstones {
+				for _, iv := range s.intervals {
+					if iv.Maxt >= mint {
+						repl = append(repl, s)
+						break
+					}
+				}
+			}
+			if len(repl) > 0 {
+				buf = enc.Tombstones(repl, buf)
+			}
+			stats.TotalTombstones += len(tstones)
+			stats.DroppedTombstones += len(tstones) - len(repl)
+
+		default:
+			return nil, errors.New("invalid record type")
+		}
+		if len(buf[start:]) == 0 {
+			continue // All contents discarded.
+		}
+		recs = append(recs, buf[start:])
+
+		// Flush records in 1 MB increments.
+		if len(buf) > 1*1024*1024 {
+			if err := cp.Log(recs...); err != nil {
+				return nil, errors.Wrap(err, "flush records")
+			}
+			buf, recs = buf[:0], recs[:0]
+		}
+	}
+	// If we hit any corruption during checkpointing, repairing is not an option.
+	// The head won't know which series records are lost.
+	if r.Err() != nil {
+		return nil, errors.Wrap(r.Err(), "read segments")
+	}
+
+	// Flush remaining records.
+	if err := cp.Log(recs...); err != nil {
+		return nil, errors.Wrap(err, "flush records")
+	}
+	// Close is attempted exactly once, even if it fails.
+	cpClosed = true
+	if err := cp.Close(); err != nil {
+		return nil, errors.Wrap(err, "close checkpoint")
+	}
+	if err := fileutil.Rename(cpdirtmp, cpdir); err != nil {
+		return nil, errors.Wrap(err, "rename checkpoint file")
+	}
+	if err := w.Truncate(n + 1); err != nil {
+		// If truncating fails, we'll just try again at the next checkpoint.
+		// Leftover segments will just be ignored in the future if there's a checkpoint
+		// that supersedes them.
+		level.Error(logger).Log("msg", "truncating segments failed", "err", err)
+	}
+	if err := DeleteCheckpoints(w.Dir(), n); err != nil {
+		// Leftover old checkpoints do not cause problems down the line beyond
+		// occupying disk space.
+		// They will just be ignored since a higher checkpoint exists.
+		level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
+	}
+	return stats, nil
+}
diff --git a/checkpoint_test.go b/checkpoint_test.go
new file mode 100644
index 0000000000..074bb46e8c
--- /dev/null
+++ b/checkpoint_test.go
@@ -0,0 +1,182 @@
+// Copyright 2018 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/prometheus/tsdb/fileutil"
+	"github.com/prometheus/tsdb/labels"
+	"github.com/prometheus/tsdb/testutil"
+	"github.com/prometheus/tsdb/wal"
+)
+
+func TestLastCheckpoint(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_checkpoint")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+
+	s, k, err := LastCheckpoint(dir)
+	testutil.Equals(t, ErrNotFound, err)
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
+	s, k, err = LastCheckpoint(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "checkpoint.0000", s)
+	testutil.Equals(t, 0, k)
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777))
+	s, k, err = LastCheckpoint(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "checkpoint.0000", s)
+	testutil.Equals(t, 0, k)
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777))
+	s, k, err = LastCheckpoint(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "checkpoint.1", s)
+	testutil.Equals(t, 1, k)
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777))
+	s, k, err = LastCheckpoint(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "checkpoint.1000", s)
+	testutil.Equals(t, 1000, k)
+}
+
+func TestDeleteCheckpoints(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_checkpoint")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+
+	testutil.Ok(t, DeleteCheckpoints(dir, 0))
+
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0777))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0777))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0777))
+	testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0777))
+
+	testutil.Ok(t, DeleteCheckpoints(dir, 2))
+
+	files, err := fileutil.ReadDir(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files)
+}
+
+func TestCheckpoint(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_checkpoint")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(dir)
+
+	var enc RecordEncoder
+	// Create a dummy segment to bump the initial number.
+	seg, err := wal.CreateSegment(dir, 100)
+	testutil.Ok(t, err)
+	testutil.Ok(t, seg.Close())
+
+	// Manually create checkpoint for 99 and earlier.
+	w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
+	testutil.Ok(t, err)
+
+	// Add some data we expect to be around later.
+	err = w.Log(enc.Series([]RefSeries{
+		{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
+		{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
+	}, nil))
+	testutil.Ok(t, err)
+	testutil.Ok(t, w.Close())
+
+	// Start a WAL and write records to it as usual.
+	w, err = wal.NewSize(nil, nil, dir, 64*1024)
+	testutil.Ok(t, err)
+
+	var last int64
+	for i := 0; ; i++ {
+		_, n, err := w.Segments()
+		testutil.Ok(t, err)
+		if n >= 106 {
+			break
+		}
+		// Write some series initially.
+		if i == 0 {
+			b := enc.Series([]RefSeries{
+				{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
+				{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
+				{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
+				{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
+			}, nil)
+			testutil.Ok(t, w.Log(b))
+		}
+		// Write samples until the WAL has enough segments.
+		// Make them have drifting timestamps within a record to see that they
+		// get filtered properly.
+		b := enc.Samples([]RefSample{
+			{Ref: 0, T: last, V: float64(i)},
+			{Ref: 1, T: last + 10000, V: float64(i)},
+			{Ref: 2, T: last + 20000, V: float64(i)},
+			{Ref: 3, T: last + 30000, V: float64(i)},
+		}, nil)
+		testutil.Ok(t, w.Log(b))
+
+		last += 100
+	}
+	testutil.Ok(t, w.Close())
+
+	stats, err := Checkpoint(nil, w, 100, 106, func(x uint64) bool {
+		return x%2 == 0
+	}, last/2)
+	testutil.Ok(t, err)
+	testutil.Equals(t, 6, stats.TotalSeries)   // 2 from checkpoint.0099, 4 from the WAL.
+	testutil.Equals(t, 3, stats.DroppedSeries) // Odd refs are rejected by keep.
+
+	// Only the new checkpoint should be left.
+	files, err := fileutil.ReadDir(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, 1, len(files))
+	testutil.Equals(t, "checkpoint.000106", files[0])
+
+	sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
+	testutil.Ok(t, err)
+	defer sr.Close()
+
+	var dec RecordDecoder
+	var series []RefSeries
+	r := wal.NewReader(sr)
+
+	for r.Next() {
+		rec := r.Record()
+
+		switch dec.Type(rec) {
+		case RecordSeries:
+			series, err = dec.Series(rec, series)
+			testutil.Ok(t, err)
+		case RecordSamples:
+			samples, err := dec.Samples(rec, nil)
+			testutil.Ok(t, err)
+			for _, s := range samples {
+				testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
+			}
+		}
+	}
+	testutil.Ok(t, r.Err())
+	testutil.Equals(t, []RefSeries{
+		{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
+		{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
+		{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
+	}, series)
+}
diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go
index c2c25842ad..397858958b 100644
--- a/fileutil/fileutil.go
+++ b/fileutil/fileutil.go
@@ -6,6 +6,7 @@ package fileutil
 
 import (
 	"os"
+	"path/filepath"
 	"sort"
 )
 
@@ -23,3 +24,25 @@ func ReadDir(dirpath string) ([]string, error) {
 	sort.Strings(names)
 	return names, nil
 }
+
+// Rename renames from to to, removing to first if it exists, and syncs the parent directory.
+func Rename(from, to string) error {
+	if err := os.RemoveAll(to); err != nil {
+		return err
+	}
+	if err := os.Rename(from, to); err != nil {
+		return err
+	}
+
+	// Directory was renamed; sync parent dir to persist rename.
+	pdir, err := OpenDir(filepath.Dir(to))
+	if err != nil {
+		return err
+	}
+
+	if err = Fsync(pdir); err != nil {
+		pdir.Close()
+		return err
+	}
+	return pdir.Close()
+}
diff --git a/record.go b/record.go
new file mode 100644
index 0000000000..c8cc7a5043
--- /dev/null
+++ b/record.go
@@ -0,0 +1,215 @@
+// Copyright 2018 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"math"
+	"sort"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/labels"
+)
+
+// RecordType represents the data type of a record.
+type RecordType uint8
+
+const (
+	RecordInvalid    RecordType = 255
+	RecordSeries     RecordType = 1
+	RecordSamples    RecordType = 2
+	RecordTombstones RecordType = 3
+)
+
+// RecordLogger wraps the Log method for writing a batch of encoded records.
+type RecordLogger interface {
+	Log(recs ...[]byte) error
+}
+
+// RecordReader wraps the methods for iterating over a sequence of records.
+type RecordReader interface {
+	Next() bool
+	Err() error
+	Record() []byte
+}
+
+// RecordDecoder decodes series, sample, and tombstone records.
+// The zero value is ready to use.
+type RecordDecoder struct {
+}
+
+// Type returns the type of the record.
+// Return RecordInvalid if no valid record type is found.
+func (d *RecordDecoder) Type(rec []byte) RecordType {
+	if len(rec) < 1 {
+		return RecordInvalid
+	}
+	switch t := RecordType(rec[0]); t {
+	case RecordSeries, RecordSamples, RecordTombstones:
+		return t
+	}
+	return RecordInvalid
+}
+
+// Series appends series in rec to the given slice.
+func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
+	in := decbuf{b: rec}
+
+	if RecordType(in.byte()) != RecordSeries {
+		return nil, errors.New("invalid record type")
+	}
+	// Every entry is a series reference followed by a counted list of
+	// label name/value pairs.
+	for len(in.b) > 0 && in.err() == nil {
+		var s RefSeries
+
+		s.Ref = in.be64()
+		s.Labels = make(labels.Labels, in.uvarint())
+
+		for i := range s.Labels {
+			s.Labels[i].Name = in.uvarintStr()
+			s.Labels[i].Value = in.uvarintStr()
+		}
+		sort.Sort(s.Labels)
+
+		series = append(series, s)
+	}
+	if err := in.err(); err != nil {
+		return nil, err
+	}
+	if rest := len(in.b); rest > 0 {
+		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
+	}
+	return series, nil
+}
+
+// Samples decodes the samples contained in rec and appends them to samples.
+func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
+	in := decbuf{b: rec}
+
+	if RecordType(in.byte()) != RecordSamples {
+		return nil, errors.New("invalid record type")
+	}
+	if in.len() == 0 {
+		return samples, nil
+	}
+	// The record carries a base reference and a base timestamp up front.
+	// Each sample that follows holds its reference and timestamp as deltas
+	// to those base values.
+	refBase := in.be64()
+	timeBase := in.be64int64()
+
+	for len(in.b) > 0 && in.err() == nil {
+		var s RefSample
+
+		s.Ref = uint64(int64(refBase) + in.varint64())
+		s.T = timeBase + in.varint64()
+		s.V = math.Float64frombits(in.be64())
+
+		samples = append(samples, s)
+	}
+
+	if err := in.err(); err != nil {
+		return nil, errors.Wrapf(err, "decode error after %d samples", len(samples))
+	}
+	if rest := len(in.b); rest > 0 {
+		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
+	}
+	return samples, nil
+}
+
+// Tombstones appends tombstones in rec to the given slice.
+func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
+	in := decbuf{b: rec}
+
+	if RecordType(in.byte()) != RecordTombstones {
+		return nil, errors.New("invalid record type")
+	}
+	// Every entry holds exactly one interval for a series reference.
+	for in.len() > 0 && in.err() == nil {
+		s := Stone{ref: in.be64()}
+		mint := in.varint64()
+		maxt := in.varint64()
+		s.intervals = Intervals{{Mint: mint, Maxt: maxt}}
+		tstones = append(tstones, s)
+	}
+	if err := in.err(); err != nil {
+		return nil, err
+	}
+	if rest := len(in.b); rest > 0 {
+		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
+	}
+	return tstones, nil
+}
+
+// RecordEncoder turns series, samples, and tombstones into records.
+// Its zero value can be used right away.
+type RecordEncoder struct {
+}
+
+// Series encodes series as one record appended to b and returns the extended slice.
+func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
+	out := encbuf{b: b}
+	out.putByte(byte(RecordSeries))
+
+	for i := range series {
+		lset := series[i].Labels
+		out.putBE64(series[i].Ref)
+		out.putUvarint(len(lset))
+		for j := range lset {
+			out.putUvarintStr(lset[j].Name)
+			out.putUvarintStr(lset[j].Value)
+		}
+	}
+	return out.get()
+}
+
+// Samples encodes samples as one record appended to b and returns the extended slice.
+func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
+	out := encbuf{b: b}
+	out.putByte(byte(RecordSamples))
+
+	if len(samples) == 0 {
+		return out.get()
+	}
+
+	// The first sample provides the base reference and base timestamp.
+	// Every sample, the first one included, is written as a delta to them.
+	baseRef, baseT := samples[0].Ref, samples[0].T
+
+	out.putBE64(baseRef)
+	out.putBE64int64(baseT)
+
+	for i := range samples {
+		out.putVarint64(int64(samples[i].Ref) - int64(baseRef))
+		out.putVarint64(samples[i].T - baseT)
+		out.putBE64(math.Float64bits(samples[i].V))
+	}
+	return out.get()
+}
+
+// Tombstones appends the encoded tombstones to b and returns the resulting slice.
+func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
+	out := encbuf{b: b}
+	out.putByte(byte(RecordTombstones))
+	// One entry is written per interval, each repeating the series reference.
+	for i := range tstones {
+		for _, iv := range tstones[i].intervals {
+			out.putBE64(tstones[i].ref)
+			out.putVarint64(iv.Mint)
+			out.putVarint64(iv.Maxt)
+		}
+	}
+	return out.get()
+}
diff --git a/record_test.go b/record_test.go
new file mode 100644
index 0000000000..4257fc0c52
--- /dev/null
+++ b/record_test.go
@@ -0,0 +1,71 @@
+// Copyright 2018 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"testing"
+
+	"github.com/prometheus/tsdb/labels"
+	"github.com/prometheus/tsdb/testutil"
+)
+
+func TestRecord_EncodeDecode(t *testing.T) {
+	var (
+		enc RecordEncoder
+		dec RecordDecoder
+	)
+
+	// Series must survive an encode/decode round trip unchanged.
+	wantSeries := []RefSeries{
+		{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
+		{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
+		{Ref: 435245, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
+	}
+	gotSeries, err := dec.Series(enc.Series(wantSeries, nil), nil)
+	testutil.Ok(t, err)
+	testutil.Equals(t, wantSeries, gotSeries)
+
+	// So must samples, including negative timestamps and values.
+	wantSamples := []RefSample{
+		{Ref: 0, T: 12423423, V: 1.2345},
+		{Ref: 123, T: -1231, V: -123},
+		{Ref: 2, T: 0, V: 99999},
+	}
+	gotSamples, err := dec.Samples(enc.Samples(wantSamples, nil), nil)
+	testutil.Ok(t, err)
+	testutil.Equals(t, wantSamples, gotSamples)
+
+	// Intervals get split up into single entries. So we don't get back exactly
+	// what we put in.
+	tstones := []Stone{
+		{ref: 123, intervals: Intervals{
+			{Mint: -1000, Maxt: 1231231},
+			{Mint: 5000, Maxt: 0},
+		}},
+		{ref: 13, intervals: Intervals{
+			{Mint: -1000, Maxt: -11},
+			{Mint: 5000, Maxt: 1000},
+		}},
+	}
+	wantTstones := []Stone{
+		{ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}},
+		{ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}},
+		{ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}},
+		{ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}},
+	}
+	gotTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil)
+	testutil.Ok(t, err)
+	testutil.Equals(t, wantTstones, gotTstones)
+}