From 6ef9ed0bc33299499df9addee5b007070c5d32b8 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 13 Sep 2023 15:43:06 +0200 Subject: [PATCH] Add context argument to DB.Delete (#12834) Signed-off-by: Arve Knudsen --- cmd/prometheus/main.go | 4 ++-- tsdb/block.go | 3 ++- tsdb/block_test.go | 2 +- tsdb/db.go | 6 +++--- tsdb/db_test.go | 30 +++++++++++++++--------------- tsdb/head.go | 11 ++++++++++- tsdb/head_test.go | 10 +++++----- tsdb/querier_test.go | 3 ++- web/api/v1/api.go | 4 ++-- web/api/v1/api_test.go | 6 +++--- 10 files changed, 45 insertions(+), 34 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ab57900217..43b781f62c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1461,11 +1461,11 @@ func (s *readyStorage) CleanTombstones() error { } // Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. -func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { +func (s *readyStorage) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error { if x := s.get(); x != nil { switch db := x.(type) { case *tsdb.DB: - return db.Delete(mint, maxt, ms...) + return db.Delete(ctx, mint, maxt, ms...) case *agent.DB: return agent.ErrUnsupported default: diff --git a/tsdb/block.go b/tsdb/block.go index d7b344ab5d..ff1a38ff9e 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -15,6 +15,7 @@ package tsdb import ( + "context" "encoding/json" "io" "os" @@ -543,7 +544,7 @@ func (r blockChunkReader) Close() error { } // Delete matching series between mint and maxt in the block. -func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error { +func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Matcher) error { pb.mtx.Lock() defer pb.mtx.Unlock() diff --git a/tsdb/block_test.go b/tsdb/block_test.go index d8a893510c..5687637d08 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -304,7 +304,7 @@ func TestBlockSize(t *testing.T) { // Delete some series and check the sizes again. { - require.NoError(t, blockInit.Delete(1, 10, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))) + require.NoError(t, blockInit.Delete(context.Background(), 1, 10, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))) expAfterDelete := blockInit.Size() require.Greater(t, expAfterDelete, expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit) actAfterDelete, err := fileutil.DirSize(blockDirInit) diff --git a/tsdb/db.go b/tsdb/db.go index 684d4813ee..447b137a5a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2006,7 +2006,7 @@ func rangeForTimestamp(t, width int64) (maxt int64) { } // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. -func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { +func (db *DB) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -2018,13 +2018,13 @@ func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { for _, b := range db.blocks { if b.OverlapsClosedInterval(mint, maxt) { g.Go(func(b *Block) func() error { - return func() error { return b.Delete(mint, maxt, ms...) } + return func() error { return b.Delete(ctx, mint, maxt, ms...) } }(b)) } } if db.head.OverlapsClosedInterval(mint, maxt) { g.Go(func() error { - return db.head.Delete(mint, maxt, ms...) + return db.head.Delete(ctx, mint, maxt, ms...) }) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index a89d0277fb..a016d43444 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -397,7 +397,7 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) { } func TestDeleteSimple(t *testing.T) { - numSamples := int64(10) + const numSamples int64 = 10 cases := []struct { Intervals tombstones.Intervals @@ -446,7 +446,7 @@ Outer: // TODO(gouthamve): Reset the tombstones somehow. // Delete the ranges. for _, r := range c.Intervals { - require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } // Compare the result. @@ -733,7 +733,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { } func TestDB_SnapshotWithDelete(t *testing.T) { - numSamples := int64(10) + const numSamples int64 = 10 db := openTestDB(t, nil, nil) defer func() { require.NoError(t, db.Close()) }() @@ -763,7 +763,7 @@ Outer: // TODO(gouthamve): Reset the tombstones somehow. // Delete the ranges. for _, r := range c.intervals { - require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } // create snapshot @@ -1169,7 +1169,7 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore } func TestTombstoneClean(t *testing.T) { - numSamples := int64(10) + const numSamples int64 = 10 db := openTestDB(t, nil, nil) @@ -1207,7 +1207,7 @@ func TestTombstoneClean(t *testing.T) { defer db.Close() for _, r := range c.intervals { - require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, db.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } // All of the setup for THIS line. @@ -1292,7 +1292,7 @@ func TestTombstoneCleanResultEmptyBlock(t *testing.T) { // Create tombstones by deleting all samples. for _, r := range intervals { - require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) } require.NoError(t, db.CleanTombstones()) @@ -2068,7 +2068,7 @@ func TestNoEmptyBlocks(t *testing.T) { _, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) - require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.Compact()) require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") @@ -2111,7 +2111,7 @@ func TestNoEmptyBlocks(t *testing.T) { _, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) - require.NoError(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + require.NoError(t, db.head.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.Compact()) require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") require.Equal(t, oldBlocks, db.Blocks()) @@ -2130,7 +2130,7 @@ func TestNoEmptyBlocks(t *testing.T) { oldBlocks := db.Blocks() require.NoError(t, db.reloadBlocks()) // Reload the db to register the new blocks. require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. - require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.Compact()) require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones") @@ -2268,17 +2268,17 @@ func TestCorrectNumTombstones(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(db.blocks)) - require.NoError(t, db.Delete(0, 1, defaultMatcher)) + require.NoError(t, db.Delete(ctx, 0, 1, defaultMatcher)) require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - require.NoError(t, db.Delete(2, 3, defaultMatcher)) + require.NoError(t, db.Delete(ctx, 2, 3, defaultMatcher)) require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - require.NoError(t, db.Delete(5, 6, defaultMatcher)) + require.NoError(t, db.Delete(ctx, 5, 6, defaultMatcher)) require.Equal(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - require.NoError(t, db.Delete(9, 11, defaultMatcher)) + require.NoError(t, db.Delete(ctx, 9, 11, defaultMatcher)) require.Equal(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } @@ -3038,7 +3038,7 @@ func TestCompactHeadWithDeletion(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - err = db.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + err = db.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.NoError(t, err) // This recreates the bug. diff --git a/tsdb/head.go b/tsdb/head.go index cfda3f6449..0bccbdd841 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "io" "math" @@ -1426,7 +1427,7 @@ func (h *RangeHead) String() string { // Delete all samples in the range of [mint, maxt] for series that satisfy the given // label matchers. -func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { +func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error { // Do not delete anything beyond the currently valid range. mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime()) @@ -1439,6 +1440,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { var stones []tombstones.Stone for p.Next() { + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "select series") + } + series := h.series.getByID(chunks.HeadSeriesRef(p.At())) if series == nil { level.Debug(h.logger).Log("msg", "Series not found in Head.Delete") @@ -1458,6 +1463,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { if p.Err() != nil { return p.Err() } + if ctx.Err() != nil { + return errors.Wrap(err, "select series") + } + if h.wal != nil { var enc record.Encoder if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index da7cbd5498..fc3ae62d29 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1131,7 +1131,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { require.NoError(t, head.Init(math.MinInt64)) - require.NoError(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) + require.NoError(t, head.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) }) } } @@ -1203,7 +1203,7 @@ func TestHeadDeleteSimple(t *testing.T) { // Delete the ranges. for _, r := range c.dranges { - require.NoError(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) + require.NoError(t, head.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) } // Add more samples. @@ -1285,7 +1285,7 @@ func TestDeleteUntilCurMax(t *testing.T) { require.NoError(t, err) } require.NoError(t, app.Commit()) - require.NoError(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, hb.Delete(context.Background(), 0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) @@ -1332,7 +1332,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) } - require.NoError(t, hb.Delete(0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + require.NoError(t, hb.Delete(context.Background(), 0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, hb.Truncate(1)) require.NoError(t, hb.Close()) @@ -1464,7 +1464,7 @@ func TestDelete_e2e(t *testing.T) { } for _, del := range dels { for _, r := range del.drange { - require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) + require.NoError(t, hb.Delete(context.Background(), r.Mint, r.Maxt, del.ms...)) } matched := labels.Slice{} for _, l := range lbls { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index afb7e68157..f3228683ec 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2699,6 +2699,7 @@ func BenchmarkHeadQuerier(b *testing.B) { // This is a regression test for the case where gauge histograms were not handled by // populateWithDelChunkSeriesIterator correctly. func TestQueryWithDeletedHistograms(t *testing.T) { + ctx := context.Background() testcases := map[string]func(int) (*histogram.Histogram, *histogram.FloatHistogram){ "intCounter": func(i int) (*histogram.Histogram, *histogram.FloatHistogram) { return tsdbutil.GenerateTestHistogram(i), nil @@ -2743,7 +2744,7 @@ func TestQueryWithDeletedHistograms(t *testing.T) { require.NoError(t, err) // Delete the last 20. - err = db.Delete(80, 100, matcher) + err = db.Delete(ctx, 80, 100, matcher) require.NoError(t, err) chunkQuerier, err := db.ChunkQuerier(0, 100) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 2aee55e104..cd9ba5b89a 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -170,7 +170,7 @@ type apiFunc func(r *http.Request) apiFuncResult // TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics. type TSDBAdminStats interface { CleanTombstones() error - Delete(mint, maxt int64, ms ...*labels.Matcher) error + Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error Snapshot(dir string, withHead bool) error Stats(statsByLabelName string, limit int) (*tsdb.Stats, error) WALReplayStatus() (tsdb.WALReplayStatus, error) @@ -1632,7 +1632,7 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult { if err != nil { return invalidParamError(err, "match[]") } - if err := api.db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil { + if err := api.db.Delete(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil { return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 33e524e8cb..799e0594fa 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -2767,9 +2767,9 @@ type fakeDB struct { err error } -func (f *fakeDB) CleanTombstones() error { return f.err } -func (f *fakeDB) Delete(int64, int64, ...*labels.Matcher) error { return f.err } -func (f *fakeDB) Snapshot(string, bool) error { return f.err } +func (f *fakeDB) CleanTombstones() error { return f.err } +func (f *fakeDB) Delete(context.Context, int64, int64, ...*labels.Matcher) error { return f.err } +func (f *fakeDB) Snapshot(string, bool) error { return f.err } func (f *fakeDB) Stats(statsByLabelName string, limit int) (_ *tsdb.Stats, retErr error) { dbDir, err := os.MkdirTemp("", "tsdb-api-ready") if err != nil {