diff --git a/Documentation/format/tombstones.md b/Documentation/format/tombstones.md new file mode 100644 index 0000000000..2af0ac98c5 --- /dev/null +++ b/Documentation/format/tombstones.md @@ -0,0 +1,31 @@ +# Tombstones Disk Format + +The following describes the format of a tombstones file, which is placed +at the top level directory of a block. + +The last 8 bytes specifies the offset to the start of Stones section. +The stones section is 0 padded to a multiple of 4 for fast scans. + +``` +┌────────────────────────────┬─────────────────────┐ +│ magic(0x130BA30) <4b> │ version(1) <1 byte> │ +├────────────────────────────┴─────────────────────┤ +│ ┌──────────────────────────────────────────────┐ │ +│ │ Tombstone 1 │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ ... │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Tombstone N │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ CRC<4b> │ │ +│ └──────────────────────────────────────────────┘ │ +└──────────────────────────────────────────────────┘ +``` + +# Tombstone + +``` +┌─────────────┬───────────────┬──────────────┐ +│ref │ mint │ maxt │ +└─────────────┴───────────────┴──────────────┘ +``` diff --git a/block.go b/block.go index 6c1e555f39..1351efae7b 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/tsdb/labels" ) // DiskBlock handles reads against a Block of time series data. @@ -37,6 +38,12 @@ type DiskBlock interface { // Chunks returns a ChunkReader over the block's data. Chunks() ChunkReader + // Tombstones returns a TombstoneReader over the block's deleted data. + Tombstones() TombstoneReader + + // Delete deletes data from the block. + Delete(mint, maxt int64, ms ...labels.Matcher) error + // Close releases all underlying resources of the block. Close() error } @@ -79,9 +86,10 @@ type BlockMeta struct { // Stats about the contents of the block. Stats struct { - NumSamples uint64 `json:"numSamples,omitempty"` - NumSeries uint64 `json:"numSeries,omitempty"` - NumChunks uint64 `json:"numChunks,omitempty"` + NumSamples uint64 `json:"numSamples,omitempty"` + NumSeries uint64 `json:"numSeries,omitempty"` + NumChunks uint64 `json:"numChunks,omitempty"` + NumTombstones uint64 `json:"numTombstones,omitempty"` } `json:"stats,omitempty"` // Information on compactions the block was created from. @@ -150,6 +158,8 @@ type persistedBlock struct { chunkr *chunkReader indexr *indexReader + + tombstones tombstoneReader } func newPersistedBlock(dir string) (*persistedBlock, error) { @@ -167,11 +177,17 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, err } + tr, err := readTombstones(dir) + if err != nil { + return nil, err + } + pb := &persistedBlock{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, + tombstones: tr, } return pb, nil } @@ -191,21 +207,85 @@ func (pb *persistedBlock) String() string { func (pb *persistedBlock) Querier(mint, maxt int64) Querier { return &blockQuerier{ - mint: mint, - maxt: maxt, - index: pb.Index(), - chunks: pb.Chunks(), + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + tombstones: pb.Tombstones(), } } func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } -func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } +func (pb *persistedBlock) Tombstones() TombstoneReader { + return pb.tombstones +} +func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } + +func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { + pr := newPostingsReader(pb.indexr) + p, absent := pr.Select(ms...) + + ir := pb.indexr + + // Choose only valid postings which have chunks in the time-range. + stones := map[uint32]intervals{} + +Outer: + for p.Next() { + lset, chunks, err := ir.Series(p.At()) + if err != nil { + return err + } + + for _, abs := range absent { + if lset.Get(abs) != "" { + continue Outer + } + } + + for _, chk := range chunks { + if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { + // Delete only until the current vlaues and not beyond. + tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime) + stones[p.At()] = intervals{{tmin, tmax}} + continue Outer + } + } + } + + if p.Err() != nil { + return p.Err() + } + + // Merge the current and new tombstones. + for k, v := range stones { + pb.tombstones.add(k, v[0]) + } + + if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { + return err + } + + pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones)) + return writeMetaFile(pb.dir, &pb.meta) +} func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } +func clampInterval(a, b, mint, maxt int64) (int64, int64) { + if a < mint { + a = mint + } + if b > maxt { + b = maxt + } + + return a, b +} + type mmapFile struct { f *os.File b []byte diff --git a/block_test.go b/block_test.go new file mode 100644 index 0000000000..e75d4ac3f2 --- /dev/null +++ b/block_test.go @@ -0,0 +1,14 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb diff --git a/chunks.go b/chunks.go index 77663359cf..075384cd59 100644 --- a/chunks.go +++ b/chunks.go @@ -54,6 +54,46 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error { return nil } +// deletedIterator wraps an Iterator and makes sure any deleted metrics are not +// returned. +type deletedIterator struct { + it chunks.Iterator + + intervals intervals +} + +func (it *deletedIterator) At() (int64, float64) { + return it.it.At() +} + +func (it *deletedIterator) Next() bool { +Outer: + for it.it.Next() { + ts, _ := it.it.At() + + for _, tr := range it.intervals { + if tr.inBounds(ts) { + continue Outer + } + + if ts > tr.maxt { + it.intervals = it.intervals[1:] + continue + } + + return true + } + + return true + } + + return false +} + +func (it *deletedIterator) Err() error { + return it.it.Err() +} + // ChunkWriter serializes a time block of chunked series data. type ChunkWriter interface { // WriteChunks writes several chunks. The Chunk field of the ChunkMetas diff --git a/chunks_test.go b/chunks_test.go index ae9d698768..2a722d8fd1 100644 --- a/chunks_test.go +++ b/chunks_test.go @@ -14,8 +14,12 @@ package tsdb import ( + "math/rand" + "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" + "github.com/stretchr/testify/require" ) type mockChunkReader map[uint64]chunks.Chunk @@ -32,3 +36,63 @@ func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (cr mockChunkReader) Close() error { return nil } + +func TestDeletedIterator(t *testing.T) { + chk := chunks.NewXORChunk() + app, err := chk.Appender() + require.NoError(t, err) + // Insert random stuff from (0, 1000). + act := make([]sample, 1000) + for i := 0; i < 1000; i++ { + act[i].t = int64(i) + act[i].v = rand.Float64() + app.Append(act[i].t, act[i].v) + } + + cases := []struct { + r intervals + }{ + {r: intervals{{1, 20}}}, + {r: intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, + {r: intervals{{1, 10}, {12, 20}, {20, 30}}}, + {r: intervals{{1, 10}, {12, 23}, {25, 30}}}, + {r: intervals{{1, 23}, {12, 20}, {25, 30}}}, + {r: intervals{{1, 23}, {12, 20}, {25, 3000}}}, + {r: intervals{{0, 2000}}}, + {r: intervals{{500, 2000}}}, + {r: intervals{{0, 200}}}, + {r: intervals{{1000, 20000}}}, + } + + for _, c := range cases { + i := int64(-1) + it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]} + ranges := c.r[:] + for it.Next() { + i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.maxt + 1 + ranges = ranges[1:] + } + } + + require.True(t, i < 1000) + + ts, v := it.At() + require.Equal(t, act[i].t, ts) + require.Equal(t, act[i].v, v) + } + // There has been an extra call to Next(). + i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.maxt + 1 + ranges = ranges[1:] + } + } + + require.False(t, i < 1000) + require.NoError(t, it.Err()) + } +} diff --git a/compact.go b/compact.go index d3ca6edfa6..ee54bc9ecf 100644 --- a/compact.go +++ b/compact.go @@ -26,6 +26,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -262,6 +263,11 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return errors.Wrap(err, "close index writer") } + // Create an empty tombstones file. + if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { + return errors.Wrap(err, "write new tombstones file") + } + // Block successfully written, make visible and remove old ones. if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") @@ -293,7 +299,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri if err != nil { return nil, err } - s := newCompactionSeriesSet(b.Index(), b.Chunks(), all) + s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all) if i == 0 { set = s @@ -314,14 +320,36 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri ) for set.Next() { - lset, chunks := set.At() - if err := chunkw.WriteChunks(chunks...); err != nil { + lset, chks, dranges := set.At() // The chunks here are not fully deleted. + + if len(dranges) > 0 { + // Re-encode the chunk to not have deleted values. + for _, chk := range chks { + if intervalOverlap(dranges[0].mint, dranges[len(dranges)-1].maxt, chk.MinTime, chk.MaxTime) { + newChunk := chunks.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + return nil, err + } + + it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + for it.Next() { + ts, v := it.At() + app.Append(ts, v) + } + + chk.Chunk = newChunk + } + } + } + + if err := chunkw.WriteChunks(chks...); err != nil { return nil, err } - indexw.AddSeries(i, lset, chunks...) + indexw.AddSeries(i, lset, chks...) - meta.Stats.NumChunks += uint64(len(chunks)) + meta.Stats.NumChunks += uint64(len(chks)) meta.Stats.NumSeries++ for _, l := range lset { @@ -371,25 +399,28 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri type compactionSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta) + At() (labels.Labels, []*ChunkMeta, intervals) Err() error } type compactionSeriesSet struct { - p Postings - index IndexReader - chunks ChunkReader + p Postings + index IndexReader + chunks ChunkReader + tombstones TombstoneReader - l labels.Labels - c []*ChunkMeta - err error + l labels.Labels + c []*ChunkMeta + intervals intervals + err error } -func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet { return &compactionSeriesSet{ - index: i, - chunks: c, - p: p, + index: i, + chunks: c, + tombstones: t, + p: p, } } @@ -398,10 +429,25 @@ func (c *compactionSeriesSet) Next() bool { return false } + c.intervals = c.tombstones.Get(c.p.At()) + c.l, c.c, c.err = c.index.Series(c.p.At()) if c.err != nil { return false } + + // Remove completely deleted chunks. + if len(c.intervals) > 0 { + chks := make([]*ChunkMeta, 0, len(c.c)) + for _, chk := range c.c { + if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { + chks = append(chks, chk) + } + } + + c.c = chks + } + for _, chk := range c.c { chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) if c.err != nil { @@ -419,16 +465,17 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) { - return c.l, c.c +func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { + return c.l, c.c, c.intervals } type compactionMerger struct { a, b compactionSet - aok, bok bool - l labels.Labels - c []*ChunkMeta + aok, bok bool + l labels.Labels + c []*ChunkMeta + intervals intervals } type compactionSeries struct { @@ -456,8 +503,8 @@ func (c *compactionMerger) compare() int { if !c.bok { return -1 } - a, _ := c.a.At() - b, _ := c.b.At() + a, _, _ := c.a.At() + b, _, _ := c.b.At() return labels.Compare(a, b) } @@ -469,17 +516,21 @@ func (c *compactionMerger) Next() bool { d := c.compare() // Both sets contain the current series. Chain them into a single one. if d > 0 { - c.l, c.c = c.b.At() + c.l, c.c, c.intervals = c.b.At() c.bok = c.b.Next() } else if d < 0 { - c.l, c.c = c.a.At() + c.l, c.c, c.intervals = c.a.At() c.aok = c.a.Next() } else { - l, ca := c.a.At() - _, cb := c.b.At() + l, ca, ra := c.a.At() + _, cb, rb := c.b.At() + for _, r := range rb { + ra = ra.add(r) + } c.l = l c.c = append(ca, cb...) + c.intervals = ra c.aok = c.a.Next() c.bok = c.b.Next() @@ -494,8 +545,8 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) { - return c.l, c.c +func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) { + return c.l, c.c, c.intervals } func renameFile(from, to string) error { diff --git a/db.go b/db.go index 74b834b0cc..d50ff842c8 100644 --- a/db.go +++ b/db.go @@ -119,6 +119,9 @@ type DB struct { compactc chan struct{} donec chan struct{} stopc chan struct{} + + // cmtx is used to control compactions and deletions. + cmtx sync.Mutex } type dbMetrics struct { @@ -296,6 +299,9 @@ func (db *DB) retentionCutoff() (bool, error) { } func (db *DB) compact() (changes bool, err error) { + db.cmtx.Lock() + defer db.cmtx.Unlock() + db.headmtx.RLock() // Check whether we have pending head blocks that are ready to be persisted. @@ -461,6 +467,7 @@ func (db *DB) reloadBlocks() (err error) { if err := validateBlockSequence(blocks); err != nil { return errors.Wrap(err, "invalid block sequence") } + // Close all opened blocks that no longer exist after we returned all locks. for _, b := range db.blocks { if _, ok := exist[b.Meta().ULID]; !ok { @@ -707,6 +714,30 @@ func (a *dbAppender) Rollback() error { return g.Wait() } +// Delete implements deletion of metrics. +func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { + db.cmtx.Lock() + defer db.cmtx.Unlock() + db.mtx.Lock() + defer db.mtx.Unlock() + + blocks := db.blocksForInterval(mint, maxt) + + var g errgroup.Group + + for _, b := range blocks { + g.Go(func(b Block) func() error { + return func() error { return b.Delete(mint, maxt, ms...) } + }(b)) + } + + if err := g.Wait(); err != nil { + return err + } + + return nil +} + // appendable returns a copy of a slice of HeadBlocks that can still be appended to. func (db *DB) appendable() (r []headBlock) { switch len(db.heads) { @@ -720,13 +751,8 @@ func (db *DB) appendable() (r []headBlock) { } func intervalOverlap(amin, amax, bmin, bmax int64) bool { - if bmin >= amin && bmin <= amax { - return true - } - if amin >= bmin && amin <= bmax { - return true - } - return false + // Checks Overlap: http://stackoverflow.com/questions/3269434/ + return amin <= bmax && bmin <= amax } func intervalContains(min, max, t int64) bool { diff --git a/db_test.go b/db_test.go index 1fa4d72706..e410dfceb7 100644 --- a/db_test.go +++ b/db_test.go @@ -15,6 +15,7 @@ package tsdb import ( "io/ioutil" + "math/rand" "os" "testing" @@ -141,3 +142,77 @@ func TestDBAppenderAddRef(t *testing.T) { err = app2.AddFast(string(refb), 1, 1) require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) } + +func TestDeleteSimple(t *testing.T) { + numSamples := int64(10) + + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + require.NoError(t, err) + app := db.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + cases := []struct { + intervals intervals + remaint []int64 + }{ + { + intervals: intervals{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + }, + } + +Outer: + for _, c := range cases { + // TODO(gouthamve): Reset the tombstones somehow. + // Delete the ranges. + for _, r := range c.intervals { + require.NoError(t, db.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b"))) + } + + // Compare the result. + q := db.Querier(0, numSamples) + res := q.Select(labels.NewEqualMatcher("a", "b")) + + expSamples := make([]sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts]}) + } + + expss := newListSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamples), + }) + + if len(expSamples) == 0 { + require.False(t, res.Next()) + continue + } + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } +} diff --git a/encoding_helpers.go b/encoding_helpers.go index 91f73a54c7..25ff32d00b 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -22,6 +22,7 @@ func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } +func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } @@ -71,8 +72,10 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) uvarintStr() string { l := d.uvarint64() @@ -140,6 +143,19 @@ func (d *decbuf) be32() uint32 { return x } +func (d *decbuf) byte() byte { + if d.e != nil { + return 0 + } + if len(d.b) < 1 { + d.e = errInvalidSize + return 0 + } + x := d.b[0] + d.b = d.b[1:] + return x +} + func (d *decbuf) decbuf(l int) decbuf { if d.e != nil { return decbuf{e: d.e} diff --git a/head.go b/head.go index f30a934a52..7f93d1658a 100644 --- a/head.go +++ b/head.go @@ -69,6 +69,8 @@ type HeadBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms + tombstones tombstoneReader + meta BlockMeta } @@ -97,6 +99,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { }); err != nil { return "", err } + return dir, renameFile(tmp, dir) } @@ -108,13 +111,14 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { } h := &HeadBlock{ - dir: dir, - wal: wal, - series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. - hashes: map[uint64][]*memSeries{}, - values: map[string]stringset{}, - postings: &memPostings{m: make(map[term][]uint32)}, - meta: *meta, + dir: dir, + wal: wal, + series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. + hashes: map[uint64][]*memSeries{}, + values: map[string]stringset{}, + postings: &memPostings{m: make(map[term][]uint32)}, + meta: *meta, + tombstones: newEmptyTombstoneReader(), } return h, h.init() } @@ -122,16 +126,19 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { func (h *HeadBlock) init() error { r := h.wal.Reader() - for r.Next() { - series, samples := r.At() - + seriesFunc := func(series []labels.Labels) error { for _, lset := range series { h.create(lset.Hash(), lset) h.meta.Stats.NumSeries++ } + + return nil + } + samplesFunc := func(samples []RefSample) error { for _, s := range samples { if int(s.Ref) >= len(h.series) { - return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) + return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", + s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) @@ -140,8 +147,24 @@ func (h *HeadBlock) init() error { } h.meta.Stats.NumSamples++ } + + return nil } - return errors.Wrap(r.Err(), "consume WAL") + deletesFunc := func(stones []Stone) error { + for _, s := range stones { + for _, itv := range s.intervals { + h.tombstones.add(s.ref, itv) + } + } + + return nil + } + + if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { + return errors.Wrap(err, "consume WAL") + } + + return nil } // inBounds returns true if the given timestamp is within the valid @@ -195,6 +218,50 @@ func (h *HeadBlock) Meta() BlockMeta { return m } +// Tombstones returns the TombstoneReader against the block. +func (h *HeadBlock) Tombstones() TombstoneReader { + return h.tombstones +} + +// Delete implements headBlock. +func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { + ir := h.Index() + + pr := newPostingsReader(ir) + p, absent := pr.Select(ms...) + + var stones []Stone + +Outer: + for p.Next() { + ref := p.At() + lset := h.series[ref].lset + for _, abs := range absent { + if lset.Get(abs) != "" { + continue Outer + } + } + + // Delete only until the current values and not beyond. + tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime) + stones = append(stones, Stone{ref, intervals{{tmin, tmax}}}) + } + + if p.Err() != nil { + return p.Err() + } + if err := h.wal.LogDeletes(stones); err != nil { + return err + } + + for _, s := range stones { + h.tombstones.add(s.ref, s.intervals[0]) + } + + h.meta.Stats.NumTombstones = uint64(len(h.tombstones)) + return nil +} + // Dir returns the directory of the block. func (h *HeadBlock) Dir() string { return h.dir } @@ -217,10 +284,12 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { series := h.series[:] return &blockQuerier{ - mint: mint, - maxt: maxt, - index: h.Index(), - chunks: h.Chunks(), + mint: mint, + maxt: maxt, + index: h.Index(), + chunks: h.Chunks(), + tombstones: h.Tombstones(), + postingsMapper: func(p Postings) Postings { ep := make([]uint32, 0, 64) @@ -423,6 +492,7 @@ func (a *headAppender) createSeries() { func (a *headAppender) Commit() error { defer atomic.AddUint64(&a.activeWriters, ^uint64(0)) defer putHeadAppendBuffer(a.samples) + defer a.mtx.RUnlock() a.createSeries() @@ -436,9 +506,11 @@ func (a *headAppender) Commit() error { // Write all new series and samples to the WAL and add it to the // in-mem database on success. - if err := a.wal.Log(a.newLabels, a.samples); err != nil { - a.mtx.RUnlock() - return err + if err := a.wal.LogSeries(a.newLabels); err != nil { + return errors.Wrap(err, "WAL log series") + } + if err := a.wal.LogSamples(a.samples); err != nil { + return errors.Wrap(err, "WAL log samples") } total := uint64(len(a.samples)) @@ -449,8 +521,6 @@ func (a *headAppender) Commit() error { } } - a.mtx.RUnlock() - atomic.AddUint64(&a.meta.Stats.NumSamples, total) atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries))) @@ -538,6 +608,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error if int(ref) >= len(h.series) { return nil, nil, ErrNotFound } + s := h.series[ref] if s == nil { return nil, nil, ErrNotFound diff --git a/head_test.go b/head_test.go index a6df150794..0463d84306 100644 --- a/head_test.go +++ b/head_test.go @@ -36,6 +36,10 @@ func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock dir, err := TouchHeadBlock(dir, mint, maxt) require.NoError(t, err) + return openTestHeadBlock(t, dir) +} + +func openTestHeadBlock(t testing.TB, dir string) *HeadBlock { wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) require.NoError(t, err) @@ -378,6 +382,323 @@ func TestHeadBlock_e2e(t *testing.T) { return } +func TestHBDeleteSimple(t *testing.T) { + numSamples := int64(10) + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, 0, numSamples) + app := hb.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + cases := []struct { + intervals intervals + remaint []int64 + }{ + { + intervals: intervals{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + }, + { + intervals: intervals{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + }, + { + intervals: intervals{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + }, + { + intervals: intervals{{1, 3}, {4, 700}}, + remaint: []int64{0}, + }, + { + intervals: intervals{{0, 9}}, + remaint: []int64{}, + }, + } + +Outer: + for _, c := range cases { + // Reset the tombstones. + hb.tombstones = newEmptyTombstoneReader() + + // Delete the ranges. + for _, r := range c.intervals { + require.NoError(t, hb.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b"))) + } + + // Compare the result. + q := hb.Querier(0, numSamples) + res := q.Select(labels.NewEqualMatcher("a", "b")) + + expSamples := make([]sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts]}) + } + + expss := newListSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamples), + }) + + if len(expSamples) == 0 { + require.False(t, res.Next()) + continue + } + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } +} + +func TestDeleteUntilCurMax(t *testing.T) { + numSamples := int64(10) + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, 0, 2*numSamples) + app := hb.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) + app = hb.Appender() + _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + q := hb.Querier(0, 100000) + res := q.Select(labels.NewEqualMatcher("a", "b")) + + require.True(t, res.Next()) + exps := res.At() + it := exps.Iterator() + ressmpls, err := expandSeriesIterator(it) + require.NoError(t, err) + require.Equal(t, []sample{{11, 1}}, ressmpls) +} + +func TestDelete_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(2) + maxTime := int64(2 * 1000) + minTime := int64(200) + // Create 8 series with 1000 data-points of different ranges, delete and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, minTime, maxTime) + app := hb.Appender() + + for _, l := range lbls { + ls := labels.New(l...) + series := []sample{} + + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + if ts >= minTime && ts <= maxTime { + series = append(series, sample{ts, v}) + } + + _, err := app.Add(ls, ts, v) + if ts >= minTime && ts <= maxTime { + require.NoError(t, err) + } else { + require.Error(t, ErrOutOfBounds, err) + } + + ts += rand.Int63n(timeInterval) + 1 + } + + seriesMap[labels.New(l...).String()] = series + } + + require.NoError(t, app.Commit()) + + // Delete a time-range from each-selector. + dels := []struct { + ms []labels.Matcher + drange intervals + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + drange: intervals{{300, 500}, {600, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + drange: intervals{{300, 500}, {100, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + drange: intervals{{300, 400}, {100, 6700}}, + }, + // TODO: Add Regexp Matchers. + } + + for _, del := range dels { + // Reset the deletes everytime. + writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) + hb.tombstones = newEmptyTombstoneReader() + + for _, r := range del.drange { + require.NoError(t, hb.Delete(r.mint, r.maxt, del.ms...)) + } + + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(del.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + + sort.Sort(matched) + + for i := 0; i < numRanges; i++ { + mint := rand.Int63n(200) + maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) + + q := hb.Querier(mint, maxt) + ss := q.Select(del.ms...) + + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := boundedSamples(seriesMap[m.String()], mint, maxt) + smpls = deletedSamples(smpls, del.drange) + + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + + // Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + require.Equal(t, eok, rok, "next") + + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } + } + + return +} + func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { if full[0].t >= mint { @@ -394,3 +715,19 @@ func boundedSamples(full []sample, mint, maxt int64) []sample { // maxt is after highest sample. return full } + +func deletedSamples(full []sample, dranges intervals) []sample { + ds := make([]sample, 0, len(full)) +Outer: + for _, s := range full { + for _, r := range dranges { + if r.inBounds(s.t) { + continue Outer + } + } + + ds = append(ds, s) + } + + return ds +} diff --git a/index.go b/index.go index c0e96381f3..b36eb49bd9 100644 --- a/index.go +++ b/index.go @@ -569,11 +569,7 @@ func newIndexReader(dir string) (*indexReader, error) { return nil, errors.Wrap(err, "read label index table") } r.postings, err = r.readOffsetTable(r.toc.postingsTable) - if err != nil { - return nil, errors.Wrap(err, "read postings table") - } - - return r, nil + return r, errors.Wrap(err, "read postings table") } func (r *indexReader) readTOC() error { diff --git a/querier.go b/querier.go index 86dd76b99e..601fc74407 100644 --- a/querier.go +++ b/querier.go @@ -126,8 +126,9 @@ func (q *querier) Close() error { // blockQuerier provides querying access to a single block database. type blockQuerier struct { - index IndexReader - chunks ChunkReader + index IndexReader + chunks ChunkReader + tombstones TombstoneReader postingsMapper func(Postings) Postings @@ -149,6 +150,8 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { p: p, index: q.index, absent: absent, + + tombstones: q.tombstones, }, chunks: q.chunks, mint: q.mint, @@ -366,29 +369,35 @@ func (s *mergedSeriesSet) Next() bool { type chunkSeriesSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta) + At() (labels.Labels, []*ChunkMeta, intervals) Err() error } // baseChunkSeries loads the label set and chunk references for a postings // list from an index. It filters out series that have labels set that should be unset. type baseChunkSeries struct { - p Postings - index IndexReader - absent []string // labels that must be unset in results. + p Postings + index IndexReader + tombstones TombstoneReader + absent []string // labels that must be unset in results. - lset labels.Labels - chks []*ChunkMeta - err error + lset labels.Labels + chks []*ChunkMeta + intervals intervals + err error } -func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks } -func (s *baseChunkSeries) Err() error { return s.err } +func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { + return s.lset, s.chks, s.intervals +} + +func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { Outer: for s.p.Next() { - lset, chunks, err := s.index.Series(s.p.At()) + ref := s.p.At() + lset, chunks, err := s.index.Series(ref) if err != nil { s.err = err return false @@ -403,6 +412,19 @@ Outer: s.lset = lset s.chks = chunks + s.intervals = s.tombstones.Get(s.p.At()) + + if len(s.intervals) > 0 { + // Only those chunks that are not entirely deleted. + chks := make([]*ChunkMeta, 0, len(s.chks)) + for _, chk := range s.chks { + if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { + chks = append(chks, chk) + } + } + + s.chks = chks + } return true } @@ -420,17 +442,20 @@ type populatedChunkSeries struct { chunks ChunkReader mint, maxt int64 - err error - chks []*ChunkMeta - lset labels.Labels + err error + chks []*ChunkMeta + lset labels.Labels + intervals intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks } -func (s *populatedChunkSeries) Err() error { return s.err } +func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { + return s.lset, s.chks, s.intervals +} +func (s *populatedChunkSeries) Err() error { return s.err } func (s *populatedChunkSeries) Next() bool { for s.set.Next() { - lset, chks := s.set.At() + lset, chks, dranges := s.set.At() for len(chks) > 0 { if chks[0].MaxTime >= s.mint { @@ -457,6 +482,7 @@ func (s *populatedChunkSeries) Next() bool { s.lset = lset s.chks = chks + s.intervals = dranges return true } @@ -477,8 +503,15 @@ type blockSeriesSet struct { func (s *blockSeriesSet) Next() bool { for s.set.Next() { - lset, chunks := s.set.At() - s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt} + lset, chunks, dranges := s.set.At() + s.cur = &chunkSeries{ + labels: lset, + chunks: chunks, + mint: s.mint, + maxt: s.maxt, + + intervals: dranges, + } return true } if s.set.Err() != nil { @@ -497,6 +530,8 @@ type chunkSeries struct { chunks []*ChunkMeta // in-order chunk refs mint, maxt int64 + + intervals intervals } func (s *chunkSeries) Labels() labels.Labels { @@ -504,7 +539,7 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - return newChunkSeriesIterator(s.chunks, s.mint, s.maxt) + return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt) } // SeriesIterator iterates over the data of a time series. @@ -601,16 +636,24 @@ type chunkSeriesIterator struct { cur chunks.Iterator maxt, mint int64 + + intervals intervals } -func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator { + it := cs[0].Chunk.Iterator() + if len(dranges) > 0 { + it = &deletedIterator{it: it, intervals: dranges} + } return &chunkSeriesIterator{ chunks: cs, i: 0, - cur: cs[0].Chunk.Iterator(), + cur: it, mint: mint, maxt: maxt, + + intervals: dranges, } } @@ -645,6 +688,9 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { it.i = x it.cur = it.chunks[x].Chunk.Iterator() + if len(it.intervals) > 0 { + it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} + } for it.cur.Next() { t0, _ := it.cur.At() @@ -676,6 +722,9 @@ func (it *chunkSeriesIterator) Next() bool { it.i++ it.cur = it.chunks[it.i].Chunk.Iterator() + if len(it.intervals) > 0 { + it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} + } return it.Next() } diff --git a/querier_test.go b/querier_test.go index 864e76cbba..e03570fad6 100644 --- a/querier_test.go +++ b/querier_test.go @@ -232,6 +232,7 @@ func createIdxChkReaders(tc []struct { mi := newMockIndex() for i, s := range tc { + i = i + 1 // 0 is not a valid posting. metas := make([]*ChunkMeta, 0, len(s.chunks)) for _, chk := range s.chunks { // Collisions can be there, but for tests, its fine. @@ -378,8 +379,181 @@ Outer: for _, c := range cases.queries { ir, cr := createIdxChkReaders(cases.data) querier := &blockQuerier{ - index: ir, - chunks: cr, + index: ir, + chunks: cr, + tombstones: newEmptyTombstoneReader(), + + mint: c.mint, + maxt: c.maxt, + } + + res := querier.Select(c.ms...) + + for { + eok, rok := c.exp.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := c.exp.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } + + return +} + +func TestBlockQuerierDelete(t *testing.T) { + newSeries := func(l map[string]string, s []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } + } + + type query struct { + mint, maxt int64 + ms []labels.Matcher + exp SeriesSet + } + + cases := struct { + data []struct { + lset map[string]string + chunks [][]sample + } + + tombstones tombstoneReader + queries []query + }{ + data: []struct { + lset map[string]string + chunks [][]sample + }{ + { + lset: map[string]string{ + "a": "a", + }, + chunks: [][]sample{ + { + {1, 2}, {2, 3}, {3, 4}, + }, + { + {5, 2}, {6, 3}, {7, 4}, + }, + }, + }, + { + lset: map[string]string{ + "a": "a", + "b": "b", + }, + chunks: [][]sample{ + { + {1, 1}, {2, 2}, {3, 3}, + }, + { + {4, 15}, {5, 3}, {6, 6}, + }, + }, + }, + { + lset: map[string]string{ + "b": "b", + }, + chunks: [][]sample{ + { + {1, 3}, {2, 2}, {3, 6}, + }, + { + {5, 1}, {6, 7}, {7, 2}, + }, + }, + }, + }, + tombstones: newTombstoneReader( + map[uint32]intervals{ + 1: intervals{{1, 3}}, + 2: intervals{{1, 3}, {6, 10}}, + 3: intervals{{6, 10}}, + }, + ), + + queries: []query{ + { + mint: 2, + maxt: 7, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, + []sample{{5, 2}, {6, 3}, {7, 4}}, + ), + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{4, 15}, {5, 3}}, + ), + }), + }, + { + mint: 2, + maxt: 7, + ms: []labels.Matcher{labels.NewEqualMatcher("b", "b")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{4, 15}, {5, 3}}, + ), + newSeries(map[string]string{ + "b": "b", + }, + []sample{{2, 2}, {3, 6}, {5, 1}}, + ), + }), + }, + { + mint: 1, + maxt: 4, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{4, 15}}, + ), + }), + }, + { + mint: 1, + maxt: 3, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{}), + }, + }, + } + +Outer: + for _, c := range cases.queries { + ir, cr := createIdxChkReaders(cases.data) + querier := &blockQuerier{ + index: ir, + chunks: cr, + tombstones: cases.tombstones, mint: c.mint, maxt: c.maxt, @@ -487,13 +661,14 @@ func TestBaseChunkSeries(t *testing.T) { } bcs := &baseChunkSeries{ - p: newListPostings(tc.postings), - index: mi, + p: newListPostings(tc.postings), + index: mi, + tombstones: newEmptyTombstoneReader(), } i := 0 for bcs.Next() { - lset, chks := bcs.At() + lset, chks, _ := bcs.At() idx := tc.expIdxs[i] @@ -701,7 +876,7 @@ func TestSeriesIterator(t *testing.T) { chunkFromSamples(tc.b), chunkFromSamples(tc.c), } - res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt) + res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) smplValid := make([]sample, 0) for _, s := range tc.exp { @@ -772,7 +947,7 @@ func TestSeriesIterator(t *testing.T) { chunkFromSamples(tc.b), chunkFromSamples(tc.c), } - res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt) + res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) smplValid := make([]sample, 0) for _, s := range tc.exp { @@ -919,8 +1094,8 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta) { - return m.l[m.i], m.cm[m.i] +func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { + return m.l[m.i], m.cm[m.i], nil } func (m *mockChunkSeriesSet) Err() error { diff --git a/tombstones.go b/tombstones.go new file mode 100644 index 0000000000..612b3029fe --- /dev/null +++ b/tombstones.go @@ -0,0 +1,223 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "io/ioutil" + "os" + "path/filepath" + + "github.com/pkg/errors" +) + +const tombstoneFilename = "tombstones" + +const ( + // MagicTombstone is 4 bytes at the head of a tombstone file. + MagicTombstone = 0x130BA30 + + tombstoneFormatV1 = 1 +) + +func writeTombstoneFile(dir string, tr tombstoneReader) error { + path := filepath.Join(dir, tombstoneFilename) + tmp := path + ".tmp" + hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + f, err := os.Create(tmp) + if err != nil { + return err + } + defer f.Close() + + buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} + buf.reset() + // Write the meta. + buf.putBE32(MagicTombstone) + buf.putByte(tombstoneFormatV1) + _, err = f.Write(buf.get()) + if err != nil { + return err + } + + mw := io.MultiWriter(f, hash) + + for k, v := range tr { + for _, itv := range v { + buf.reset() + buf.putUvarint32(k) + buf.putVarint64(itv.mint) + buf.putVarint64(itv.maxt) + + _, err = mw.Write(buf.get()) + if err != nil { + return err + } + } + } + + _, err = f.Write(hash.Sum(nil)) + if err != nil { + return err + } + + return renameFile(tmp, path) +} + +// Stone holds the information on the posting and time-range +// that is deleted. +type Stone struct { + ref uint32 + intervals intervals +} + +// TombstoneReader is the iterator over tombstones. +type TombstoneReader interface { + Get(ref uint32) intervals +} + +func readTombstones(dir string) (tombstoneReader, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) + if err != nil { + return nil, err + } + + if len(b) < 5 { + return nil, errors.Wrap(errInvalidSize, "tombstones header") + } + + d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. + if mg := d.be32(); mg != MagicTombstone { + return nil, fmt.Errorf("invalid magic number %x", mg) + } + if flag := d.byte(); flag != tombstoneFormatV1 { + return nil, fmt.Errorf("invalid tombstone format %x", flag) + } + + if d.err() != nil { + return nil, d.err() + } + + // Verify checksum + hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + if _, err := hash.Write(d.get()); err != nil { + return nil, errors.Wrap(err, "write to hash") + } + if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { + return nil, errors.New("checksum did not match") + } + + stonesMap := newEmptyTombstoneReader() + for d.len() > 0 { + k := d.uvarint32() + mint := d.varint64() + maxt := d.varint64() + if d.err() != nil { + return nil, d.err() + } + + stonesMap.add(k, interval{mint, maxt}) + } + + return newTombstoneReader(stonesMap), nil +} + +type tombstoneReader map[uint32]intervals + +func newTombstoneReader(ts map[uint32]intervals) tombstoneReader { + return tombstoneReader(ts) +} + +func newEmptyTombstoneReader() tombstoneReader { + return tombstoneReader(make(map[uint32]intervals)) +} + +func (t tombstoneReader) Get(ref uint32) intervals { + return t[ref] +} + +func (t tombstoneReader) add(ref uint32, itv interval) { + t[ref] = t[ref].add(itv) +} + +type interval struct { + mint, maxt int64 +} + +func (tr interval) inBounds(t int64) bool { + return t >= tr.mint && t <= tr.maxt +} + +func (tr interval) isSubrange(dranges intervals) bool { + for _, r := range dranges { + if r.inBounds(tr.mint) && r.inBounds(tr.maxt) { + return true + } + } + + return false +} + +type intervals []interval + +// This adds the new time-range to the existing ones. +// The existing ones must be sorted. +func (itvs intervals) add(n interval) intervals { + for i, r := range itvs { + // TODO(gouthamve): Make this codepath easier to digest. + if r.inBounds(n.mint-1) || r.inBounds(n.mint) { + if n.maxt > r.maxt { + itvs[i].maxt = n.maxt + } + + j := 0 + for _, r2 := range itvs[i+1:] { + if n.maxt < r2.mint { + break + } + j++ + } + if j != 0 { + if itvs[i+j].maxt > n.maxt { + itvs[i].maxt = itvs[i+j].maxt + } + itvs = append(itvs[:i+1], itvs[i+j+1:]...) + } + return itvs + } + + if r.inBounds(n.maxt+1) || r.inBounds(n.maxt) { + if n.mint < r.maxt { + itvs[i].mint = n.mint + } + return itvs + } + + if n.mint < r.mint { + newRange := make(intervals, i, len(itvs[:i])+1) + copy(newRange, itvs[:i]) + newRange = append(newRange, n) + newRange = append(newRange, itvs[i:]...) + + return newRange + } + } + + itvs = append(itvs, n) + return itvs +} diff --git a/tombstones_test.go b/tombstones_test.go new file mode 100644 index 0000000000..bc6199f114 --- /dev/null +++ b/tombstones_test.go @@ -0,0 +1,123 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWriteAndReadbackTombStones(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + ref := uint32(0) + + stones := make(map[uint32]intervals) + // Generate the tombstones. + for i := 0; i < 100; i++ { + ref += uint32(rand.Int31n(10)) + 1 + numRanges := rand.Intn(5) + 1 + dranges := make(intervals, 0, numRanges) + mint := rand.Int63n(time.Now().UnixNano()) + for j := 0; j < numRanges; j++ { + dranges = dranges.add(interval{mint, mint + rand.Int63n(1000)}) + mint += rand.Int63n(1000) + 1 + } + stones[ref] = dranges + } + + require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones))) + + restr, err := readTombstones(tmpdir) + require.NoError(t, err) + exptr := newTombstoneReader(stones) + // Compare the two readers. + require.Equal(t, exptr, restr) +} + +func TestAddingNewIntervals(t *testing.T) { + cases := []struct { + exist intervals + new interval + + exp intervals + }{ + { + new: interval{1, 2}, + exp: intervals{{1, 2}}, + }, + { + exist: intervals{{1, 2}}, + new: interval{1, 2}, + exp: intervals{{1, 2}}, + }, + { + exist: intervals{{1, 4}, {6, 6}}, + new: interval{5, 6}, + exp: intervals{{1, 6}}, + }, + { + exist: intervals{{1, 10}, {12, 20}, {25, 30}}, + new: interval{21, 23}, + exp: intervals{{1, 10}, {12, 23}, {25, 30}}, + }, + { + exist: intervals{{1, 2}, {3, 5}, {7, 7}}, + new: interval{6, 7}, + exp: intervals{{1, 2}, {3, 7}}, + }, + { + exist: intervals{{1, 10}, {12, 20}, {25, 30}}, + new: interval{21, 25}, + exp: intervals{{1, 10}, {12, 30}}, + }, + { + exist: intervals{{1, 10}, {12, 20}, {25, 30}}, + new: interval{18, 23}, + exp: intervals{{1, 10}, {12, 23}, {25, 30}}, + }, + { + exist: intervals{{1, 10}, {12, 20}, {25, 30}}, + new: interval{9, 23}, + exp: intervals{{1, 23}, {25, 30}}, + }, + { + exist: intervals{{1, 10}, {12, 20}, {25, 30}}, + new: interval{9, 230}, + exp: intervals{{1, 230}}, + }, + { + exist: intervals{{5, 10}, {12, 20}, {25, 30}}, + new: interval{1, 4}, + exp: intervals{{1, 10}, {12, 20}, {25, 30}}, + }, + { + exist: intervals{{5, 10}, {12, 20}, {25, 30}}, + new: interval{11, 14}, + exp: intervals{{5, 20}, {25, 30}}, + }, + } + + for _, c := range cases { + + require.Equal(t, c.exp, c.exist.add(c.new)) + } + return +} diff --git a/wal.go b/wal.go index 63975de2a1..50ddb6e346 100644 --- a/wal.go +++ b/wal.go @@ -46,8 +46,18 @@ const ( WALEntrySymbols WALEntryType = 1 WALEntrySeries WALEntryType = 2 WALEntrySamples WALEntryType = 3 + WALEntryDeletes WALEntryType = 4 ) +// SamplesCB is the callback after reading samples. +type SamplesCB func([]RefSample) error + +// SeriesCB is the callback after reading series. +type SeriesCB func([]labels.Labels) error + +// DeletesCB is the callback after reading deletes. +type DeletesCB func([]Stone) error + // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { mtx sync.Mutex @@ -71,15 +81,15 @@ type SegmentWAL struct { // It must be completely read before new entries are logged. type WAL interface { Reader() WALReader - Log([]labels.Labels, []RefSample) error + LogSeries([]labels.Labels) error + LogSamples([]RefSample) error + LogDeletes([]Stone) error Close() error } // WALReader reads entries from a WAL. type WALReader interface { - At() ([]labels.Labels, []RefSample) - Next() bool - Err() error + Read(SeriesCB, SamplesCB, DeletesCB) error } // RefSample is a timestamp/value pair associated with a reference to a series. @@ -141,13 +151,40 @@ func (w *SegmentWAL) Reader() WALReader { } // Log writes a batch of new series labels and samples to the log. -func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { +//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { +//return nil +//} + +// LogSeries writes a batch of new series labels to the log. +func (w *SegmentWAL) LogSeries(series []labels.Labels) error { if err := w.encodeSeries(series); err != nil { return err } + + if w.flushInterval <= 0 { + return w.Sync() + } + return nil +} + +// LogSamples writes a batch of new samples to the log. +func (w *SegmentWAL) LogSamples(samples []RefSample) error { if err := w.encodeSamples(samples); err != nil { return err } + + if w.flushInterval <= 0 { + return w.Sync() + } + return nil +} + +// LogDeletes write a batch of new deletes to the log. +func (w *SegmentWAL) LogDeletes(stones []Stone) error { + if err := w.encodeDeletes(stones); err != nil { + return err + } + if w.flushInterval <= 0 { return w.Sync() } @@ -369,6 +406,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { const ( walSeriesSimple = 1 walSamplesSimple = 1 + walDeletesSimple = 1 ) var walBuffers = sync.Pool{} @@ -445,6 +483,23 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error { return w.entry(WALEntrySamples, walSamplesSimple, buf) } +func (w *SegmentWAL) encodeDeletes(stones []Stone) error { + b := make([]byte, 2*binary.MaxVarintLen64) + eb := &encbuf{b: b} + buf := getWALBuffer() + for _, s := range stones { + for _, itv := range s.intervals { + eb.reset() + eb.putUvarint32(s.ref) + eb.putVarint64(itv.mint) + eb.putVarint64(itv.maxt) + buf = append(buf, eb.get()...) + } + } + + return w.entry(WALEntryDeletes, walDeletesSimple, buf) +} + // walReader decodes and emits write ahead log entries. type walReader struct { logger log.Logger @@ -454,9 +509,11 @@ type walReader struct { buf []byte crc32 hash.Hash32 - err error - labels []labels.Labels - samples []RefSample + curType WALEntryType + curFlag byte + curBuf []byte + + err error } func newWALReader(w *SegmentWAL, l log.Logger) *walReader { @@ -471,18 +528,41 @@ func newWALReader(w *SegmentWAL, l log.Logger) *walReader { } } -// At returns the last decoded entry of labels or samples. -// The returned slices are only valid until the next call to Next(). Their elements -// have to be copied to preserve them. -func (r *walReader) At() ([]labels.Labels, []RefSample) { - return r.labels, r.samples -} - // Err returns the last error the reader encountered. func (r *walReader) Err() error { return r.err } +func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { + for r.next() { + et, flag, b := r.at() + // In decoding below we never return a walCorruptionErr for now. + // Those should generally be catched by entry decoding before. + switch et { + case WALEntrySeries: + s, err := r.decodeSeries(flag, b) + if err != nil { + return err + } + seriesf(s) + case WALEntrySamples: + s, err := r.decodeSamples(flag, b) + if err != nil { + return err + } + samplesf(s) + case WALEntryDeletes: + s, err := r.decodeDeletes(flag, b) + if err != nil { + return err + } + deletesf(s) + } + } + + return r.Err() +} + // nextEntry retrieves the next entry. It is also used as a testing hook. func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { @@ -505,12 +585,13 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { return et, flag, b, err } -// Next returns decodes the next entry pair and returns true -// if it was succesful. -func (r *walReader) Next() bool { - r.labels = r.labels[:0] - r.samples = r.samples[:0] +func (r *walReader) at() (WALEntryType, byte, []byte) { + return r.curType, r.curFlag, r.curBuf +} +// next returns decodes the next entry pair and returns true +// if it was succesful. +func (r *walReader) next() bool { if r.cur >= len(r.wal.files) { return false } @@ -537,7 +618,7 @@ func (r *walReader) Next() bool { return false } r.cur++ - return r.Next() + return r.next() } if err != nil { r.err = err @@ -548,19 +629,9 @@ func (r *walReader) Next() bool { return false } - // In decoding below we never return a walCorruptionErr for now. - // Those should generally be catched by entry decoding before. - - switch et { - case WALEntrySamples: - if err := r.decodeSamples(flag, b); err != nil { - r.err = err - } - case WALEntrySeries: - if err := r.decodeSeries(flag, b); err != nil { - r.err = err - } - } + r.curType = et + r.curFlag = flag + r.curBuf = b return r.err == nil } @@ -617,7 +688,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { if etype == 0 { return 0, 0, nil, io.EOF } - if etype != WALEntrySeries && etype != WALEntrySamples { + if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes { return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype) } @@ -644,11 +715,12 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) error { +func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) { + series := []labels.Labels{} for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "number of labels") + return nil, errors.Wrap(errInvalidSize, "number of labels") } b = b[n:] lset := make(labels.Labels, l) @@ -656,27 +728,29 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error { for i := 0; i < int(l); i++ { nl, n := binary.Uvarint(b) if n < 1 || len(b) < n+int(nl) { - return errors.Wrap(errInvalidSize, "label name") + return nil, errors.Wrap(errInvalidSize, "label name") } lset[i].Name = string(b[n : n+int(nl)]) b = b[n+int(nl):] vl, n := binary.Uvarint(b) if n < 1 || len(b) < n+int(vl) { - return errors.Wrap(errInvalidSize, "label value") + return nil, errors.Wrap(errInvalidSize, "label value") } lset[i].Value = string(b[n : n+int(vl)]) b = b[n+int(vl):] } - r.labels = append(r.labels, lset) + series = append(series, lset) } - return nil + return series, nil } -func (r *walReader) decodeSamples(flag byte, b []byte) error { +func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { + samples := []RefSample{} + if len(b) < 16 { - return errors.Wrap(errInvalidSize, "header length") + return nil, errors.Wrap(errInvalidSize, "header length") } var ( baseRef = binary.BigEndian.Uint64(b) @@ -689,7 +763,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error { dref, n := binary.Varint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "sample ref delta") + return nil, errors.Wrap(errInvalidSize, "sample ref delta") } b = b[n:] @@ -697,18 +771,36 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error { dtime, n := binary.Varint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "sample timestamp delta") + return nil, errors.Wrap(errInvalidSize, "sample timestamp delta") } b = b[n:] smpl.T = baseTime + dtime if len(b) < 8 { - return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) + return nil, errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) } smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] - r.samples = append(r.samples, smpl) + samples = append(samples, smpl) } - return nil + return samples, nil +} + +func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { + db := &decbuf{b: b} + stones := []Stone{} + + for db.len() > 0 { + var s Stone + s.ref = db.uvarint32() + s.intervals = intervals{{db.varint64(), db.varint64()}} + if db.err() != nil { + return nil, db.err() + } + + stones = append(stones, s) + } + + return stones, nil } diff --git a/wal_test.go b/wal_test.go index c2988e7e04..23667fb9a0 100644 --- a/wal_test.go +++ b/wal_test.go @@ -149,6 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels recordedSamples [][]RefSample + recordedDeletes [][]Stone ) var totalSamples int @@ -166,32 +167,48 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels resultSamples [][]RefSample + resultDeletes [][]Stone ) - for r.Next() { - lsets, smpls := r.At() - + serf := func(lsets []labels.Labels) error { if len(lsets) > 0 { clsets := make([]labels.Labels, len(lsets)) copy(clsets, lsets) resultSeries = append(resultSeries, clsets) } + + return nil + } + smplf := func(smpls []RefSample) error { if len(smpls) > 0 { csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } + + return nil } - require.NoError(t, r.Err()) + + delf := func(stones []Stone) error { + if len(stones) > 0 { + resultDeletes = append(resultDeletes, stones) + } + + return nil + } + + require.NoError(t, r.Read(serf, smplf, delf)) require.Equal(t, recordedSamples, resultSamples) require.Equal(t, recordedSeries, resultSeries) + require.Equal(t, recordedDeletes, resultDeletes) series := series[k : k+(numMetrics/iterations)] // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { var samples []RefSample + var stones []Stone for j := 0; j < i*10; j++ { samples = append(samples, RefSample{ @@ -201,9 +218,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { }) } + for j := 0; j < i*20; j++ { + ts := rand.Int63() + stones = append(stones, Stone{rand.Uint32(), intervals{{ts, ts + rand.Int63n(10000)}}}) + } + lbls := series[i : i+stepSize] - require.NoError(t, w.Log(lbls, samples)) + require.NoError(t, w.LogSeries(lbls)) + require.NoError(t, w.LogSamples(samples)) + require.NoError(t, w.LogDeletes(stones)) if len(lbls) > 0 { recordedSeries = append(recordedSeries, lbls) @@ -212,6 +236,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { recordedSamples = append(recordedSamples, samples) totalSamples += len(samples) } + if len(stones) > 0 { + recordedDeletes = append(recordedDeletes, stones) + } } require.NoError(t, w.Close()) @@ -292,13 +319,13 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) - require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) - require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) require.NoError(t, w.cut()) - require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}})) - require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) require.NoError(t, w.Close()) @@ -314,17 +341,28 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) r := w2.Reader() + serf := func(l []labels.Labels) error { + require.Equal(t, 0, len(l)) + return nil + } + delf := func([]Stone) error { return nil } - require.True(t, r.Next()) - l, s := r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + // Weird hack to check order of reads. + i := 0 + samplf := func(s []RefSample) error { + if i == 0 { + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + i++ + } else { + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) + } - // Truncation should happen transparently and not cause an error. - require.False(t, r.Next()) - require.Nil(t, r.Err()) + return nil + } - require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}})) + require.NoError(t, r.Read(serf, samplf, delf)) + + require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) // We should see the first valid entry and the new one, everything after @@ -334,18 +372,8 @@ func TestWALRestoreCorrupted(t *testing.T) { r = w3.Reader() - require.True(t, r.Next()) - l, s = r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) - - require.True(t, r.Next()) - l, s = r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 99, V: 100}}, s) - - require.False(t, r.Next()) - require.Nil(t, r.Err()) + i = 0 + require.NoError(t, r.Read(serf, samplf, delf)) }) } }