diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index dbe92f3df7..d87b1d1503 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
-		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
-		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
+		RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
 	}
@@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
 		}
 		wg.Wait()
 	}
+	fmt.Println("ingestion completed")
 	return total, nil
 }
 
diff --git a/db.go b/db.go
index 2204de6c9e..4f10fe9e8c 100644
--- a/db.go
+++ b/db.go
@@ -325,37 +325,52 @@ func headFullness(h headBlock) float64 {
 	return a / b
 }
 
+// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
+func (db *DB) appendableHeads() (r []headBlock) {
+	switch l := len(db.heads); l {
+	case 0:
+	case 1:
+		r = append(r, db.heads[0])
+	default:
+		if headFullness(db.heads[l-1]) < 0.5 {
+			r = append(r, db.heads[l-2])
+		}
+		r = append(r, db.heads[l-1])
+	}
+	return r
+}
+
+func (db *DB) completedHeads() (r []headBlock) {
+	db.headmtx.RLock()
+	defer db.headmtx.RUnlock()
+
+	if len(db.heads) < 2 {
+		return nil
+	}
+
+	// Select all old heads unless they still have pending appenders.
+	for _, h := range db.heads[:len(db.heads)-2] {
+		if h.ActiveWriters() > 0 {
+			return r
+		}
+		r = append(r, h)
+	}
+	// Add the 2nd last head if the last head is more than 50% filled.
+	// Compacting it early allows us to free its memory before allocating
+	// more for the next block and thus reduces spikes.
+	if h2 := db.heads[len(db.heads)-2]; headFullness(db.heads[len(db.heads)-1]) >= 0.5 && h2.ActiveWriters() == 0 {
+		r = append(r, h2)
+	}
+	return r
+}
+
 func (db *DB) compact() (changes bool, err error) {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
 
-	db.headmtx.RLock()
-
 	// Check whether we have pending head blocks that are ready to be persisted.
 	// They have the highest priority.
-	var singles []Block
-
-	// Collect head blocks that are ready for compaction. Write them after
-	// returning the lock to not block Appenders.
-	// Selected blocks are semantically ensured to not be written to afterwards
-	// by appendable().
-	if len(db.heads) > 1 {
-		f := headFullness(db.heads[len(db.heads)-1])
-
-		for _, h := range db.heads[:len(db.heads)-1] {
-			// Blocks that won't be appendable when instantiating a new appender
-			// might still have active appenders on them.
-			// Abort at the first one we encounter.
-			if h.ActiveWriters() > 0 || f < 0.5 {
-				break
-			}
-			singles = append(singles, h)
-		}
-	}
-
-	db.headmtx.RUnlock()
-
-	for _, h := range singles {
+	for _, h := range db.completedHeads() {
 		select {
 		case <-db.stopc:
 			return changes, nil
@@ -677,7 +692,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
 	}
 	var hb headBlock
 
-	for _, h := range a.db.appendable() {
+	for _, h := range a.db.appendableHeads() {
 		m := h.Meta()
 
 		if intervalContains(m.MinTime, m.MaxTime-1, t) {
@@ -809,18 +824,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	return nil
 }
 
-// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
-func (db *DB) appendable() (r []headBlock) {
-	switch len(db.heads) {
-	case 0:
-	case 1:
-		r = append(r, db.heads[0])
-	default:
-		r = append(r, db.heads[len(db.heads)-2:]...)
-	}
-	return r
-}
-
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	// Checks Overlap: http://stackoverflow.com/questions/3269434/
 	return amin <= bmax && bmin <= amax
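
For context, a minimal, self-contained Go sketch of the head-selection policy that the new appendableHeads/completedHeads helpers implement (mockHead and its fields are hypothetical stand-ins for tsdb's headBlock interface, not part of this patch). It illustrates that the two selections are complementary: once the newest head crosses 50% fullness, the second-newest head stops being appendable and becomes eligible for compaction, so no head can receive writes while it is being persisted.

package main

import "fmt"

// mockHead is a hypothetical stand-in for tsdb's headBlock, reduced to the
// two properties the selection policy inspects.
type mockHead struct {
	name     string
	fullness float64 // fraction of the block's time range already filled
	writers  int     // number of active appenders
}

// appendableHeads mirrors the patched policy: the newest head is always
// appendable; the second-newest stays appendable only while the newest is
// less than 50% full.
func appendableHeads(heads []mockHead) (r []mockHead) {
	switch l := len(heads); l {
	case 0:
	case 1:
		r = append(r, heads[0])
	default:
		if heads[l-1].fullness < 0.5 {
			r = append(r, heads[l-2])
		}
		r = append(r, heads[l-1])
	}
	return r
}

// completedHeads mirrors the complement: all heads older than the two newest
// are ready for compaction unless one still has active writers, and the
// second-newest joins them once the newest head is at least 50% full.
func completedHeads(heads []mockHead) (r []mockHead) {
	if len(heads) < 2 {
		return nil
	}
	for _, h := range heads[:len(heads)-2] {
		if h.writers > 0 {
			return r
		}
		r = append(r, h)
	}
	if h2 := heads[len(heads)-2]; heads[len(heads)-1].fullness >= 0.5 && h2.writers == 0 {
		r = append(r, h2)
	}
	return r
}

func main() {
	heads := []mockHead{
		{"h0", 1.0, 0},
		{"h1", 1.0, 0},
		{"h2", 0.9, 0},
		{"h3", 0.6, 0}, // newest head, already more than 50% full
	}
	// h3 is the only appendable head; h0-h2 are all eligible for compaction.
	fmt.Println("appendable:", appendableHeads(heads))
	fmt.Println("completed: ", completedHeads(heads))
}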