Rewrite TestCancelCompactions to run faster

Signed-off-by: Owen Williams <owen.williams@grafana.com>
Owen Williams 2026-05-06 13:39:15 -04:00
parent bd4758a835
commit d9f13b6263

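The rewritten test below no longer generates large blocks and compares wall-clock compaction times; it injects a mock compactor whose compact function blocks on its context and signals progress over channels, so cancellation can be asserted deterministically. Here is a minimal, self-contained sketch of that synchronization pattern; nothing in it comes from the Prometheus codebase (fakeWork and all other names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeWork stands in for a long-running compaction: it reports once that it
// has started, blocks until its context is cancelled, then reports once that
// it observed the cancellation.
func fakeWork(ctx context.Context, started, canceled chan struct{}, startedOnce, canceledOnce *sync.Once) error {
	startedOnce.Do(func() { close(started) })
	<-ctx.Done()
	canceledOnce.Do(func() { close(canceled) })
	return ctx.Err()
}

func main() {
	started := make(chan struct{})
	canceled := make(chan struct{})
	var startedOnce, canceledOnce sync.Once

	ctx, cancel := context.WithCancel(context.Background())
	go fakeWork(ctx, started, canceled, &startedOnce, &canceledOnce)

	<-started // Deterministic: the worker is definitely running now.
	cancel()  // Stands in for db.Close() cancelling the compaction context.

	select {
	case <-canceled:
		fmt.Println("worker observed cancellation")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for cancellation")
	}
}
```

Closing each channel exactly once through sync.Once mirrors the startedOnce/canceledOnce guards in the diff: the compaction loop may invoke the mock more than once, and closing an already-closed channel panics.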
@@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"math/rand"
"os"
@@ -1368,69 +1369,92 @@ func TestDisableAutoCompactions(t *testing.T) {
 // any running compaction is cancelled to unblock closing the db.
 func TestCancelCompactions(t *testing.T) {
+	t.Parallel()
+
+	db := newTestDB(t)
+
+	compactionStarted := make(chan struct{})
+	compactionCanceled := make(chan struct{})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	originalCancel := db.compactCancel
+	db.compactCancel = func() {
+		cancel()
+		originalCancel()
+	}
+
+	var startedOnce sync.Once
+	var canceledOnce sync.Once
+	db.compactor = &mockCompactorFn{
+		planFn: func() ([]string, error) {
+			return []string{"block-a", "block-b"}, nil
+		},
+		compactFn: func() ([]ulid.ULID, error) {
+			startedOnce.Do(func() {
+				close(compactionStarted)
+			})
+			<-ctx.Done()
+			canceledOnce.Do(func() {
+				close(compactionCanceled)
+			})
+			return nil, ctx.Err()
+		},
+		writeFn: func() ([]ulid.ULID, error) {
+			return nil, nil
+		},
+	}
+
+	select {
+	case db.compactc <- struct{}{}:
+	case <-time.After(time.Second):
+		t.Fatal("triggering compaction timed out")
+	}
+
+	select {
+	case <-compactionStarted:
+	case <-time.After(time.Second):
+		t.Fatal("compaction did not start")
+	}
+
+	start := time.Now()
+	require.NoError(t, db.Close())
+	require.Less(t, time.Since(start), time.Second)
+
+	select {
+	case <-compactionCanceled:
+	case <-time.After(30 * time.Second):
+		t.Fatal("compaction was not canceled")
+	}
+}
+
+func TestCanceledCompactionDoesNotMarkBlocksFailed(t *testing.T) {
+	t.Parallel()
+
 	tmpdir := t.TempDir()
+	blockDirs := []string{
+		createBlock(t, tmpdir, genSeries(1, 1, 0, 100)),
+		createBlock(t, tmpdir, genSeries(1, 1, 100, 200)),
+	}
 
-	// Create some blocks to fall within the compaction range.
-	createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
-	createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
-	createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one.
-
-	// Copy the db so we have an exact copy to compare compaction times.
-	tmpdirCopy := t.TempDir()
-	err := fileutil.CopyDirs(tmpdir, tmpdirCopy)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{200}, nil, nil)
 	require.NoError(t, err)
 
-	// Measure the compaction time without interrupting it.
-	var timeCompactionUninterrupted time.Duration
-	{
-		db, err := open(tmpdir, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
+	_, err = compactor.CompactWithBlockPopulator(tmpdir, blockDirs, nil, blockPopulatorFunc(
+		func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error {
+			return fmt.Errorf("populate block: %w", context.Canceled)
+		},
+	))
+	require.ErrorIs(t, err, context.Canceled)
+
+	for _, dir := range blockDirs {
+		meta, _, err := readMetaFile(dir)
 		require.NoError(t, err)
-		require.Len(t, db.Blocks(), 3, "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
-		db.compactc <- struct{}{} // Trigger a compaction.
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
-			time.Sleep(3 * time.Millisecond)
-		}
-
-		start := time.Now()
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) != 1 {
-			time.Sleep(3 * time.Millisecond)
-		}
-		timeCompactionUninterrupted = time.Since(start)
-
-		require.NoError(t, db.Close())
+		require.Falsef(t, meta.Compaction.Failed, "block %s should not be marked as compaction failed", meta.ULID)
 	}
-	// Measure the compaction time when closing the db in the middle of compaction.
-	{
-		db, err := open(tmpdirCopy, promslog.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
-		require.NoError(t, err)
-		require.Len(t, db.Blocks(), 3, "initial block count mismatch")
-		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "initial compaction counter mismatch")
-		db.compactc <- struct{}{} // Trigger a compaction.
+}
 
-		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.PopulatingBlocks) <= 0 {
-			time.Sleep(3 * time.Millisecond)
-		}
+type blockPopulatorFunc func(context.Context, *CompactorMetrics, *slog.Logger, chunkenc.Pool, storage.VerticalChunkSeriesMergeFunc, []BlockReader, *BlockMeta, IndexWriter, ChunkWriter, IndexReaderPostingsFunc) error
 
-		start := time.Now()
-		require.NoError(t, db.Close())
-		actT := time.Since(start)
-
-		expT := timeCompactionUninterrupted / 2 // Closing the db in the middle of compaction should less than half the time.
-		require.Less(t, actT, expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
-
-		// Make sure that no blocks were marked as compaction failed.
-		// This checks that the `context.Canceled` error is properly checked at all levels:
-		// - callers should check with errors.Is() instead of ==.
-		readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
-		require.NoError(t, err)
-		blocks, err := readOnlyDB.Blocks()
-		require.NoError(t, err)
-		for i, b := range blocks {
-			require.Falsef(t, b.Meta().Compaction.Failed, "block %d (%s) should not be marked as compaction failed", i, b.Meta().ULID)
-		}
-		require.NoError(t, readOnlyDB.Close())
-	}
+func (f blockPopulatorFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error {
+	return f(ctx, metrics, logger, chunkPool, mergeFunc, blocks, meta, indexw, chunkw, postingsFunc)
 }
 
 // TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction
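The blockPopulatorFunc type added at the end of the diff uses the standard Go adapter idiom (the same one http.HandlerFunc uses for http.Handler): a named function type whose method simply calls the function value, so a test can pass a closure wherever a BlockPopulator implementation is expected. A minimal sketch of the idiom with a toy interface; Greeter and greeterFunc are illustrative names, not Prometheus types:

```go
package main

import "fmt"

// Greeter is a toy interface standing in for tsdb's BlockPopulator.
type Greeter interface {
	Greet(name string) string
}

// greeterFunc adapts a plain function to the Greeter interface, the same
// idiom blockPopulatorFunc uses: the method just invokes the function value.
type greeterFunc func(name string) string

func (f greeterFunc) Greet(name string) string { return f(name) }

func main() {
	// A test can inject arbitrary behaviour as a closure instead of
	// declaring a dedicated mock struct for every case.
	var g Greeter = greeterFunc(func(name string) string {
		return "hello, " + name
	})
	fmt.Println(g.Greet("tsdb"))
}
```

The payoff in the test above is that the injected populator can return a wrapped context.Canceled without running any real compaction, which is exactly what the require.ErrorIs assertion needs to exercise.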