From e478d0e3bccd03cc48f6da724e53acbd607148ac Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 23 Mar 2017 18:27:20 +0100 Subject: [PATCH] Actually close olds blocks in reloadBlocks This fixes a bug leaking memory because blocks were not actually closed as the closing call references the initial, empty slice --- db.go | 4 ++-- head.go | 35 +++++++++++++++++++++++++---------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/db.go b/db.go index fb3fdc9691..9e50d5b209 100644 --- a/db.go +++ b/db.go @@ -366,7 +366,7 @@ func (db *DB) seqBlock(i int) (Block, bool) { func (db *DB) reloadBlocks() error { var cs []io.Closer - defer closeAll(cs...) + defer func() { closeAll(cs...) }() db.mtx.Lock() defer db.mtx.Unlock() @@ -423,7 +423,7 @@ func (db *DB) reloadBlocks() error { // Close all blocks that we no longer need. They are closed after returning all // locks to avoid questionable locking order. for _, b := range db.blocks { - if nb := seqBlocks[b.Meta().Sequence]; nb != b { + if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b { cs = append(cs, b) } } diff --git a/head.go b/head.go index 742ea56893..7ec2f2eaea 100644 --- a/head.go +++ b/head.go @@ -53,8 +53,7 @@ type headBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms - metamtx sync.RWMutex - meta BlockMeta + meta BlockMeta } func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { @@ -109,6 +108,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { r := wal.Reader() +Outer: for r.Next() { series, samples := r.At() @@ -117,6 +117,10 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { h.meta.Stats.NumSeries++ } for _, s := range samples { + if int(s.ref) >= len(h.series) { + l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1) + break Outer + } h.series[s.ref].append(s.t, s.v) if !h.inBounds(s.t) { @@ -168,10 +172,19 @@ func (h *headBlock) Close() error { } func (h *headBlock) Meta() BlockMeta { - h.metamtx.RLock() - defer h.metamtx.RUnlock() + m := BlockMeta{ + ULID: h.meta.ULID, + Sequence: h.meta.Sequence, + MinTime: h.meta.MinTime, + MaxTime: h.meta.MaxTime, + Compaction: h.meta.Compaction, + } - return h.meta + m.Stats.NumChunks = atomic.LoadUint64(&h.meta.Stats.NumChunks) + m.Stats.NumSeries = atomic.LoadUint64(&h.meta.Stats.NumSeries) + m.Stats.NumSamples = atomic.LoadUint64(&h.meta.Stats.NumSamples) + + return m } func (h *headBlock) Dir() string { return h.dir } @@ -199,6 +212,11 @@ func (h *headBlock) Querier(mint, maxt int64) Querier { ep := make([]uint32, 0, 64) for p.Next() { + // Skip posting entries that include series added after we + // instantiated the querier. + if int(p.At()) >= len(series) { + break + } ep = append(ep, p.At()) } if err := p.Err(); err != nil { @@ -413,11 +431,8 @@ func (a *headAppender) Commit() error { a.mtx.RUnlock() - a.metamtx.Lock() - defer a.metamtx.Unlock() - - a.meta.Stats.NumSamples += total - a.meta.Stats.NumSeries += uint64(len(a.newSeries)) + atomic.AddUint64(&a.meta.Stats.NumSamples, total) + atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries))) return nil }