diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 1d330a02da..21b1439a85 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -372,7 +372,7 @@ func main() { os.Exit(checkErr(benchmarkWrite(*benchWriteOutPath, *benchSamplesFile, *benchWriteNumMetrics, *benchWriteNumScrapes))) case tsdbAnalyzeCmd.FullCommand(): - os.Exit(checkErr(analyzeBlock(*analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended))) + os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended))) case tsdbListCmd.FullCommand(): os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable))) diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 2ade90d105..ebde18489c 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -413,7 +413,7 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) return db, b, nil } -func analyzeBlock(path, blockID string, limit int, runExtended bool) error { +func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool) error { db, block, err := openBlock(path, blockID) if err != nil { return err @@ -460,7 +460,7 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error { labelpairsUncovered := map[string]uint64{} labelpairsCount := map[string]uint64{} entries := 0 - p, err := ir.Postings("", "") // The special all key. + p, err := ir.Postings(ctx, "", "") // The special all key. if err != nil { return err } @@ -543,7 +543,7 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error { return err } for _, n := range lv { - postings, err := ir.Postings("__name__", n) + postings, err := ir.Postings(ctx, "__name__", n) if err != nil { return err } @@ -560,14 +560,15 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error { printInfo(postingInfos) if runExtended { - return analyzeCompaction(block, ir) + return analyzeCompaction(ctx, block, ir) } return nil } -func analyzeCompaction(block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) { - postingsr, err := indexr.Postings(index.AllPostingsKey()) +func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) { + n, v := index.AllPostingsKey() + postingsr, err := indexr.Postings(ctx, n, v) if err != nil { return err } diff --git a/promql/bench_test.go b/promql/bench_test.go index 94be6d3289..8e443b5a6a 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -28,6 +28,8 @@ import ( ) func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval, numIntervals int) error { + ctx := context.Background() + metrics := []labels.Labels{} metrics = append(metrics, labels.FromStrings("__name__", "a_one")) metrics = append(metrics, labels.FromStrings("__name__", "b_one")) @@ -67,7 +69,7 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval, } } stor.DB.ForceHeadMMap() // Ensure we have at most one head chunk for every series. - stor.DB.Compact() + stor.DB.Compact(ctx) return nil } diff --git a/tsdb/block.go b/tsdb/block.go index ff1a38ff9e..f635b10414 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -74,7 +74,7 @@ type IndexReader interface { // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. - Postings(name string, values ...string) (index.Postings, error) + Postings(ctx context.Context, name string, values ...string) (index.Postings, error) // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. @@ -488,8 +488,8 @@ func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err return labelNamesWithMatchers(r.ir, matchers...) } -func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) { - p, err := r.ir.Postings(name, values...) +func (r blockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + p, err := r.ir.Postings(ctx, name, values...) if err != nil { return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) } diff --git a/tsdb/compact.go b/tsdb/compact.go index 0d42f627fb..7f5c307638 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -731,7 +731,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa closers = append(closers, tombsr) k, v := index.AllPostingsKey() - all, err := indexr.Postings(k, v) + all, err := indexr.Postings(ctx, k, v) if err != nil { return err } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 2ef25d91a6..098be8bfa0 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1268,6 +1268,8 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { for title, bootStrap := range tests { t.Run(title, func(t *testing.T) { + ctx := context.Background() + db := openTestDB(t, nil, []int64{1, 100}) defer func() { require.NoError(t, db.Close()) @@ -1291,7 +1293,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { // Do the compaction and check the metrics. // Compaction should succeed, but the reloadBlocks should fail and // the new block created from the compaction should be deleted. - require.Error(t, db.Compact()) + require.Error(t, db.Compact(ctx)) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch") require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "`compaction` count metric mismatch") require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch") diff --git a/tsdb/db.go b/tsdb/db.go index 447b137a5a..8a1c084f20 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -908,7 +908,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs db.oooWasEnabled.Store(true) } - go db.run() + go db.run(ctx) return db, nil } @@ -949,7 +949,7 @@ func (db *DB) Dir() string { return db.dir } -func (db *DB) run() { +func (db *DB) run(ctx context.Context) { defer close(db.donec) backoff := time.Duration(0) @@ -980,7 +980,7 @@ func (db *DB) run() { db.autoCompactMtx.Lock() if db.autoCompact { - if err := db.Compact(); err != nil { + if err := db.Compact(ctx); err != nil { level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) } else { @@ -1100,7 +1100,7 @@ func (a dbAppender) Commit() error { // which will also delete the blocks that fall out of the retention window. // Old blocks are only deleted on reloadBlocks based on the new block's parent information. // See DB.reloadBlocks documentation for further information. -func (db *DB) Compact() (returnErr error) { +func (db *DB) Compact(ctx context.Context) (returnErr error) { db.cmtx.Lock() defer db.cmtx.Unlock() defer func() { @@ -1173,7 +1173,7 @@ func (db *DB) Compact() (returnErr error) { if lastBlockMaxt != math.MinInt64 { // The head was compacted, so we compact OOO head as well. - if err := db.compactOOOHead(); err != nil { + if err := db.compactOOOHead(ctx); err != nil { return errors.Wrap(err, "compact ooo head") } } @@ -1197,18 +1197,18 @@ func (db *DB) CompactHead(head *RangeHead) error { } // CompactOOOHead compacts the OOO Head. -func (db *DB) CompactOOOHead() error { +func (db *DB) CompactOOOHead(ctx context.Context) error { db.cmtx.Lock() defer db.cmtx.Unlock() - return db.compactOOOHead() + return db.compactOOOHead(ctx) } -func (db *DB) compactOOOHead() error { +func (db *DB) compactOOOHead(ctx context.Context) error { if !db.oooWasEnabled.Load() { return nil } - oooHead, err := NewOOOCompactionHead(db.head) + oooHead, err := NewOOOCompactionHead(ctx, db.head) if err != nil { return errors.Wrap(err, "get ooo compaction head") } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index a016d43444..233ad2e909 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1207,7 +1207,7 @@ func TestTombstoneClean(t *testing.T) { defer db.Close() for _, r := range c.intervals { - require.NoError(t, db.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } // All of the setup for THIS line. @@ -1864,7 +1864,7 @@ func TestChunkAtBlockBoundary(t *testing.T) { err := app.Commit() require.NoError(t, err) - err = db.Compact() + err = db.Compact(ctx) require.NoError(t, err) var builder labels.ScratchBuilder @@ -1877,7 +1877,7 @@ func TestChunkAtBlockBoundary(t *testing.T) { meta := block.Meta() k, v := index.AllPostingsKey() - p, err := r.Postings(k, v) + p, err := r.Postings(ctx, k, v) require.NoError(t, err) var chks []chunks.Meta @@ -1920,7 +1920,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { err := app.Commit() require.NoError(t, err) - err = db.Compact() + err = db.Compact(ctx) require.NoError(t, err) require.GreaterOrEqual(t, len(db.blocks), 3, "invalid test, less than three blocks in DB") @@ -2051,7 +2051,7 @@ func TestNoEmptyBlocks(t *testing.T) { defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "", ".*") t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) actBlocks, err := blockDirs(db.Dir()) require.NoError(t, err) require.Equal(t, len(db.Blocks()), len(actBlocks)) @@ -2069,7 +2069,7 @@ func TestNoEmptyBlocks(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") actBlocks, err := blockDirs(db.Dir()) @@ -2091,7 +2091,7 @@ func TestNoEmptyBlocks(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") actBlocks, err = blockDirs(db.Dir()) require.NoError(t, err) @@ -2112,7 +2112,7 @@ func TestNoEmptyBlocks(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) require.NoError(t, db.head.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") require.Equal(t, oldBlocks, db.Blocks()) }) @@ -2131,7 +2131,7 @@ func TestNoEmptyBlocks(t *testing.T) { require.NoError(t, db.reloadBlocks()) // Reload the db to register the new blocks. require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones") actBlocks, err := blockDirs(db.Dir()) @@ -2198,6 +2198,7 @@ func TestDB_LabelNames(t *testing.T) { require.NoError(t, err) } for _, tst := range tests { + ctx := context.Background() db := openTestDB(t, nil, nil) defer func() { require.NoError(t, db.Close()) @@ -2214,7 +2215,7 @@ func TestDB_LabelNames(t *testing.T) { require.NoError(t, headIndexr.Close()) // Testing disk. - err = db.Compact() + err = db.Compact(ctx) require.NoError(t, err) // All blocks have same label names, hence check them individually. // No need to aggregate and check. @@ -2264,7 +2265,7 @@ func TestCorrectNumTombstones(t *testing.T) { } require.NoError(t, app.Commit()) - err := db.Compact() + err := db.Compact(ctx) require.NoError(t, err) require.Equal(t, 1, len(db.blocks)) @@ -3033,12 +3034,14 @@ func TestCompactHeadWithDeletion(t *testing.T) { db, err := Open(t.TempDir(), log.NewNopLogger(), prometheus.NewRegistry(), nil, nil) require.NoError(t, err) - app := db.Appender(context.Background()) + ctx := context.Background() + + app := db.Appender(ctx) _, err = app.Append(0, labels.FromStrings("a", "b"), 10, rand.Float64()) require.NoError(t, err) require.NoError(t, app.Commit()) - err = db.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + err = db.Delete(ctx, 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.NoError(t, err) // This recreates the bug. @@ -3197,6 +3200,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) require.NoError(t, err) @@ -3228,7 +3232,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, 60, last) require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) // As the data spans for 59 blocks, 58 go to disk and 1 remains in Head. @@ -3286,7 +3290,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.Equal(t, 62, last) require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) // No new blocks should be created as there was not data in between the new samples and the blocks. @@ -3385,7 +3389,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t } // Compact the TSDB head for the first time. We expect the head chunks file has been cut. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal)) // Push more samples for another 1x block duration period. @@ -3430,7 +3434,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t require.Equal(t, actualSeries, numSeries) // Compact the TSDB head again. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal)) // At this point we expect 1 head chunk has been deleted. @@ -3521,7 +3525,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun } // Compact the TSDB head for the first time. We expect the head chunks file has been cut. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal)) // Push more samples for another 1x block duration period. @@ -3564,7 +3568,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun require.Equal(t, actualSeries, numSeries) // Compact the TSDB head again. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal)) // At this point we expect 1 head chunk has been deleted. @@ -3796,6 +3800,7 @@ func TestOOOWALWrite(t *testing.T) { // Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110. func TestDBPanicOnMmappingHeadChunk(t *testing.T) { dir := t.TempDir() + ctx := context.Background() db, err := Open(dir, nil, nil, DefaultOptions(), nil) require.NoError(t, err) @@ -3826,7 +3831,7 @@ func TestDBPanicOnMmappingHeadChunk(t *testing.T) { addSamples(numSamples) require.Len(t, db.Blocks(), 0) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Len(t, db.Blocks(), 0) // Restarting. @@ -3841,7 +3846,7 @@ func TestDBPanicOnMmappingHeadChunk(t *testing.T) { addSamples(numSamples) require.Len(t, db.Blocks(), 0) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Len(t, db.Blocks(), 1) // More samples to m-map and panic. @@ -4107,6 +4112,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) { // are not included in this compaction. func TestOOOCompaction(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -4204,7 +4210,7 @@ func TestOOOCompaction(t *testing.T) { require.Greater(t, f.Size(), int64(100)) // OOO compaction happens here. - require.NoError(t, db.CompactOOOHead()) + require.NoError(t, db.CompactOOOHead(ctx)) // 3 blocks exist now. [0, 120), [120, 240), [240, 360) require.Equal(t, len(db.Blocks()), 3) @@ -4272,7 +4278,7 @@ func TestOOOCompaction(t *testing.T) { require.Equal(t, "000001", files[0].Name()) // This will merge overlapping block. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, len(db.Blocks()), 3) // [0, 120), [120, 240), [240, 360) verifySamples(db.Blocks()[0], 90, 119) @@ -4286,6 +4292,7 @@ func TestOOOCompaction(t *testing.T) { // when the normal head's compaction is done. func TestOOOCompactionWithNormalCompaction(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -4328,7 +4335,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { } // If the normal Head is not compacted, the OOO head compaction does not take place. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, len(db.Blocks()), 0) // Add more in-order samples in future that would trigger the compaction. @@ -4338,7 +4345,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { require.Equal(t, len(db.Blocks()), 0) // Compacts normal and OOO head. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) // 2 blocks exist now. [0, 120), [250, 360) require.Equal(t, len(db.Blocks()), 2) @@ -4385,6 +4392,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { // and out-of-order head func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -4428,7 +4436,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { } // If the normal Head is not compacted, the OOO head compaction does not take place. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Equal(t, len(db.Blocks()), 0) // Add more in-order samples in future that would trigger the compaction. @@ -4438,7 +4446,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { require.Equal(t, len(db.Blocks()), 0) // Compacts normal and OOO head. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) // 2 blocks exist now. [0, 120), [250, 360) require.Equal(t, len(db.Blocks()), 2) @@ -4485,6 +4493,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { // data from the mmap chunks. func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderCapMax = 10 @@ -4573,7 +4582,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { // Compaction should also work fine. require.Equal(t, len(db.Blocks()), 0) - require.NoError(t, db.CompactOOOHead()) + require.NoError(t, db.CompactOOOHead(ctx)) require.Equal(t, len(db.Blocks()), 1) // One block from OOO data. require.Equal(t, int64(0), db.Blocks()[0].MinTime()) require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime()) @@ -5144,6 +5153,7 @@ func TestWBLAndMmapReplay(t *testing.T) { func TestOOOCompactionFailure(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -5206,7 +5216,7 @@ func TestOOOCompactionFailure(t *testing.T) { originalCompactor := db.compactor db.compactor = &mockCompactorFailing{t: t} for i := 0; i < 5; i++ { - require.Error(t, db.CompactOOOHead()) + require.Error(t, db.CompactOOOHead(ctx)) } require.Equal(t, len(db.Blocks()), 0) @@ -5217,7 +5227,7 @@ func TestOOOCompactionFailure(t *testing.T) { verifyFirstWBLFileIs0(6) db.compactor = originalCompactor - require.NoError(t, db.CompactOOOHead()) + require.NoError(t, db.CompactOOOHead(ctx)) oldBlocks := db.Blocks() require.Equal(t, len(db.Blocks()), 3) @@ -5229,7 +5239,7 @@ func TestOOOCompactionFailure(t *testing.T) { // The failed compaction should not have left the ooo Head corrupted. // Hence, expect no new blocks with another OOO compaction call. - require.NoError(t, db.CompactOOOHead()) + require.NoError(t, db.CompactOOOHead(ctx)) require.Equal(t, len(db.Blocks()), 3) require.Equal(t, oldBlocks, db.Blocks()) @@ -5550,6 +5560,8 @@ func TestOOOMmapCorruption(t *testing.T) { } func TestOutOfOrderRuntimeConfig(t *testing.T) { + ctx := context.Background() + getDB := func(oooTimeWindow int64) *DB { dir := t.TempDir() @@ -5616,7 +5628,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) { require.Greater(t, size, int64(0)) require.Len(t, db.Blocks(), 0) - require.NoError(t, db.compactOOOHead()) + require.NoError(t, db.compactOOOHead(ctx)) require.Greater(t, len(db.Blocks()), 0) // WBL is empty. @@ -5836,6 +5848,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { for i, c := range cases { t.Run(fmt.Sprintf("case=%d", i), func(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds() @@ -5856,7 +5869,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { verifySamples(t, db, c.inOrderMint, c.inOrderMaxt) // We get 2 blocks. 1 from OOO, 1 from in-order. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) verifyBlockRanges := func() { blocks := db.Blocks() require.Equal(t, len(c.blockRanges), len(blocks)) @@ -5993,6 +6006,7 @@ func TestPanicOnApplyConfig(t *testing.T) { func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { dir := t.TempDir() + ctx := context.Background() opts := DefaultOptions() opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() @@ -6057,14 +6071,14 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { db.head.mmapHeadChunks() checkMmapFileContents([]string{"000001", "000002"}, nil) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) checkMmapFileContents([]string{"000002"}, []string{"000001"}) require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted") addSamples(501, 650) db.head.mmapHeadChunks() checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"}) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) checkMmapFileContents(nil, []string{"000001", "000002", "000003"}) // Verify that WBL is empty. diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f27d4ef762..410f4f0937 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -103,7 +103,7 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err } // Postings returns the postings list iterator for the label pairs. -func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { +func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { switch len(values) { case 0: return index.EmptyPostings(), nil @@ -116,7 +116,7 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting res = append(res, p) } } - return index.Merge(res...), nil + return index.Merge(ctx, res...), nil } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fc3ae62d29..69eb622d18 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2546,7 +2546,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { require.NoError(t, app.Commit()) require.Equal(t, int64(math.MinInt64), db.head.minValidTime.Load()) - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Greater(t, db.head.minValidTime.Load(), int64(0)) app = db.Appender(ctx) @@ -2997,6 +2997,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { func TestChunkNotFoundHeadGCRace(t *testing.T) { db := newTestDB(t) db.DisableCompactions() + ctx := context.Background() var ( app = db.Appender(context.Background()) @@ -3029,7 +3030,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { go func() { defer wg.Done() // Compacting head while the querier spans the compaction time. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Greater(t, len(db.Blocks()), 0) }() @@ -3062,6 +3063,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { func TestDataMissingOnQueryDuringCompaction(t *testing.T) { db := newTestDB(t) db.DisableCompactions() + ctx := context.Background() var ( app = db.Appender(context.Background()) @@ -3091,7 +3093,7 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) { go func() { defer wg.Done() // Compacting head while the querier spans the compaction time. - require.NoError(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.Greater(t, len(db.Blocks()), 0) }() @@ -5332,6 +5334,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) { } func TestCuttingNewHeadChunks(t *testing.T) { + ctx := context.Background() testCases := map[string]struct { numTotalSamples int timestampJitter bool @@ -5465,7 +5468,7 @@ func TestCuttingNewHeadChunks(t *testing.T) { chkReader, err := h.Chunks() require.NoError(t, err) - p, err := idxReader.Postings("foo", "bar") + p, err := idxReader.Postings(ctx, "foo", "bar") require.NoError(t, err) var lblBuilder labels.ScratchBuilder diff --git a/tsdb/index/index.go b/tsdb/index/index.go index a1a970f5c5..3f19de7dc0 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1605,7 +1605,7 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch return errors.Wrap(r.dec.Series(d.Get(), builder, chks), "read series") } -func (r *Reader) Postings(name string, values ...string) (Postings, error) { +func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) { if r.version == FormatV1 { e, ok := r.postingsV1[name] if !ok { @@ -1625,7 +1625,7 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) { } res = append(res, p) } - return Merge(res...), nil + return Merge(ctx, res...), nil } e, ok := r.postings[name] @@ -1664,7 +1664,7 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) { // Iterate on the offset table. var postingsOff uint64 // The offset into the postings table. - for d.Err() == nil { + for d.Err() == nil && ctx.Err() == nil { if skip == 0 { // These are always the same number of bytes, // and it's faster to skip than parse. @@ -1701,9 +1701,12 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) { if d.Err() != nil { return nil, errors.Wrap(d.Err(), "get postings offset entry") } + if ctx.Err() != nil { + return nil, errors.Wrap(ctx.Err(), "get postings offset entry") + } } - return Merge(res...), nil + return Merge(ctx, res...), nil } // SortedPostings returns the given postings list reordered so that the backing series diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index a978ba186a..cca7e48ef5 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -103,13 +103,13 @@ func (m mockIndex) LabelValues(name string) ([]string, error) { return values, nil } -func (m mockIndex) Postings(name string, values ...string) (Postings, error) { +func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (Postings, error) { p := []Postings{} for _, value := range values { l := labels.Label{Name: name, Value: value} p = append(p, m.SortedPostings(NewListPostings(m.postings[l]))) } - return Merge(p...), nil + return Merge(ctx, p...), nil } func (m mockIndex) SortedPostings(p Postings) Postings { @@ -162,6 +162,7 @@ func TestIndexRW_Create_Open(t *testing.T) { func TestIndexRW_Postings(t *testing.T) { dir := t.TempDir() + ctx := context.Background() fn := filepath.Join(dir, indexFilename) @@ -194,7 +195,7 @@ func TestIndexRW_Postings(t *testing.T) { ir, err := NewFileReader(fn) require.NoError(t, err) - p, err := ir.Postings("a", "1") + p, err := ir.Postings(ctx, "a", "1") require.NoError(t, err) var c []chunks.Meta @@ -245,6 +246,7 @@ func TestIndexRW_Postings(t *testing.T) { func TestPostingsMany(t *testing.T) { dir := t.TempDir() + ctx := context.Background() fn := filepath.Join(dir, indexFilename) @@ -313,7 +315,7 @@ func TestPostingsMany(t *testing.T) { var builder labels.ScratchBuilder for _, c := range cases { - it, err := ir.Postings("i", c.in...) + it, err := ir.Postings(ctx, "i", c.in...) require.NoError(t, err) got := []string{} @@ -335,6 +337,7 @@ func TestPostingsMany(t *testing.T) { func TestPersistence_index_e2e(t *testing.T) { dir := t.TempDir() + ctx := context.Background() lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000) require.NoError(t, err) @@ -413,10 +416,10 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, err) for p := range mi.postings { - gotp, err := ir.Postings(p.Name, p.Value) + gotp, err := ir.Postings(ctx, p.Name, p.Value) require.NoError(t, err) - expp, err := mi.Postings(p.Name, p.Value) + expp, err := mi.Postings(ctx, p.Name, p.Value) require.NoError(t, err) var chks, expchks []chunks.Meta diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 3be8a1997f..833448e224 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -15,6 +15,7 @@ package index import ( "container/heap" + "context" "encoding/binary" "runtime" "sort" @@ -519,7 +520,7 @@ func (it *intersectPostings) Err() error { } // Merge returns a new iterator over the union of the input iterators. -func Merge(its ...Postings) Postings { +func Merge(ctx context.Context, its ...Postings) Postings { if len(its) == 0 { return EmptyPostings() } @@ -527,7 +528,7 @@ func Merge(its ...Postings) Postings { return its[0] } - p, ok := newMergedPostings(its) + p, ok := newMergedPostings(ctx, its) if !ok { return EmptyPostings() } @@ -559,12 +560,14 @@ type mergedPostings struct { err error } -func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) { +func newMergedPostings(ctx context.Context, p []Postings) (m *mergedPostings, nonEmpty bool) { ph := make(postingsHeap, 0, len(p)) for _, it := range p { // NOTE: mergedPostings struct requires the user to issue an initial Next. switch { + case ctx.Err() != nil: + return &mergedPostings{err: ctx.Err()}, true case it.Next(): ph = append(ph, it) case it.Err() != nil: diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 9454def467..b2ed1064d2 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -385,7 +385,7 @@ func TestMultiMerge(t *testing.T) { i2 := newListPostings(2, 4, 5, 6, 7, 8, 999, 1001) i3 := newListPostings(1, 2, 5, 6, 7, 8, 1001, 1200) - res, err := ExpandPostings(Merge(i1, i2, i3)) + res, err := ExpandPostings(Merge(context.Background(), i1, i2, i3)) require.NoError(t, err) require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, res) } @@ -473,10 +473,12 @@ func TestMergedPostings(t *testing.T) { t.Fatal("merge result expectancy cannot be nil") } + ctx := context.Background() + expected, err := ExpandPostings(c.res) require.NoError(t, err) - m := Merge(c.in...) + m := Merge(ctx, c.in...) if c.res == EmptyPostings() { require.Equal(t, EmptyPostings(), m) @@ -537,10 +539,12 @@ func TestMergedPostingsSeek(t *testing.T) { } for _, c := range cases { + ctx := context.Background() + a := newListPostings(c.a...) b := newListPostings(c.b...) - p := Merge(a, b) + p := Merge(ctx, a, b) require.Equal(t, c.success, p.Seek(c.seek)) @@ -796,6 +800,7 @@ func TestIntersectWithMerge(t *testing.T) { a := newListPostings(21, 22, 23, 24, 25, 30) b := Merge( + context.Background(), newListPostings(10, 20, 30), newListPostings(15, 26, 30), ) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 2d683b545a..c99029d1eb 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -15,6 +15,7 @@ package tsdb import ( + "context" "errors" "math" @@ -190,7 +191,7 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) bool { return a.MinTime < b.MinTime } -func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) { +func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { switch len(values) { case 0: return index.EmptyPostings(), nil @@ -202,7 +203,7 @@ func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Pos for _, value := range values { res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings } - return index.Merge(res...), nil + return index.Merge(ctx, res...), nil } } @@ -268,7 +269,7 @@ type OOOCompactionHead struct { // 4. Cuts a new WBL file for the OOO WBL. // All the above together have a bit of CPU and memory overhead, and can have a bit of impact // on the sample append latency. So call NewOOOCompactionHead only right before compaction. -func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) { +func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) { ch := &OOOCompactionHead{ chunkRange: head.chunkRange.Load(), mint: math.MaxInt64, @@ -287,7 +288,7 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) { n, v := index.AllPostingsKey() // TODO: verify this gets only ooo samples. - p, err := ch.oooIR.Postings(n, v) + p, err := ch.oooIR.Postings(ctx, n, v) if err != nil { return nil, err } @@ -396,7 +397,7 @@ func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter { return ir.ch.oooIR.Symbols() } -func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) { +func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) { n, v := index.AllPostingsKey() if name != n || len(values) != 1 || values[0] != v { return nil, errors.New("only AllPostingsKey is supported") diff --git a/tsdb/querier.go b/tsdb/querier.go index 673bb16a61..836647d235 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -266,7 +266,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings) // doesn't include series that may be added to the index reader during this function call. k, v := index.AllPostingsKey() - allPostings, err := ix.Postings(k, v) + allPostings, err := ix.Postings(context.TODO(), k, v) if err != nil { return nil, err } @@ -286,7 +286,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, switch { case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least. k, v := index.AllPostingsKey() - allPostings, err := ix.Postings(k, v) + allPostings, err := ix.Postings(context.TODO(), k, v) if err != nil { return nil, err } @@ -363,14 +363,14 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro // Fast-path for equal matching. if m.Type == labels.MatchEqual { - return ix.Postings(m.Name, m.Value) + return ix.Postings(context.TODO(), m.Name, m.Value) } // Fast-path for set matching. if m.Type == labels.MatchRegexp { setMatches := findSetMatches(m.GetRegexString()) if len(setMatches) > 0 { - return ix.Postings(m.Name, setMatches...) + return ix.Postings(context.TODO(), m.Name, setMatches...) } } @@ -390,7 +390,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro return index.EmptyPostings(), nil } - return ix.Postings(m.Name, res...) + return ix.Postings(context.TODO(), m.Name, res...) } // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. @@ -401,14 +401,14 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting if m.Type == labels.MatchNotRegexp { setMatches := findSetMatches(m.GetRegexString()) if len(setMatches) > 0 { - return ix.Postings(m.Name, setMatches...) + return ix.Postings(context.TODO(), m.Name, setMatches...) } } // Fast-path for MatchNotEqual matching. // Inverse of a MatchNotEqual is MatchEqual (double negation). if m.Type == labels.MatchNotEqual { - return ix.Postings(m.Name, m.Value) + return ix.Postings(context.TODO(), m.Name, m.Value) } vals, err := ix.LabelValues(m.Name) @@ -428,7 +428,7 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting } } - return ix.Postings(m.Name, res...) + return ix.Postings(context.TODO(), m.Name, res...) } func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -463,7 +463,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat valuesPostings := make([]index.Postings, len(allValues)) for i, value := range allValues { - valuesPostings[i], err = r.Postings(name, value) + valuesPostings[i], err = r.Postings(context.TODO(), name, value) if err != nil { return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index f3228683ec..ffac2bf3bb 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -504,6 +504,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) { } func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing.T) { + ctx := context.Background() c := blockQuerierTestCase{ mint: 2, maxt: 6, @@ -527,7 +528,7 @@ func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing } ir, cr, _, _ := createIdxChkReaders(t, testData) stones := tombstones.NewMemTombstones() - p, err := ir.Postings("a", "a") + p, err := ir.Postings(ctx, "a", "a") require.NoError(t, err) refs, err := index.ExpandPostings(p) require.NoError(t, err) @@ -1500,13 +1501,13 @@ func (m mockIndex) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) { return names, nil } -func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) { +func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { res := make([]index.Postings, 0, len(values)) for _, value := range values { l := labels.Label{Name: name, Value: value} res = append(res, index.NewListPostings(m.postings[l])) } - return index.Merge(res...), nil + return index.Merge(ctx, res...), nil } func (m mockIndex) SortedPostings(p index.Postings) index.Postings { @@ -2452,7 +2453,7 @@ func (m mockMatcherIndex) LabelNamesFor(ids ...storage.SeriesRef) ([]string, err return nil, errors.New("label names for for called") } -func (m mockMatcherIndex) Postings(name string, values ...string) (index.Postings, error) { +func (m mockMatcherIndex) Postings(context.Context, string, ...string) (index.Postings, error) { return index.EmptyPostings(), nil } diff --git a/tsdb/repair_test.go b/tsdb/repair_test.go index d4e9b76ad0..c199ecdd4e 100644 --- a/tsdb/repair_test.go +++ b/tsdb/repair_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "os" "path/filepath" "testing" @@ -28,6 +29,7 @@ import ( func TestRepairBadIndexVersion(t *testing.T) { tmpDir := t.TempDir() + ctx := context.Background() // The broken index used in this test was written by the following script // at a broken revision. @@ -78,7 +80,7 @@ func TestRepairBadIndexVersion(t *testing.T) { // Read current index to check integrity. r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) require.NoError(t, err) - p, err := r.Postings("b", "1") + p, err := r.Postings(ctx, "b", "1") require.NoError(t, err) var builder labels.ScratchBuilder for p.Next() { @@ -97,7 +99,7 @@ func TestRepairBadIndexVersion(t *testing.T) { r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) require.NoError(t, err) defer r.Close() - p, err = r.Postings("b", "1") + p, err = r.Postings(ctx, "b", "1") require.NoError(t, err) res := []labels.Labels{}