diff --git a/.travis.yml b/.travis.yml
index 83e2006783..8858c67f45 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ sudo: false
 language: go
 
 go:
-- 1.8
+- 1.8.x
 
 go_import_path: github.com/prometheus/tsdb
 
@@ -14,6 +14,6 @@ install:
 - go get -t ./...
 
 script:
-- go test ./...
+- go test -timeout 5m ./...
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index dbe92f3df7..d87b1d1503 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
-		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
-		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
+		RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
@@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
 		}
 		wg.Wait()
 	}
+	fmt.Println("ingestion completed")
 	return total, nil
 }
diff --git a/compact.go b/compact.go
index 1036cf4031..751edf4f17 100644
--- a/compact.go
+++ b/compact.go
@@ -229,11 +229,24 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
 
 	for i := 0; i < len(ds); {
 		var group []dirMeta
+		var t0 int64
+		m := ds[i].meta
 		// Compute start of aligned time range of size tr closest to the current block's start.
-		t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
+		if m.MinTime >= 0 {
+			t0 = tr * (m.MinTime / tr)
+		} else {
+			t0 = tr * ((m.MinTime - tr + 1) / tr)
+		}
+		// Skip blocks that don't fall into the range. This can happen via mis-alignment or
+		// by being the multiple of the intended range.
+		if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
+			i++
+			continue
+		}
 
 		// Add all dirs to the current group that are within [t0, t0+tr].
 		for ; i < len(ds); i++ {
+			// Either the block falls into the next range or doesn't fit at all (checked above).
 			if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
				break
			}
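Note on the splitByRange change above: Go's / and % truncate toward zero, so the old form MinTime - (MinTime % tr) mis-aligns blocks with negative timestamps (it maps -10 to 0 instead of -60). The new branch floors MinTime to the start of its tr-sized window. A minimal standalone sketch of the same rule; rangeStart is a made-up helper name and is not part of the patch:

package main

import "fmt"

// rangeStart returns the largest multiple of tr that is <= minTime, i.e. the
// start of the tr-sized window containing minTime, for positive and negative times.
func rangeStart(minTime, tr int64) int64 {
	if minTime >= 0 {
		return tr * (minTime / tr)
	}
	return tr * ((minTime - tr + 1) / tr)
}

func main() {
	fmt.Println(rangeStart(90, 60))  // 60
	fmt.Println(rangeStart(-10, 60)) // -60; the old remainder form would return 0 here
	fmt.Println(rangeStart(-60, 60)) // -60; exact multiples stay on their own boundary
}

Blocks whose MaxTime still exceeds t0+tr after this alignment are skipped by the new guard rather than grouped, which is what the added test cases below exercise.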
diff --git a/compact_test.go b/compact_test.go
index 07d3f6d63d..25b931a1e3 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -14,7 +14,6 @@ package tsdb
 
 import (
-	"sort"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -178,58 +177,84 @@ func TestCompactionSelect(t *testing.T) {
 }
 
 func TestSplitByRange(t *testing.T) {
-	splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
-		rMap := make(map[int64][]dirMeta)
-		for _, dir := range ds {
-			t0 := dir.meta.MinTime - dir.meta.MinTime%tr
-			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
-				rMap[t0] = append(rMap[t0], dir)
-			}
-		}
-		res := make([][]dirMeta, 0, len(rMap))
-		for _, v := range rMap {
-			res = append(res, v)
-		}
-
-		sort.Slice(res, func(i, j int) bool {
-			return res[i][0].meta.MinTime < res[j][0].meta.MinTime
-		})
-
-		return res
-	}
-
 	cases := []struct {
 		trange int64
-		ranges [][]int64
-		output [][][]int64
+		ranges [][2]int64
+		output [][][2]int64
 	}{
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 10}},
+			ranges: [][2]int64{{0, 10}},
+			output: [][][2]int64{
+				{{0, 10}},
+			},
 		},
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 60}},
+			ranges: [][2]int64{{0, 60}},
+			output: [][][2]int64{
+				{{0, 60}},
+			},
 		},
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 10}, {30, 60}},
+			ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}},
+			output: [][][2]int64{
+				{{0, 10}, {9, 15}, {30, 60}},
+			},
 		},
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 10}, {60, 90}},
+			ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}},
+			output: [][][2]int64{
+				{{70, 90}},
+				{{125, 130}, {130, 180}},
+				{{1000, 1001}},
+			},
+		},
+		// Mis-aligned or too-large blocks are ignored.
+		{
+			trange: 60,
+			ranges: [][2]int64{{50, 70}, {70, 80}},
+			output: [][][2]int64{
+				{{70, 80}},
+			},
+		},
+		{
+			trange: 72,
+			ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}},
+			output: [][][2]int64{
+				{{144, 216}},
+				{{216, 288}},
+			},
+		},
+		// Various awkward edge cases easy to hit with negative numbers.
+		{
+			trange: 60,
+			ranges: [][2]int64{{-10, -5}},
+			output: [][][2]int64{
+				{{-10, -5}},
+			},
 		},
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
+			ranges: [][2]int64{{-60, -50}, {-10, -5}},
+			output: [][][2]int64{
+				{{-60, -50}, {-10, -5}},
+			},
 		},
 		{
 			trange: 60,
-			ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
+			ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}},
+			output: [][][2]int64{
+				{{-60, -50}, {-10, -5}},
+				{{0, 15}},
+			},
 		},
 	}
 
 	for _, c := range cases {
+		// Transform input range tuples into dirMetas.
 		blocks := make([]dirMeta, 0, len(c.ranges))
 		for _, r := range c.ranges {
 			blocks = append(blocks, dirMeta{
@@ -240,6 +265,16 @@ func TestSplitByRange(t *testing.T) {
 			})
 		}
 
-		require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
+		// Transform output range tuples into dirMetas.
+		exp := make([][]dirMeta, len(c.output))
+		for i, group := range c.output {
+			for _, r := range group {
+				exp[i] = append(exp[i], dirMeta{
+					meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]},
+				})
+			}
+		}
+
+		require.Equal(t, exp, splitByRange(blocks, c.trange))
 	}
 }
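Note on the new TestSplitByRange expectations: instead of re-deriving the groups with a second splitter implementation, each case now lists its expected groups explicitly, and blocks that cross a window boundary or cover a whole multiple of the range are expected to be dropped. A small standalone walkthrough of the trange 72 case that re-derives the expected output by hand; it is not part of the test file:

package main

import "fmt"

func main() {
	tr := int64(72)
	blocks := [][2]int64{{0, 144}, {144, 216}, {216, 288}}
	for _, b := range blocks {
		minT, maxT := b[0], b[1]
		// Same floor alignment as splitByRange; all times here are non-negative.
		t0 := tr * (minT / tr)
		kept := minT >= t0 && maxT <= t0+tr
		fmt.Printf("block %v -> window [%d, %d]: kept=%v\n", b, t0, t0+tr, kept)
	}
	// block [0 144] -> window [0, 72]: kept=false (spans two whole windows, ignored)
	// block [144 216] -> window [144, 216]: kept=true
	// block [216 288] -> window [216, 288]: kept=true
}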
diff --git a/db.go b/db.go
index 2204de6c9e..a5674056a9 100644
--- a/db.go
+++ b/db.go
@@ -99,14 +99,13 @@ type DB struct {
 	metrics *dbMetrics
 	opts    *Options
 
-	// Mutex for that must be held when modifying the general
-	// block layout.
+	// Mutex that must be held when modifying the general block layout.
 	mtx    sync.RWMutex
 	blocks []Block
 
 	// Mutex that must be held when modifying just the head blocks
 	// or the general layout.
-	// Must never be held when acquiring a blocks's mutex!
+	// mtx must be held before acquiring.
 	headmtx sync.RWMutex
 	heads   []headBlock
 
@@ -117,8 +116,8 @@ type DB struct {
 	stopc    chan struct{}
 
 	// cmtx is used to control compactions and deletions.
-	cmtx       sync.Mutex
-	compacting bool
+	cmtx               sync.Mutex
+	compactionsEnabled bool
 }
 
 type dbMetrics struct {
@@ -197,13 +196,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 	}
 
 	db = &DB{
-		dir:        dir,
-		logger:     l,
-		opts:       opts,
-		compactc:   make(chan struct{}, 1),
-		donec:      make(chan struct{}),
-		stopc:      make(chan struct{}),
-		compacting: true,
+		dir:                dir,
+		logger:             l,
+		opts:               opts,
+		compactc:           make(chan struct{}, 1),
+		donec:              make(chan struct{}),
+		stopc:              make(chan struct{}),
+		compactionsEnabled: true,
 	}
 	db.metrics = newDBMetrics(db, r)
 
@@ -325,37 +324,59 @@ func headFullness(h headBlock) float64 {
 	return a / b
 }
 
+// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
+func (db *DB) appendableHeads() (r []headBlock) {
+	switch l := len(db.heads); l {
+	case 0:
+	case 1:
+		r = append(r, db.heads[0])
+	default:
+		if headFullness(db.heads[l-1]) < 0.5 {
+			r = append(r, db.heads[l-2])
+		}
+		r = append(r, db.heads[l-1])
+	}
+	return r
+}
+
+func (db *DB) completedHeads() (r []headBlock) {
+	db.mtx.RLock()
+	defer db.mtx.RUnlock()
+
+	db.headmtx.RLock()
+	defer db.headmtx.RUnlock()
+
+	if len(db.heads) < 2 {
+		return nil
+	}
+
+	// Select all old heads unless they still have pending appenders.
+	for _, h := range db.heads[:len(db.heads)-2] {
+		if h.ActiveWriters() > 0 {
+			return r
+		}
+		r = append(r, h)
+	}
+	// Add the 2nd last head if the last head is more than 50% filled.
+	// Compacting it early allows us to free its memory before allocating
+	// more for the next block and thus reduces spikes.
+	if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 {
+		r = append(r, h2)
+	}
+	return r
+}
+
 func (db *DB) compact() (changes bool, err error) {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
-	db.headmtx.RLock()
+	if !db.compactionsEnabled {
+		return false, nil
+	}
 
 	// Check whether we have pending head blocks that are ready to be persisted.
 	// They have the highest priority.
-	var singles []Block
-
-	// Collect head blocks that are ready for compaction. Write them after
-	// returning the lock to not block Appenders.
-	// Selected blocks are semantically ensured to not be written to afterwards
-	// by appendable().
-	if len(db.heads) > 1 {
-		f := headFullness(db.heads[len(db.heads)-1])
-
-		for _, h := range db.heads[:len(db.heads)-1] {
-			// Blocks that won't be appendable when instantiating a new appender
-			// might still have active appenders on them.
-			// Abort at the first one we encounter.
-			if h.ActiveWriters() > 0 || f < 0.5 {
-				break
-			}
-			singles = append(singles, h)
-		}
-	}
-
-	db.headmtx.RUnlock()
-
-	for _, h := range singles {
+	for _, h := range db.completedHeads() {
 		select {
 		case <-db.stopc:
 			return changes, nil
@@ -561,30 +582,30 @@ func (db *DB) Close() error {
 
 // DisableCompactions disables compactions.
 func (db *DB) DisableCompactions() {
-	if db.compacting {
-		db.cmtx.Lock()
-		db.compacting = false
-		db.logger.Log("msg", "compactions disabled")
-	}
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	db.compactionsEnabled = false
+	db.logger.Log("msg", "compactions disabled")
 }
 
 // EnableCompactions enables compactions.
 func (db *DB) EnableCompactions() {
-	if !db.compacting {
-		db.cmtx.Unlock()
-		db.compacting = true
-		db.logger.Log("msg", "compactions enabled")
-	}
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	db.compactionsEnabled = true
+	db.logger.Log("msg", "compactions enabled")
 }
 
 // Snapshot writes the current data to the directory.
 func (db *DB) Snapshot(dir string) error {
-	db.mtx.Lock() // To block any appenders.
-	defer db.mtx.Unlock()
-
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
+	db.mtx.Lock() // To block any appenders.
+	defer db.mtx.Unlock()
+
 	blocks := db.blocks[:]
 	for _, b := range blocks {
 		db.logger.Log("msg", "snapshotting block", "block", b)
@@ -677,7 +698,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
 	}
 	var hb headBlock
 
-	for _, h := range a.db.appendable() {
+	for _, h := range a.db.appendableHeads() {
 		m := h.Meta()
 
 		if intervalContains(m.MinTime, m.MaxTime-1, t) {
@@ -789,6 +810,7 @@ func (a *dbAppender) Rollback() error {
 func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
+
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
@@ -809,18 +831,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	return nil
 }
 
-// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
-func (db *DB) appendable() (r []headBlock) {
-	switch len(db.heads) {
-	case 0:
-	case 1:
-		r = append(r, db.heads[0])
-	default:
-		r = append(r, db.heads[len(db.heads)-2:]...)
-	}
-	return r
-}
-
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	// Checks Overlap: http://stackoverflow.com/questions/3269434/
 	return amin <= bmax && bmin <= amax
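Note on the db.go locking changes above: compactions are now gated by a compactionsEnabled flag that is only read and written while cmtx is held, so DisableCompactions blocks until any in-flight compaction releases cmtx instead of leaving the mutex locked across Disable/Enable calls, and cmtx is consistently acquired before mtx (Snapshot and Delete were reordered to match). A minimal model of that cmtx pattern, using made-up names rather than the real tsdb.DB:

package main

import (
	"fmt"
	"sync"
	"time"
)

// db is a toy model of the cmtx + compactionsEnabled pattern; it is not the
// real tsdb.DB, only the locking shape.
type db struct {
	cmtx               sync.Mutex
	compactionsEnabled bool
}

func (d *db) compact() {
	d.cmtx.Lock()
	defer d.cmtx.Unlock()

	if !d.compactionsEnabled {
		return
	}
	time.Sleep(100 * time.Millisecond) // stand-in for compaction work done under cmtx
}

// DisableCompactions only returns once any in-flight compact() has released
// cmtx, so callers know no compaction is running afterwards.
func (d *db) DisableCompactions() {
	d.cmtx.Lock()
	defer d.cmtx.Unlock()
	d.compactionsEnabled = false
}

func (d *db) EnableCompactions() {
	d.cmtx.Lock()
	defer d.cmtx.Unlock()
	d.compactionsEnabled = true
}

func main() {
	d := &db{compactionsEnabled: true}

	go d.compact()
	time.Sleep(10 * time.Millisecond) // let the compaction grab cmtx first

	d.DisableCompactions() // blocks until the compaction above finishes
	fmt.Println("no compaction in flight; safe to delete or snapshot blocks")
	d.EnableCompactions()
}

Taking cmtx before mtx everywhere also keeps the lock order of Snapshot consistent with Delete, which the old ordering did not.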
diff --git a/db_test.go b/db_test.go
index e410dfceb7..d4b3ca8bbc 100644
--- a/db_test.go
+++ b/db_test.go
@@ -60,10 +60,10 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	require.NoError(t, err)
 
 	querier := db.Querier(0, 1)
-	defer querier.Close()
 	seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
 	require.NoError(t, err)
 	require.Equal(t, seriesSet, map[string][]sample{})
+	require.NoError(t, querier.Close())
 
 	err = app.Commit()
 	require.NoError(t, err)
diff --git a/querier.go b/querier.go
index 523a673981..cfe53cd578 100644
--- a/querier.go
+++ b/querier.go
@@ -53,8 +53,8 @@ type querier struct {
 	blocks []Querier
 }
 
-// Querier returns a new querier over the data partition for the given
-// time range.
+// Querier returns a new querier over the data partition for the given time range.
+// A goroutine must not handle more than one open Querier.
 func (s *DB) Querier(mint, maxt int64) Querier {
 	s.mtx.RLock()
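Note on the Querier changes above: the test now closes its Querier before committing the appender, matching the new documentation that a goroutine must not handle more than one open Querier. A hedged usage sketch of that rule; the helper name and package are made up, and the import paths are assumed from this repository's layout:

package example

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// readFooBar queries a time range and closes the Querier before the calling
// goroutine touches the DB again. Error handling is kept minimal.
func readFooBar(db *tsdb.DB) error {
	q := db.Querier(0, 1000)

	ss := q.Select(labels.NewEqualMatcher("foo", "bar"))
	for ss.Next() {
		_ = ss.At().Labels()
	}
	// Close before this goroutine opens another Querier or commits an appender,
	// as the updated test does.
	return q.Close()
}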