From d9f13b62636532ec16d3cfc296153bd4dd3e7745 Mon Sep 17 00:00:00 2001
From: Owen Williams
Date: Wed, 6 May 2026 13:39:15 -0400
Subject: [PATCH] Rewrite TestCancelCompactions to run faster

Signed-off-by: Owen Williams
---
 tsdb/compact_test.go | 134 +++++++++++++++++++++++++------------------
 1 file changed, 79 insertions(+), 55 deletions(-)

diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 8843777b93..42fe97d8c2 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"log/slog"
 	"math"
 	"math/rand"
 	"os"
@@ -1368,69 +1369,92 @@ func TestDisableAutoCompactions(t *testing.T) {
 // any running compaction is cancelled to unblock closing the db.
 func TestCancelCompactions(t *testing.T) {
 	t.Parallel()
+
+	db := newTestDB(t)
+
+	compactionStarted := make(chan struct{})
+	compactionCanceled := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	originalCancel := db.compactCancel
+	db.compactCancel = func() {
+		cancel()
+		originalCancel()
+	}
+
+	var startedOnce sync.Once
+	var canceledOnce sync.Once
+	db.compactor = &mockCompactorFn{
+		planFn: func() ([]string, error) {
+			return []string{"block-a", "block-b"}, nil
+		},
+		compactFn: func() ([]ulid.ULID, error) {
+			startedOnce.Do(func() {
+				close(compactionStarted)
+			})
+			<-ctx.Done()
+			canceledOnce.Do(func() {
+				close(compactionCanceled)
+			})
+			return nil, ctx.Err()
+		},
+		writeFn: func() ([]ulid.ULID, error) {
+			return nil, nil
+		},
+	}
+
+	select {
+	case db.compactc <- struct{}{}:
+	case <-time.After(time.Second):
+		t.Fatal("triggering compaction timed out")
+	}
+
+	select {
+	case <-compactionStarted:
+	case <-time.After(time.Second):
+		t.Fatal("compaction did not start")
+	}
+
+	start := time.Now()
+	require.NoError(t, db.Close())
+	require.Less(t, time.Since(start), time.Second)
+
+	select {
+	case <-compactionCanceled:
+	case <-time.After(30 * time.Second):
+		t.Fatal("compaction was not canceled")
+	}
+}
+
+func TestCanceledCompactionDoesNotMarkBlocksFailed(t *testing.T) {
+	t.Parallel()
+
 	tmpdir := t.TempDir()
+	blockDirs := []string{
+		createBlock(t, tmpdir, genSeries(1, 1, 0, 100)),
+		createBlock(t, tmpdir, genSeries(1, 1, 100, 200)),
+	}
 
-	// Create some blocks to fall within the compaction range.
-	createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
-	createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
-	createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one.
-
-	// Copy the db so we have an exact copy to compare compaction times.
-	tmpdirCopy := t.TempDir()
-	err := fileutil.CopyDirs(tmpdir, tmpdirCopy)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{200}, nil, nil)
 	require.NoError(t, err)
 
-	// Measure the compaction time without interrupting it.
-	var timeCompactionUninterrupted time.Duration
-	{
-		db, err := open(tmpdir, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
+	_, err = compactor.CompactWithBlockPopulator(tmpdir, blockDirs, nil, blockPopulatorFunc(
+		func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error {
+			return fmt.Errorf("populate block: %w", context.Canceled)
+		},
+	))
+	require.ErrorIs(t, err, context.Canceled)
+
+	for _, dir := range blockDirs {
+		meta, _, err := readMetaFile(dir)
 		require.NoError(t, err)
-		require.Len(t, db.Blocks(), 3, "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
-		db.compactc <- struct{}{} // Trigger a compaction.
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
-			time.Sleep(3 * time.Millisecond)
-		}
-
-		start := time.Now()
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) != 1 {
-			time.Sleep(3 * time.Millisecond)
-		}
-		timeCompactionUninterrupted = time.Since(start)
-
-		require.NoError(t, db.Close())
+		require.Falsef(t, meta.Compaction.Failed, "block %s should not be marked as compaction failed", meta.ULID)
 	}
-	// Measure the compaction time when closing the db in the middle of compaction.
-	{
-		db, err := open(tmpdirCopy, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
-		require.NoError(t, err)
-		require.Len(t, db.Blocks(), 3, "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
-		db.compactc <- struct{}{} // Trigger a compaction.
+}
 
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
-			time.Sleep(3 * time.Millisecond)
-		}
+type blockPopulatorFunc func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error
 
-		start := time.Now()
-		require.NoError(t, db.Close())
-		actT := time.Since(start)
-
-		expT := timeCompactionUninterrupted / 2 // Closing the db in the middle of compaction should less than half the time.
-		require.Less(t, actT, expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
-
-		// Make sure that no blocks were marked as compaction failed.
-		// This checks that the `context.Canceled` error is properly checked at all levels:
-		// - callers should check with errors.Is() instead of ==.
-		readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
-		require.NoError(t, err)
-		blocks, err := readOnlyDB.Blocks()
-		require.NoError(t, err)
-		for i, b := range blocks {
-			require.Falsef(t, b.Meta().Compaction.Failed, "block %d (%s) should not be marked as compaction failed", i, b.Meta().ULID)
-		}
-		require.NoError(t, readOnlyDB.Close())
-	}
+func (f blockPopulatorFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error {
+	return f(ctx, metrics, logger, chunkPool, mergeFunc, blocks, meta, indexw, chunkw, postingsFunc)
 }
 
 // TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction