From 5d7b5994d6c6bfd7df7e83371cb85b16d0c6f898 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 13 Jul 2017 16:13:59 +0200 Subject: [PATCH 1/5] Fix compaction range selection --- compact.go | 15 +++++++- compact_test.go | 95 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 79 insertions(+), 31 deletions(-) diff --git a/compact.go b/compact.go index 1036cf4031..751edf4f17 100644 --- a/compact.go +++ b/compact.go @@ -229,11 +229,24 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { for i := 0; i < len(ds); { var group []dirMeta + var t0 int64 + m := ds[i].meta // Compute start of aligned time range of size tr closest to the current block's start. - t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr) + if m.MinTime >= 0 { + t0 = tr * (m.MinTime / tr) + } else { + t0 = tr * ((m.MinTime - tr + 1) / tr) + } + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr { + i++ + continue + } // Add all dirs to the current group that are within [t0, t0+tr]. for ; i < len(ds); i++ { + // Either the block falls into the next range or doesn't fit at all (checked above). if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr { break } diff --git a/compact_test.go b/compact_test.go index 07d3f6d63d..25b931a1e3 100644 --- a/compact_test.go +++ b/compact_test.go @@ -14,7 +14,6 @@ package tsdb import ( - "sort" "testing" "github.com/stretchr/testify/require" @@ -178,58 +177,84 @@ func TestCompactionSelect(t *testing.T) { } func TestSplitByRange(t *testing.T) { - splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta { - rMap := make(map[int64][]dirMeta) - for _, dir := range ds { - t0 := dir.meta.MinTime - dir.meta.MinTime%tr - if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) { - rMap[t0] = append(rMap[t0], dir) - } - } - res := make([][]dirMeta, 0, len(rMap)) - for _, v := range rMap { - res = append(res, v) - } - - sort.Slice(res, func(i, j int) bool { - return res[i][0].meta.MinTime < res[j][0].meta.MinTime - }) - - return res - } - cases := []struct { trange int64 - ranges [][]int64 - output [][][]int64 + ranges [][2]int64 + output [][][2]int64 }{ { trange: 60, - ranges: [][]int64{{0, 10}}, + ranges: [][2]int64{{0, 10}}, + output: [][][2]int64{ + {{0, 10}}, + }, }, { trange: 60, - ranges: [][]int64{{0, 60}}, + ranges: [][2]int64{{0, 60}}, + output: [][][2]int64{ + {{0, 60}}, + }, }, { trange: 60, - ranges: [][]int64{{0, 10}, {30, 60}}, + ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}}, + output: [][][2]int64{ + {{0, 10}, {9, 15}, {30, 60}}, + }, }, { trange: 60, - ranges: [][]int64{{0, 10}, {60, 90}}, + ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}}, + output: [][][2]int64{ + {{70, 90}}, + {{125, 130}, {130, 180}}, + {{1000, 1001}}, + }, + }, + // Mis-aligned or too-large blocks are ignored. + { + trange: 60, + ranges: [][2]int64{{50, 70}, {70, 80}}, + output: [][][2]int64{ + {{70, 80}}, + }, + }, + { + trange: 72, + ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}}, + output: [][][2]int64{ + {{144, 216}}, + {{216, 288}}, + }, + }, + // Various awkward edge cases easy to hit with negative numbers. + { + trange: 60, + ranges: [][2]int64{{-10, -5}}, + output: [][][2]int64{ + {{-10, -5}}, + }, }, { trange: 60, - ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}}, + ranges: [][2]int64{{-60, -50}, {-10, -5}}, + output: [][][2]int64{ + {{-60, -50}, {-10, -5}}, + }, }, { trange: 60, - ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}}, + ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}}, + output: [][][2]int64{ + {{-60, -50}, {-10, -5}}, + {{0, 15}}, + }, }, } for _, c := range cases { + // Transform input range tuples into dirMetas. blocks := make([]dirMeta, 0, len(c.ranges)) for _, r := range c.ranges { blocks = append(blocks, dirMeta{ @@ -240,6 +265,16 @@ func TestSplitByRange(t *testing.T) { }) } - require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange)) + // Transform output range tuples into dirMetas. + exp := make([][]dirMeta, len(c.output)) + for i, group := range c.output { + for _, r := range group { + exp[i] = append(exp[i], dirMeta{ + meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]}, + }) + } + } + + require.Equal(t, exp, splitByRange(blocks, c.trange)) } } From 9c4235532efcb6f4e9f57ea770d95e15857b33a0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 13 Jul 2017 16:15:13 +0200 Subject: [PATCH 2/5] Fix compaction selection after creating new heads This fixes the case where between block creations no compaction plans are ran. We were not compacting anything in these cases since the on creation the most recent head block always had a high timestamp of 0. --- cmd/tsdb/main.go | 5 +-- db.go | 79 +++++++++++++++++++++++++----------------------- 2 files changed, 44 insertions(+), 40 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index dbe92f3df7..d87b1d1503 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, - RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds - BlockRanges: tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5), + RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds + BlockRanges: tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5), }) if err != nil { exitWithError(err) @@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u } wg.Wait() } + fmt.Println("ingestion completed") return total, nil } diff --git a/db.go b/db.go index 2204de6c9e..4f10fe9e8c 100644 --- a/db.go +++ b/db.go @@ -325,37 +325,52 @@ func headFullness(h headBlock) float64 { return a / b } +// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendableHeads() (r []headBlock) { + switch l := len(db.heads); l { + case 0: + case 1: + r = append(r, db.heads[0]) + default: + if headFullness(db.heads[l-1]) < 0.5 { + r = append(r, db.heads[l-2]) + } + r = append(r, db.heads[l-1]) + } + return r +} + +func (db *DB) completedHeads() (r []headBlock) { + db.headmtx.RLock() + defer db.headmtx.RUnlock() + + if len(db.heads) < 2 { + return nil + } + + // Select all old heads unless they still have pending appenders. + for _, h := range db.heads[:len(db.heads)-2] { + if h.ActiveWriters() > 0 { + return r + } + r = append(r, h) + } + // Add the 2nd last head if the last head is more than 50% filled. + // Compacting it early allows us to free its memory before allocating + // more for the next block and thus reduces spikes. + if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 { + r = append(r, h2) + } + return r +} + func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() - db.headmtx.RLock() - // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []Block - - // Collect head blocks that are ready for compaction. Write them after - // returning the lock to not block Appenders. - // Selected blocks are semantically ensured to not be written to afterwards - // by appendable(). - if len(db.heads) > 1 { - f := headFullness(db.heads[len(db.heads)-1]) - - for _, h := range db.heads[:len(db.heads)-1] { - // Blocks that won't be appendable when instantiating a new appender - // might still have active appenders on them. - // Abort at the first one we encounter. - if h.ActiveWriters() > 0 || f < 0.5 { - break - } - singles = append(singles, h) - } - } - - db.headmtx.RUnlock() - - for _, h := range singles { + for _, h := range db.completedHeads() { select { case <-db.stopc: return changes, nil @@ -677,7 +692,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { } var hb headBlock - for _, h := range a.db.appendable() { + for _, h := range a.db.appendableHeads() { m := h.Meta() if intervalContains(m.MinTime, m.MaxTime-1, t) { @@ -809,18 +824,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { return nil } -// appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() (r []headBlock) { - switch len(db.heads) { - case 0: - case 1: - r = append(r, db.heads[0]) - default: - r = append(r, db.heads[len(db.heads)-2:]...) - } - return r -} - func intervalOverlap(amin, amax, bmin, bmax int64) bool { // Checks Overlap: http://stackoverflow.com/questions/3269434/ return amin <= bmax && bmin <= amax From 47afc8e00f402180a5c061148a757759b7ae192f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 14 Jul 2017 08:00:18 +0200 Subject: [PATCH 3/5] Reduce test timeout on Travis --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 83e2006783..8858c67f45 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ sudo: false language: go go: -- 1.8 +- 1.8.x go_import_path: github.com/prometheus/tsdb @@ -14,6 +14,6 @@ install: - go get -t ./... script: -- go test ./... +- go test -timeout 5m ./... From 3065be97d8970f7a8c40958b23ef0c6dd6365835 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 14 Jul 2017 09:00:22 +0200 Subject: [PATCH 4/5] Fix and document locking order for DB --- db.go | 16 ++++++++++------ db_test.go | 2 +- querier.go | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/db.go b/db.go index 4f10fe9e8c..1c56ae7d08 100644 --- a/db.go +++ b/db.go @@ -99,14 +99,14 @@ type DB struct { metrics *dbMetrics opts *Options - // Mutex for that must be held when modifying the general - // block layout. + // Mutex for that must be held when modifying the general block layout. + // cmtx must be held before acquiring it. mtx sync.RWMutex blocks []Block // Mutex that must be held when modifying just the head blocks // or the general layout. - // Must never be held when acquiring a blocks's mutex! + // mtx must be held before acquiring. headmtx sync.RWMutex heads []headBlock @@ -341,6 +341,9 @@ func (db *DB) appendableHeads() (r []headBlock) { } func (db *DB) completedHeads() (r []headBlock) { + db.mtx.RLock() + defer db.mtx.RUnlock() + db.headmtx.RLock() defer db.headmtx.RUnlock() @@ -594,12 +597,12 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { - db.mtx.Lock() // To block any appenders. - defer db.mtx.Unlock() - db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() + blocks := db.blocks[:] for _, b := range blocks { db.logger.Log("msg", "snapshotting block", "block", b) @@ -804,6 +807,7 @@ func (a *dbAppender) Rollback() error { func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() defer db.mtx.Unlock() diff --git a/db_test.go b/db_test.go index e410dfceb7..d4b3ca8bbc 100644 --- a/db_test.go +++ b/db_test.go @@ -60,10 +60,10 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { require.NoError(t, err) querier := db.Querier(0, 1) - defer querier.Close() seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) require.NoError(t, err) require.Equal(t, seriesSet, map[string][]sample{}) + require.NoError(t, querier.Close()) err = app.Commit() require.NoError(t, err) diff --git a/querier.go b/querier.go index 523a673981..cfe53cd578 100644 --- a/querier.go +++ b/querier.go @@ -53,8 +53,8 @@ type querier struct { blocks []Querier } -// Querier returns a new querier over the data partition for the given -// time range. +// Querier returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock() From 74f67e8271364a3f19fef03e99a2d5d5f732ed55 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 14 Jul 2017 10:06:07 +0200 Subject: [PATCH 5/5] Fix compacting disable/enable Enabling and disabling compaction no longer blocks are potentially causes panics. --- db.go | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/db.go b/db.go index 1c56ae7d08..a5674056a9 100644 --- a/db.go +++ b/db.go @@ -100,7 +100,6 @@ type DB struct { opts *Options // Mutex for that must be held when modifying the general block layout. - // cmtx must be held before acquiring it. mtx sync.RWMutex blocks []Block @@ -117,8 +116,8 @@ type DB struct { stopc chan struct{} // cmtx is used to control compactions and deletions. - cmtx sync.Mutex - compacting bool + cmtx sync.Mutex + compactionsEnabled bool } type dbMetrics struct { @@ -197,13 +196,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), - compacting: true, + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + compactionsEnabled: true, } db.metrics = newDBMetrics(db, r) @@ -371,6 +370,10 @@ func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() + if !db.compactionsEnabled { + return false, nil + } + // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. for _, h := range db.completedHeads() { @@ -579,20 +582,20 @@ func (db *DB) Close() error { // DisableCompactions disables compactions. func (db *DB) DisableCompactions() { - if db.compacting { - db.cmtx.Lock() - db.compacting = false - db.logger.Log("msg", "compactions disabled") - } + db.cmtx.Lock() + defer db.cmtx.Unlock() + + db.compactionsEnabled = false + db.logger.Log("msg", "compactions disabled") } // EnableCompactions enables compactions. func (db *DB) EnableCompactions() { - if !db.compacting { - db.cmtx.Unlock() - db.compacting = true - db.logger.Log("msg", "compactions enabled") - } + db.cmtx.Lock() + defer db.cmtx.Unlock() + + db.compactionsEnabled = true + db.logger.Log("msg", "compactions enabled") } // Snapshot writes the current data to the directory.