From fb9da52b1139947a8124de486d45f1d228bd3920 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 9 Oct 2017 15:21:46 +0200 Subject: [PATCH] Add more verbose error handling for closing, reduce locking This commit introduces error returns in various places and is explicit about closing persisted blocks. {Index,Chunk,Tombstone}Readers are more consistent about their Close() method. Whenever a reader is retrieved, the corresponding close method must eventually be called. We use this to track pending readers against persisted blocks. Querier's against the DB no longer hold a read lock for their entire lifecycle. This avoids long running queriers to starve new ones when we have to acquire a write lock when reloading blocks. --- block.go | 143 ++++++++++++++++++++++++++++++++++++++------------ chunks.go | 2 +- compact.go | 32 ++++++++--- db.go | 83 ++++++++++++----------------- db_test.go | 32 +++++++---- head.go | 47 +++++++++-------- head_test.go | 3 +- index.go | 4 +- index_test.go | 6 ++- querier.go | 39 +++++++++----- tombstones.go | 8 ++- 11 files changed, 257 insertions(+), 142 deletions(-) diff --git a/block.go b/block.go index 90915a1f00..63b468f272 100644 --- a/block.go +++ b/block.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -26,33 +27,16 @@ import ( "github.com/prometheus/tsdb/labels" ) -// DiskBlock represents a data block backed by on-disk data. -type DiskBlock interface { - BlockReader - - // Directory where block data is stored. - Dir() string - - // Stats returns statistics about the block. - Meta() BlockMeta - - Delete(mint, maxt int64, m ...labels.Matcher) error - - Snapshot(dir string) error - - Close() error -} - // BlockReader provides reading access to a data block. type BlockReader interface { // Index returns an IndexReader over the block's data. - Index() IndexReader + Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. - Chunks() ChunkReader + Chunks() (ChunkReader, error) // Tombstones returns a TombstoneReader over the block's deleted data. - Tombstones() TombstoneReader + Tombstones() (TombstoneReader, error) } // Appendable defines an entity to which data can be appended. @@ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } -type persistedBlock struct { +// Block represents a directory of time series data covering a continous time range. +type Block struct { + mtx sync.RWMutex + closing bool + pendingReaders sync.WaitGroup + dir string meta BlockMeta @@ -159,7 +148,9 @@ type persistedBlock struct { tombstones tombstoneReader } -func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { +// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used +// to instantiate chunk structs. +func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { return nil, err } - pb := &persistedBlock{ + pb := &Block{ dir: dir, meta: *meta, chunkr: cr, @@ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { return pb, nil } -func (pb *persistedBlock) Close() error { +// Close closes the on-disk block. It blocks as long as there are readers reading from the block. +func (pb *Block) Close() error { + pb.mtx.Lock() + pb.closing = true + pb.mtx.Unlock() + + pb.pendingReaders.Wait() + var merr MultiError merr.Add(pb.chunkr.Close()) merr.Add(pb.indexr.Close()) + merr.Add(pb.tombstones.Close()) return merr.Err() } -func (pb *persistedBlock) String() string { +func (pb *Block) String() string { return pb.meta.ULID.String() } -func (pb *persistedBlock) Dir() string { return pb.dir } -func (pb *persistedBlock) Index() IndexReader { return pb.indexr } -func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } -func (pb *persistedBlock) Tombstones() TombstoneReader { - return pb.tombstones -} -func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } +// Dir returns the directory of the block. +func (pb *Block) Dir() string { return pb.dir } + +// Meta returns meta information about the block. +func (pb *Block) Meta() BlockMeta { return pb.meta } + +// ErrClosing is returned when a block is in the process of being closed. +var ErrClosing = errors.New("block is closing") + +func (pb *Block) startRead() error { + pb.mtx.RLock() + defer pb.mtx.RUnlock() + + if pb.closing { + return ErrClosing + } + pb.pendingReaders.Add(1) + return nil +} + +// Index returns a new IndexReader against the block data. +func (pb *Block) Index() (IndexReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil +} + +// Chunks returns a new ChunkReader against the block data. +func (pb *Block) Chunks() (ChunkReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil +} + +// Tombstones returns a new TombstoneReader against the block data. +func (pb *Block) Tombstones() (TombstoneReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil +} + +type blockIndexReader struct { + IndexReader + b *Block +} + +func (r blockIndexReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +type blockTombstoneReader struct { + TombstoneReader + b *Block +} + +func (r blockTombstoneReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +type blockChunkReader struct { + ChunkReader + b *Block +} + +func (r blockChunkReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +// Delete matching series between mint and maxt in the block. +func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { + pb.mtx.Lock() + defer pb.mtx.Unlock() + + if pb.closing { + return ErrClosing + } -func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { pr := newPostingsReader(pb.indexr) p, absent := pr.Select(ms...) @@ -262,7 +335,8 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } -func (pb *persistedBlock) Snapshot(dir string) error { +// Snapshot creates snapshot of the block into dir. +func (pb *Block) Snapshot(dir string) error { blockDir := filepath.Join(dir, pb.meta.ULID.String()) if err := os.MkdirAll(blockDir, 0777); err != nil { return errors.Wrap(err, "create snapshot block dir") @@ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) { if b > maxt { b = maxt } - return a, b } diff --git a/chunks.go b/chunks.go index 626e7b41e2..8152677f73 100644 --- a/chunks.go +++ b/chunks.go @@ -21,9 +21,9 @@ import ( "io" "os" - "github.com/prometheus/tsdb/fileutil" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" ) const ( diff --git a/compact.go b/compact.go index 4f5c40e1aa..5b66082aab 100644 --- a/compact.go +++ b/compact.go @@ -14,6 +14,7 @@ package tsdb import ( + "io" "math/rand" "os" "path/filepath" @@ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { var metas []*BlockMeta for _, d := range dirs { - b, err := newPersistedBlock(d, c.chunkPool) + b, err := OpenBlock(d, c.chunkPool) if err != nil { return err } @@ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, var ( set compactionSet allSymbols = make(map[string]struct{}, 1<<16) + closers = []io.Closer{} ) - for i, b := range blocks { + defer func() { closeAll(closers...) }() - symbols, err := b.Index().Symbols() + for i, b := range blocks { + indexr, err := b.Index() + if err != nil { + return errors.Wrapf(err, "open index reader for block %s", b) + } + closers = append(closers, indexr) + + chunkr, err := b.Chunks() + if err != nil { + return errors.Wrapf(err, "open chunk reader for block %s", b) + } + closers = append(closers, chunkr) + + tombsr, err := b.Tombstones() + if err != nil { + return errors.Wrapf(err, "open tombstone reader for block %s", b) + } + closers = append(closers, tombsr) + + symbols, err := indexr.Symbols() if err != nil { return errors.Wrap(err, "read symbols") } @@ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols[s] = struct{}{} } - indexr := b.Index() - all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } all = indexr.SortedPostings(all) - s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all) + s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) if i == 0 { set = s @@ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write postings") } } - return nil } diff --git a/db.go b/db.go index e499de5459..ff1763762b 100644 --- a/db.go +++ b/db.go @@ -30,7 +30,6 @@ import ( "golang.org/x/sync/errgroup" - "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/nightlyone/lockfile" @@ -38,6 +37,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -105,7 +105,7 @@ type DB struct { // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex - blocks []DiskBlock + blocks []*Block head *Head @@ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } -func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) { +func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { return b, true @@ -456,7 +456,7 @@ func (db *DB) reload() (err error) { return errors.Wrap(err, "find blocks") } var ( - blocks []DiskBlock + blocks []*Block exist = map[ulid.ULID]struct{}{} ) @@ -468,7 +468,7 @@ func (db *DB) reload() (err error) { b, ok := db.getBlock(meta.ULID) if !ok { - b, err = newPersistedBlock(dir, db.chunkPool) + b, err = OpenBlock(dir, db.chunkPool) if err != nil { return errors.Wrapf(err, "open block %s", dir) } @@ -505,7 +505,7 @@ func (db *DB) reload() (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } -func validateBlockSequence(bs []DiskBlock) error { +func validateBlockSequence(bs []*Block) error { if len(bs) == 0 { return nil } @@ -521,13 +521,19 @@ func validateBlockSequence(bs []DiskBlock) error { return nil } -func (db *DB) Blocks() []DiskBlock { +func (db *DB) String() string { + return "HEAD" +} + +// Blocks returns the databases persisted blocks. +func (db *DB) Blocks() []*Block { db.mtx.RLock() defer db.mtx.RUnlock() return db.blocks } +// Head returns the databases's head. func (db *DB) Head() *Head { return db.head } @@ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error { db.cmtx.Lock() defer db.cmtx.Unlock() - db.mtx.RLock() - defer db.mtx.RUnlock() - - for _, b := range db.blocks { + for _, b := range db.Blocks() { level.Info(db.logger).Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { return errors.Wrap(err, "error snapshotting headblock") } } - return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) } // Querier returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. -func (db *DB) Querier(mint, maxt int64) Querier { - db.mtx.RLock() +func (db *DB) Querier(mint, maxt int64) (Querier, error) { + var blocks []BlockReader - blocks := db.blocksForInterval(mint, maxt) + for _, b := range db.Blocks() { + m := b.Meta() + if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + blocks = append(blocks, b) + } + } + if maxt >= db.head.MinTime() { + blocks = append(blocks, db.head) + } sq := &querier{ blocks: make([]Querier, 0, len(blocks)), - db: db, } for _, b := range blocks { - sq.blocks = append(sq.blocks, &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - tombstones: b.Tombstones(), - }) + q, err := NewBlockQuerier(b, mint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for block %s", b) + } + sq.blocks = append(sq.blocks, q) } - return sq + return sq, nil } func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { @@ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() - db.mtx.Lock() - defer db.mtx.Unlock() - var g errgroup.Group - for _, b := range db.blocks { + for _, b := range db.Blocks() { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - g.Go(func(b DiskBlock) func() error { + g.Go(func(b *Block) func() error { return func() error { return b.Delete(mint, maxt, ms...) } }(b)) } } - g.Go(func() error { return db.head.Delete(mint, maxt, ms...) }) - if err := g.Wait(); err != nil { return err } - return nil } @@ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool { return t >= min && t <= max } -// blocksForInterval returns all blocks within the partition that may contain -// data for the given time range. -func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader { - var bs []BlockReader - - for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - bs = append(bs, b) - } - } - if maxt >= db.head.MinTime() { - bs = append(bs, db.head) - } - - return bs -} - func isBlockDir(fi os.FileInfo) bool { if !fi.IsDir() { return false diff --git a/db_test.go b/db_test.go index 30ecf26f3a..5dd6e2a761 100644 --- a/db_test.go +++ b/db_test.go @@ -68,7 +68,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) - querier := db.Querier(0, 1) + querier, err := db.Querier(0, 1) + require.NoError(t, err) seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) require.Equal(t, seriesSet, map[string][]sample{}) @@ -77,7 +78,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { err = app.Commit() require.NoError(t, err) - querier = db.Querier(0, 1) + querier, err = db.Querier(0, 1) + require.NoError(t, err) defer querier.Close() seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) @@ -96,7 +98,8 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { err = app.Rollback() require.NoError(t, err) - querier := db.Querier(0, 1) + querier, err := db.Querier(0, 1) + require.NoError(t, err) defer querier.Close() seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) @@ -140,7 +143,9 @@ func TestDBAppenderAddRef(t *testing.T) { require.NoError(t, app2.Commit()) - q := db.Querier(0, 200) + q, err := db.Querier(0, 200) + require.NoError(t, err) + res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b"))) require.Equal(t, map[string][]sample{ @@ -190,7 +195,9 @@ Outer: } // Compare the result. - q := db.Querier(0, numSamples) + q, err := db.Querier(0, numSamples) + require.NoError(t, err) + res := q.Select(labels.NewEqualMatcher("a", "b")) expSamples := make([]sample, 0, len(c.remaint)) @@ -284,7 +291,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { require.NoError(t, app.Commit()) // Make sure the right value is stored. - q := db.Querier(0, 10) + q, err := db.Querier(0, 10) + require.NoError(t, err) + ss := q.Select(labels.NewEqualMatcher("a", "b")) ssMap := readSeriesSet(t, ss) @@ -302,7 +311,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - q = db.Querier(0, 10) + q, err = db.Querier(0, 10) + require.NoError(t, err) + ss = q.Select(labels.NewEqualMatcher("a", "b")) ssMap = readSeriesSet(t, ss) @@ -336,7 +347,8 @@ func TestDB_Snapshot(t *testing.T) { db, err = Open(snap, nil, nil, nil) require.NoError(t, err) - querier := db.Querier(mint, mint+1000) + querier, err := db.Querier(mint, mint+1000) + require.NoError(t, err) defer querier.Close() // sum values @@ -485,7 +497,9 @@ func TestDB_e2e(t *testing.T) { } } - q := db.Querier(mint, maxt) + q, err := db.Querier(mint, maxt) + require.NoError(t, err) + ss := q.Select(qry.ms...) result := map[string][]sample{} diff --git a/head.go b/head.go index b97a44643e..6ae652a147 100644 --- a/head.go +++ b/head.go @@ -305,6 +305,23 @@ func (h *Head) initTime(t int64) (initialized bool) { return true } +type rangeHead struct { + head *Head + mint, maxt int64 +} + +func (h *rangeHead) Index() (IndexReader, error) { + return h.head.indexRange(h.mint, h.maxt), nil +} + +func (h *rangeHead) Chunks() (ChunkReader, error) { + return h.head.chunksRange(h.mint, h.maxt), nil +} + +func (h *rangeHead) Tombstones() (TombstoneReader, error) { + return h.head.tombstones, nil +} + // initAppender is a helper to initialize the time bounds of a the head // upon the first sample it receives. type initAppender struct { @@ -611,13 +628,14 @@ func (h *Head) gc() { h.symMtx.Unlock() } -func (h *Head) Tombstones() TombstoneReader { - return h.tombstones +// Tombstones returns a new reader over the head's tombstones +func (h *Head) Tombstones() (TombstoneReader, error) { + return h.tombstones, nil } // Index returns an IndexReader against the block. -func (h *Head) Index() IndexReader { - return h.indexRange(math.MinInt64, math.MaxInt64) +func (h *Head) Index() (IndexReader, error) { + return h.indexRange(math.MinInt64, math.MaxInt64), nil } func (h *Head) indexRange(mint, maxt int64) *headIndexReader { @@ -628,8 +646,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { } // Chunks returns a ChunkReader against the block. -func (h *Head) Chunks() ChunkReader { - return h.chunksRange(math.MinInt64, math.MaxInt64) +func (h *Head) Chunks() (ChunkReader, error) { + return h.chunksRange(math.MinInt64, math.MaxInt64), nil } func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { @@ -712,23 +730,6 @@ func (c *safeChunk) Iterator() chunks.Iterator { // func (c *safeChunk) Bytes() []byte { panic("illegal") } // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } -type rangeHead struct { - head *Head - mint, maxt int64 -} - -func (h *rangeHead) Index() IndexReader { - return h.head.indexRange(h.mint, h.maxt) -} - -func (h *rangeHead) Chunks() ChunkReader { - return h.head.chunksRange(h.mint, h.maxt) -} - -func (h *rangeHead) Tombstones() TombstoneReader { - return newEmptyTombstoneReader() -} - type headIndexReader struct { head *Head mint, maxt int64 diff --git a/head_test.go b/head_test.go index 31713b9c1c..afdf1597f8 100644 --- a/head_test.go +++ b/head_test.go @@ -322,7 +322,8 @@ Outer: } // Compare the result. - q := NewBlockQuerier(head.Index(), head.Chunks(), head.Tombstones(), head.MinTime(), head.MaxTime()) + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) res := q.Select(labels.NewEqualMatcher("a", "b")) expSamples := make([]sample, 0, len(c.remaint)) diff --git a/index.go b/index.go index b9cda3073f..89f7764331 100644 --- a/index.go +++ b/index.go @@ -19,14 +19,14 @@ import ( "fmt" "hash" "io" + "math" "os" "path/filepath" "sort" "strings" - "math" - "github.com/prometheus/tsdb/fileutil" "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) diff --git a/index_test.go b/index_test.go index c97cf380ce..7b908e39af 100644 --- a/index_test.go +++ b/index_test.go @@ -40,12 +40,15 @@ type mockIndex struct { } func newMockIndex() mockIndex { - return mockIndex{ + ix := mockIndex{ series: make(map[uint64]series), labelIndex: make(map[string][]string), postings: newMemPostings(), symbols: make(map[string]struct{}), } + ix.postings.ensureOrder() + + return ix } func (m mockIndex) Symbols() (map[string]struct{}, error) { @@ -277,6 +280,7 @@ func TestPersistence_index_e2e(t *testing.T) { postings = newMemPostings() values = map[string]stringset{} ) + postings.ensureOrder() mi := newMockIndex() diff --git a/querier.go b/querier.go index 5461fec89a..808a1057aa 100644 --- a/querier.go +++ b/querier.go @@ -18,6 +18,7 @@ import ( "sort" "strings" + "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -50,7 +51,6 @@ type Series interface { // querier aggregates querying results from time blocks within // a single partition. type querier struct { - db *DB blocks []Querier } @@ -103,21 +103,30 @@ func (q *querier) Close() error { for _, bq := range q.blocks { merr.Add(bq.Close()) } - q.db.mtx.RUnlock() - return merr.Err() } // NewBlockQuerier returns a queries against the readers. -func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier { - return &blockQuerier{ - index: ir, - chunks: cr, - tombstones: tr, - - mint: mint, - maxt: maxt, +func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { + indexr, err := b.Index() + if err != nil { + return nil, errors.Wrapf(err, "open index reader") } + chunkr, err := b.Chunks() + if err != nil { + return nil, errors.Wrapf(err, "open chunk reader") + } + tombsr, err := b.Tombstones() + if err != nil { + return nil, errors.Wrapf(err, "open tombstone reader") + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil } // blockQuerier provides querying access to a single block database. @@ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { } func (q *blockQuerier) Close() error { - return nil + var merr MultiError + + merr.Add(q.index.Close()) + merr.Add(q.chunks.Close()) + merr.Add(q.tombstones.Close()) + + return merr.Err() } // postingsReader is used to select matching postings from an IndexReader. diff --git a/tombstones.go b/tombstones.go index 7b24407b53..19b2246348 100644 --- a/tombstones.go +++ b/tombstones.go @@ -33,9 +33,11 @@ const ( tombstoneFormatV1 = 1 ) -// TombstoneReader is the iterator over tombstones. +// TombstoneReader gives access to tombstone intervals by series reference. type TombstoneReader interface { Get(ref uint64) Intervals + + Close() error } func writeTombstoneFile(dir string, tr tombstoneReader) error { @@ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) { t[ref] = t[ref].add(itv) } +func (tombstoneReader) Close() error { + return nil +} + // Interval represents a single time-interval. type Interval struct { Mint, Maxt int64