From b521559c3b22c00327675906acd5d2e0bfcdc044 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:26:01 +0200 Subject: [PATCH 1/9] do a proper cleanup for a failed reload after a compaction a failed reload immediately after a compaction should delete the resulting block to avoid creating blocks with the same time range. Signed-off-by: Krasi Georgiev --- compact_test.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++ db.go | 11 +++++-- 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/compact_test.go b/compact_test.go index 9a45229f21..b496e8eac5 100644 --- a/compact_test.go +++ b/compact_test.go @@ -17,12 +17,14 @@ import ( "io/ioutil" "math" "os" + "path" "path/filepath" "testing" "time" "github.com/go-kit/kit/log" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -741,3 +743,77 @@ func TestDisableAutoCompactions(t *testing.T) { } testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } + +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction +// deletes the resulting block to avoid creatings blocks with the same time range. +func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { + + tests := map[string]func(*DB) int{ + "Test Head Compaction": func(db *DB) int { + rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + + // Add some data to the head that is enough to trigger a compaction. + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + return 1 + }, + "Test Block Compaction": func(db *DB) int { + expBlocks := []*BlockMeta{ + {MinTime: 0, MaxTime: 100}, + {MinTime: 100, MaxTime: 150}, + {MinTime: 150, MaxTime: 200}, + } + for _, m := range expBlocks { + createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + } + testutil.Ok(t, db.reload()) + testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") + + return len(expBlocks) + 1 + }, + } + + for title, bootStrap := range tests { + t.Run(title, func(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1, 100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + expBlocks := bootStrap(db) + + // Create a block that will trigger the reloard to fail. + blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) + lastBlockIndex := path.Join(blockPath, indexFilename) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. + + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") + + // Do the compaction and check the metrics. + // Since the most recent block is not included in the compaction, + // the compaction should succeed, but the reload should fail and + // the new block created from the compaction should be deleted. + db.EnableCompactions() + testutil.NotOk(t, db.compact()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + }) + } +} diff --git a/db.go b/db.go index bd3388b203..8b2e795ca7 100644 --- a/db.go +++ b/db.go @@ -425,6 +425,9 @@ func (db *DB) compact() (err error) { runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } if (uid == ulid.ULID{}) { @@ -454,12 +457,16 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { + uid, err := db.compactor.Compact(db.dir, plan, db.blocks) + if err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } runtime.GC() @@ -505,7 +512,7 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { - return errors.Wrap(err, "unexpected corrupted block") + return fmt.Errorf("unexpected corrupted block:%v", corrupted) } // All deletable blocks should not be loaded. From 53c18e7a41c019982ff8d4b96578f95cb47335a9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:32:32 +0200 Subject: [PATCH 2/9] use a global indexFilename constant Signed-off-by: Krasi Georgiev --- block.go | 2 +- index/index.go | 2 ++ index/index_test.go | 8 ++++---- repair.go | 2 +- repair_test.go | 6 +++--- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/block.go b/block.go index 42e11d9513..3ed8c4a640 100644 --- a/block.go +++ b/block.go @@ -282,7 +282,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error if err != nil { return nil, err } - ir, err := index.NewFileReader(filepath.Join(dir, "index")) + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } diff --git a/index/index.go b/index/index.go index 74e08d4651..ca33eac3fc 100644 --- a/index/index.go +++ b/index/index.go @@ -45,6 +45,8 @@ const ( FormatV2 = 2 labelNameSeperator = "\xff" + + indexFilename = "index" ) type indexWriterSeries struct { diff --git a/index/index_test.go b/index/index_test.go index 2edd3956a4..25c5be938b 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -151,7 +151,7 @@ func TestIndexRW_Create_Open(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - fn := filepath.Join(dir, "index") + fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. iw, err := NewWriter(fn) @@ -177,7 +177,7 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - fn := filepath.Join(dir, "index") + fn := filepath.Join(dir, indexFilename) iw, err := NewWriter(fn) testutil.Ok(t, err) @@ -271,7 +271,7 @@ func TestPersistence_index_e2e(t *testing.T) { }) } - iw, err := NewWriter(filepath.Join(dir, "index")) + iw, err := NewWriter(filepath.Join(dir, indexFilename)) testutil.Ok(t, err) testutil.Ok(t, iw.AddSymbols(symbols)) @@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() testutil.Ok(t, err) - ir, err := NewFileReader(filepath.Join(dir, "index")) + ir, err := NewFileReader(filepath.Join(dir, indexFilename)) testutil.Ok(t, err) for p := range mi.postings { diff --git a/repair.go b/repair.go index 15f79d5f78..4aeffb5547 100644 --- a/repair.go +++ b/repair.go @@ -64,7 +64,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if err != nil { return wrapErr(err, d) } - broken, err := os.Open(filepath.Join(d, "index")) + broken, err := os.Open(filepath.Join(d, indexFilename)) if err != nil { return wrapErr(err, d) } diff --git a/repair_test.go b/repair_test.go index 5fb780a5bb..cbe21691e6 100644 --- a/repair_test.go +++ b/repair_test.go @@ -30,7 +30,7 @@ func TestRepairBadIndexVersion(t *testing.T) { // at a broken revision. // // func main() { - // w, err := index.NewWriter("index") + // w, err := index.NewWriter(indexFilename) // if err != nil { // panic(err) // } @@ -72,7 +72,7 @@ func TestRepairBadIndexVersion(t *testing.T) { os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777) defer os.RemoveAll(filepath.Join(dbDir, "chunks")) - r, err := index.NewFileReader(filepath.Join(dbDir, "index")) + r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename)) testutil.Ok(t, err) p, err := r.Postings("b", "1") testutil.Ok(t, err) @@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) { testutil.Ok(t, err) db.Close() - r, err = index.NewFileReader(filepath.Join(tmpDbDir, "index")) + r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) testutil.Ok(t, err) p, err = r.Postings("b", "1") testutil.Ok(t, err) From 3ec08eac5062396f08e4e5f98c76ff7bf7b0e6ec Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:32:59 +0200 Subject: [PATCH 3/9] use camelcase for rangeToTriggerCompaction Signed-off-by: Krasi Georgiev --- db_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db_test.go b/db_test.go index e3e90ce8b3..15da12847f 100644 --- a/db_test.go +++ b/db_test.go @@ -1373,7 +1373,7 @@ func TestNoEmptyBlocks(t *testing.T) { defer db.Close() db.DisableCompactions() - rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 defaultLabel := labels.FromStrings("foo", "bar") defaultMatcher := labels.NewMustRegexpMatcher("", ".*") @@ -1392,7 +1392,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, 2, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) @@ -1414,7 +1414,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, currentTime+1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1435,7 +1435,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, currentTime+1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) From 1603222bbc8ff6dedb79b047d9fd7535f96720ec Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:12 +0200 Subject: [PATCH 4/9] small refactor of openTestDB to handle errors properly. Signed-off-by: Krasi Georgiev --- compact_test.go | 16 +++-- db_test.go | 188 +++++++++++++++++++++++++++++------------------- 2 files changed, 124 insertions(+), 80 deletions(-) diff --git a/compact_test.go b/compact_test.go index b496e8eac5..7c5f4f05fc 100644 --- a/compact_test.go +++ b/compact_test.go @@ -693,9 +693,11 @@ func TestCompaction_populateBlock(t *testing.T) { // This is needed for unit tests that rely on // checking state before and after a compaction. func TestDisableAutoCompactions(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blockRange := DefaultOptions.BlockRanges[0] label := labels.FromStrings("foo", "bar") @@ -783,11 +785,13 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { for title, bootStrap := range tests { t.Run(title, func(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{1, 100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() db.DisableCompactions() expBlocks := bootStrap(db) diff --git a/db_test.go b/db_test.go index 3aa5e29b2b..f8a6119f82 100644 --- a/db_test.go +++ b/db_test.go @@ -46,7 +46,9 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { testutil.Ok(t, err) // Do not close the test database by default as it will deadlock on test failures. - return db, func() { os.RemoveAll(tmpdir) } + return db, func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + } } // query runs a matcher query against the querier and fully expands its data. @@ -78,9 +80,11 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam // Ensure that blocks are held in memory in their time order // and not in ULID order as they are read from the directory. func TestDB_reloadOrder(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() metas := []BlockMeta{ {MinTime: 90, MaxTime: 100}, @@ -106,9 +110,11 @@ func TestDB_reloadOrder(t *testing.T) { } func TestDataAvailableOnlyAfterCommit(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -135,9 +141,11 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { } func TestDataNotAvailableAfterRollback(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) @@ -156,9 +164,11 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { } func TestDBAppenderAddRef(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app1 := db.Appender() @@ -213,9 +223,11 @@ func TestDBAppenderAddRef(t *testing.T) { func TestDeleteSimple(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -287,9 +299,11 @@ Outer: } func TestAmendDatapointCausesError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, 0) @@ -303,9 +317,11 @@ func TestAmendDatapointCausesError(t *testing.T) { } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, math.NaN()) @@ -318,10 +334,11 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() - + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) testutil.Ok(t, err) @@ -333,9 +350,11 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { } func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() // Append AmendedValue. app := db.Appender() @@ -377,8 +396,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { } func TestDB_Snapshot(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() // append data app := db.Appender() @@ -401,11 +420,11 @@ func TestDB_Snapshot(t *testing.T) { // reopen DB from snapshot db, err = Open(snap, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() querier, err := db.Querier(mint, mint+1000) testutil.Ok(t, err) - defer querier.Close() + defer func() { querier.Close() }() // sum values seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) @@ -427,8 +446,8 @@ func TestDB_Snapshot(t *testing.T) { func TestDB_SnapshotWithDelete(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() app := db.Appender() @@ -468,12 +487,12 @@ Outer: // reopen DB from snapshot db, err = Open(snap, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() // Compare the result. q, err := db.Querier(0, numSamples) testutil.Ok(t, err) - defer q.Close() + defer func() { testutil.Ok(t, q.Close()) }() res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) @@ -570,9 +589,11 @@ func TestDB_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []sample{} } - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -675,8 +696,8 @@ func TestDB_e2e(t *testing.T) { } func TestWALFlushedOnDBClose(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() dirDb := db.Dir() @@ -691,7 +712,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { db, err = Open(dirDb, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() q, err := db.Querier(0, 1) testutil.Ok(t, err) @@ -704,8 +725,8 @@ func TestWALFlushedOnDBClose(t *testing.T) { func TestWALSegmentSizeOption(t *testing.T) { options := *DefaultOptions options.WALSegmentSize = 2 * 32 * 1024 - db, close := openTestDB(t, &options) - defer close() + db, delete := openTestDB(t, &options) + defer delete() app := db.Appender() for i := int64(0); i < 155; i++ { _, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) @@ -730,8 +751,8 @@ func TestWALSegmentSizeOption(t *testing.T) { func TestTombstoneClean(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() app := db.Appender() @@ -827,9 +848,11 @@ func TestTombstoneClean(t *testing.T) { // if TombstoneClean leaves any blocks behind these will overlap. func TestTombstoneCleanFail(t *testing.T) { - db, close := openTestDB(t, nil) - defer db.Close() - defer close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() var expectedBlockDirs []string @@ -906,11 +929,13 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } func TestTimeRetention(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{1000}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blocks := []*BlockMeta{ {MinTime: 500, MaxTime: 900}, // Oldest block @@ -938,11 +963,13 @@ func TestTimeRetention(t *testing.T) { } func TestSizeRetention(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blocks := []*BlockMeta{ {MinTime: 100, MaxTime: 200}, // Oldest block @@ -1000,8 +1027,11 @@ func dbDiskSize(dir string) int64 { } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() labelpairs := []labels.Labels{ labels.FromStrings("a", "abcd", "b", "abcde"), @@ -1059,7 +1089,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { q, err := db.Querier(0, 10) testutil.Ok(t, err) - defer q.Close() + defer func() { testutil.Ok(t, q.Close()) }() for _, c := range cases { ss, err := q.Select(c.selector...) @@ -1175,9 +1205,11 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { // Regression test for https://github.com/prometheus/tsdb/issues/347 func TestChunkAtBlockBoundary(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -1229,9 +1261,11 @@ func TestChunkAtBlockBoundary(t *testing.T) { } func TestQuerierWithBoundaryChunks(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -1366,11 +1400,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { } func TestNoEmptyBlocks(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() db.DisableCompactions() rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 @@ -1524,9 +1560,11 @@ func TestDB_LabelNames(t *testing.T) { testutil.Ok(t, err) } for _, tst := range tests { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() appendSamples(db, 0, 4, tst.sampleLabels1) @@ -1567,9 +1605,11 @@ func TestDB_LabelNames(t *testing.T) { } func TestCorrectNumTombstones(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blockRange := DefaultOptions.BlockRanges[0] defaultLabel := labels.FromStrings("foo", "bar") From 315de4c7825519408cd1ece09e8fa4bee3eaa9d9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:40 +0200 Subject: [PATCH 5/9] fix windows tests Signed-off-by: Krasi Georgiev --- block.go | 20 ++++++++++++++++++-- db.go | 6 ++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/block.go b/block.go index 3ed8c4a640..26853ea304 100644 --- a/block.go +++ b/block.go @@ -269,7 +269,7 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { if logger == nil { logger = log.NewNopLogger() } @@ -282,15 +282,31 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error if err != nil { return nil, err } + defer func() { + if err != nil { + cr.Close() + } + }() ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } + defer func() { + if err != nil { + ir.Close() + } + }() + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + defer func() { + if err != nil { + tr.Close() + } + }() // TODO refactor to set this at block creation time as // that would be the logical place for a block size to be calculated. @@ -301,7 +317,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) } - pb := &Block{ + pb = &Block{ dir: dir, meta: *meta, chunkr: cr, diff --git a/db.go b/db.go index 8b2e795ca7..5063b6837a 100644 --- a/db.go +++ b/db.go @@ -512,6 +512,12 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { + // Close all new blocks to release the lock for windows. + for _, block := range loadable { + if _, loaded := db.getBlock(block.Meta().ULID); !loaded { + block.Close() + } + } return fmt.Errorf("unexpected corrupted block:%v", corrupted) } From 6c34eb8b636b3d08432e96acce0c6d856fd4c94b Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:55 +0200 Subject: [PATCH 6/9] nits Signed-off-by: Krasi Georgiev --- compact_test.go | 19 +++++++++---------- db.go | 4 ++-- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/compact_test.go b/compact_test.go index 7c5f4f05fc..c22babe61a 100644 --- a/compact_test.go +++ b/compact_test.go @@ -765,21 +765,21 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - return 1 + return 0 }, "Test Block Compaction": func(db *DB) int { - expBlocks := []*BlockMeta{ + blocks := []*BlockMeta{ {MinTime: 0, MaxTime: 100}, {MinTime: 100, MaxTime: 150}, {MinTime: 150, MaxTime: 200}, } - for _, m := range expBlocks { + for _, m := range blocks { createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) - testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") + testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload") - return len(expBlocks) + 1 + return len(blocks) }, } @@ -801,15 +801,14 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) testutil.Ok(t, err) - testutil.Equals(t, expBlocks, len(actBlocks)) - testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. + testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block. + testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") // Do the compaction and check the metrics. - // Since the most recent block is not included in the compaction, - // the compaction should succeed, but the reload should fail and + // Compaction should succeed, but the reload should fail and // the new block created from the compaction should be deleted. db.EnableCompactions() testutil.NotOk(t, db.compact()) @@ -817,7 +816,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") actBlocks, err = blockDirs(db.Dir()) testutil.Ok(t, err) - testutil.Equals(t, expBlocks, len(actBlocks)) + testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block. }) } } diff --git a/db.go b/db.go index 5063b6837a..3bce5b08ba 100644 --- a/db.go +++ b/db.go @@ -426,7 +426,7 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) + return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) } return errors.Wrap(err, "reload blocks") } @@ -465,7 +465,7 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) + return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid) } return errors.Wrap(err, "reload blocks") } From d48606827c06abdf320bcee6a3df79b3708979b1 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 8 Feb 2019 13:35:32 +0200 Subject: [PATCH 7/9] simplify closers Signed-off-by: Krasi Georgiev --- block.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/block.go b/block.go index 26853ea304..08b4861f80 100644 --- a/block.go +++ b/block.go @@ -16,6 +16,7 @@ package tsdb import ( "encoding/json" + "io" "io/ioutil" "os" "path/filepath" @@ -273,6 +274,14 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er if logger == nil { logger = log.NewNopLogger() } + var closers []io.Closer + defer func() { + if err != nil { + for _, c := range closers { + c.Close() + } + } + }() meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -282,31 +291,19 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er if err != nil { return nil, err } - defer func() { - if err != nil { - cr.Close() - } - }() + closers = append(closers, cr) + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } - - defer func() { - if err != nil { - ir.Close() - } - }() + closers = append(closers, ir) tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } - defer func() { - if err != nil { - tr.Close() - } - }() + closers = append(closers, tr) // TODO refactor to set this at block creation time as // that would be the logical place for a block size to be calculated. From 07df4fd38339d27ae5a99290d98256d71e3a322a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 11 Feb 2019 11:25:57 +0200 Subject: [PATCH 8/9] nits Signed-off-by: Krasi Georgiev --- compact_test.go | 5 ++--- db_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/compact_test.go b/compact_test.go index c22babe61a..d92efab1cd 100644 --- a/compact_test.go +++ b/compact_test.go @@ -746,7 +746,7 @@ func TestDisableAutoCompactions(t *testing.T) { testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } -// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction // deletes the resulting block to avoid creatings blocks with the same time range. func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { @@ -796,7 +796,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { expBlocks := bootStrap(db) - // Create a block that will trigger the reloard to fail. + // Create a block that will trigger the reload to fail. blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) @@ -810,7 +810,6 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { // Do the compaction and check the metrics. // Compaction should succeed, but the reload should fail and // the new block created from the compaction should be deleted. - db.EnableCompactions() testutil.NotOk(t, db.compact()) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") diff --git a/db_test.go b/db_test.go index 996ef3eb56..1b5cb35086 100644 --- a/db_test.go +++ b/db_test.go @@ -452,7 +452,7 @@ func TestDB_Snapshot(t *testing.T) { querier, err := db.Querier(mint, mint+1000) testutil.Ok(t, err) - defer func() { querier.Close() }() + defer func() { testutil.Ok(t, querier.Close()) }() // sum values seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) From bf2239079db8e7597a72cb1817714fdd099738dc Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 11 Feb 2019 11:57:46 +0200 Subject: [PATCH 9/9] refactor multi errors Signed-off-by: Krasi Georgiev --- block.go | 7 ++++--- chunks/chunks.go | 4 ++-- compact.go | 9 +++++++-- db.go | 2 +- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/block.go b/block.go index 08b4861f80..002ed1bebb 100644 --- a/block.go +++ b/block.go @@ -277,9 +277,10 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er var closers []io.Closer defer func() { if err != nil { - for _, c := range closers { - c.Close() - } + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() } }() meta, err := readMetaFile(dir) diff --git a/chunks/chunks.go b/chunks/chunks.go index 3f65bfa6aa..f0f5ac776e 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -342,7 +342,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { } func (s *Reader) Close() error { - return closeAll(s.cs...) + return closeAll(s.cs) } // Size returns the size of the chunks. @@ -410,7 +410,7 @@ func sequenceFiles(dir string) ([]string, error) { return res, nil } -func closeAll(cs ...io.Closer) (err error) { +func closeAll(cs []io.Closer) (err error) { for _, c := range cs { if e := c.Close(); e != nil { err = e diff --git a/compact.go b/compact.go index 4025b12d74..265cda0b84 100644 --- a/compact.go +++ b/compact.go @@ -582,7 +582,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } @@ -592,7 +592,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) - defer func() { closeAll(closers...) }() + defer func() { + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() + }() for i, b := range blocks { indexr, err := b.Index() diff --git a/db.go b/db.go index 3bce5b08ba..e8fd9c0aab 100644 --- a/db.go +++ b/db.go @@ -1097,7 +1097,7 @@ func (es MultiError) Err() error { return es } -func closeAll(cs ...io.Closer) error { +func closeAll(cs []io.Closer) error { var merr MultiError for _, c := range cs {