diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 44a0921eec..8843777b93 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -22,10 +22,10 @@ import ( "os" "path" "path/filepath" - "runtime" "strconv" "sync" "testing" + "testing/synctest" "time" "github.com/oklog/ulid/v2" @@ -1978,162 +1978,127 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { } func TestDelayedCompaction(t *testing.T) { - // The delay is chosen in such a way as to not slow down the tests, but also to make - // the effective compaction duration negligible compared to it, so that the duration comparisons make sense. - delay := 1000 * time.Millisecond + delay := 5 * time.Second + label := labels.FromStrings("foo", "bar") - waitUntilCompactedAndCheck := func(db *DB) { - t.Helper() - start := time.Now() - for db.head.compactable() { - // This simulates what happens at the end of commits, for less busy DB, a compaction - // is triggered every minute. This is to speed up the test. - select { - case db.compactc <- struct{}{}: - default: - } - time.Sleep(time.Millisecond) + appendSamples := func(t *testing.T, db *DB, timestamps ...int64) { + app := db.Appender(context.Background()) + for _, ts := range timestamps { + _, err := app.Append(0, label, ts, 0) + require.NoError(t, err) } - duration := time.Since(start) - // Only waited for one offset: offset<=delay<<<2*offset - require.Greater(t, duration, db.opts.CompactionDelay) - require.Less(t, duration, 2*db.opts.CompactionDelay) + require.NoError(t, app.Commit()) } - compactAndCheck := func(db *DB) { - t.Helper() - start := time.Now() - db.Compact(context.Background()) - for db.head.compactable() { - time.Sleep(time.Millisecond) + compactorRanCount := func(db *DB) float64 { + return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) + } + + t.Run("delay not enabled", func(t *testing.T) { + t.Parallel() + db := newTestDB(t, withRngs(10)) + + compactAndCheck := func() { + start := time.Now() + db.Compact(context.Background()) + require.False(t, db.head.compactable()) + require.Less(t, time.Since(start), delay) } - duration := time.Since(start) - require.Less(t, duration, delay) - } - cases := []struct { - name string - // The delays are chosen in such a way as to not slow down the tests, but also in a way to make the - // effective compaction duration negligible compared to them, so that the duration comparisons make sense. - compactionDelay time.Duration - }{ - { - "delayed compaction not enabled", - 0, - }, - { - "delayed compaction enabled", - delay, - }, - } + db.DisableCompactions() + appendSamples(t, db, 0, 11, 21) + compactAndCheck() + require.Equal(t, 1.0, compactorRanCount(db)) - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Time imprecision on windows makes the test flaky, see https://github.com/prometheus/prometheus/issues/16450") - } - t.Parallel() + db.DisableCompactions() + appendSamples(t, db, 31, 41) + compactAndCheck() + require.Equal(t, 3.0, compactorRanCount(db)) + }) - var opts *Options - if c.compactionDelay > 0 { - opts = &Options{CompactionDelay: c.compactionDelay} - } - db := newTestDB(t, withOpts(opts), withRngs(10)) + t.Run("delay enabled", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + db := newTestDB(t, withOpts(&Options{CompactionDelay: delay}), withRngs(10)) - label := labels.FromStrings("foo", "bar") - - // The first compaction is expected to result in 1 block. - db.DisableCompactions() - app := db.Appender(context.Background()) - _, err := app.Append(0, label, 0, 0) - require.NoError(t, err) - _, err = app.Append(0, label, 11, 0) - require.NoError(t, err) - _, err = app.Append(0, label, 21, 0) - require.NoError(t, err) - require.NoError(t, app.Commit()) - - if c.compactionDelay == 0 { - // When delay is not enabled, compaction should run on the first trigger. - compactAndCheck(db) - } else { - db.EnableCompactions() - waitUntilCompactedAndCheck(db) - // The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompacted. - // This implies that the compaction delay doesn't block or wait on the initial trigger. - // 3 is an arbitrary value because it's difficult to determine the precise value. - require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0) - // The delay doesn't change the head blocks alignment. - require.Eventually(t, func() bool { - return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1 - }, 500*time.Millisecond, 10*time.Millisecond) - // One compaction was run and one block was produced. - require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)) - } - - // The second compaction is expected to result in 2 blocks. - // This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions. - // This also ensures that no delay happens between consecutive compactions. - db.DisableCompactions() - app = db.Appender(context.Background()) - _, err = app.Append(0, label, 31, 0) - require.NoError(t, err) - _, err = app.Append(0, label, 41, 0) - require.NoError(t, err) - require.NoError(t, app.Commit()) - - if c.compactionDelay == 0 { - // Compaction should still run on the first trigger. - compactAndCheck(db) - } else { - db.EnableCompactions() - waitUntilCompactedAndCheck(db) - } - - // Two other compactions were run. - require.Eventually(t, func() bool { - return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0 - }, 500*time.Millisecond, 10*time.Millisecond) - - if c.compactionDelay == 0 { - return - } - - // This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered, - // auto compaction should stop waiting for the delay if the head is no longer compactable. - // Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay. - getTimeWhenCompactionDelayStarted := func() time.Time { - t.Helper() + getDelayStart := func() time.Time { db.cmtx.Lock() defer db.cmtx.Unlock() return db.timeWhenCompactionDelayStarted } - db.DisableCompactions() - app = db.Appender(context.Background()) - _, err = app.Append(0, label, 51, 0) - require.NoError(t, err) - require.NoError(t, app.Commit()) + // triggerAutoCompaction sends a compaction trigger and waits for it to be processed. + triggerAutoCompaction := func() { + db.compactc <- struct{}{} + synctest.Wait() + } + // First compaction: expect 1 block. + db.DisableCompactions() + appendSamples(t, db, 0, 11, 21) + db.EnableCompactions() + + // First trigger starts the delay period; no compaction yet. + triggerAutoCompaction() + require.NotZero(t, getDelayStart()) + require.Equal(t, 0.0, compactorRanCount(db)) + + // Compaction doesn't run before the delay expires. + time.Sleep(delay / 2) + triggerAutoCompaction() + require.Equal(t, 0.0, compactorRanCount(db)) + + // Compaction runs once the delay expires. + time.Sleep(delay / 2) + triggerAutoCompaction() + // One compaction was run and one block was produced. + require.Equal(t, 1.0, compactorRanCount(db)) + // The compaction delay doesn't block or wait on the trigger; + // 3 triggers were processed above without blocking. + require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0) + // The delay doesn't change the head blocks alignment. + require.Equal(t, db.compactor.(*LeveledCompactor).ranges[0]+1, db.head.MinTime()) + + // Second compaction: expect 2 more blocks. + // This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions. + // This also ensures that no delay happens between consecutive compactions. + db.DisableCompactions() + appendSamples(t, db, 31, 41) + db.EnableCompactions() + + triggerAutoCompaction() + require.Equal(t, 1.0, compactorRanCount(db)) + + time.Sleep(delay / 2) + triggerAutoCompaction() + require.Equal(t, 1.0, compactorRanCount(db)) + + time.Sleep(delay / 2) + triggerAutoCompaction() + require.Equal(t, 3.0, compactorRanCount(db)) + + // This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered, + // auto compaction should stop waiting for the delay if the head is no longer compactable. + // Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay. + db.DisableCompactions() + appendSamples(t, db, 51) require.True(t, db.head.compactable()) + db.EnableCompactions() // Trigger an auto compaction. - db.compactc <- struct{}{} + triggerAutoCompaction() // That made auto compaction start waiting for the delay. - require.Eventually(t, func() bool { - return !getTimeWhenCompactionDelayStarted().IsZero() - }, 100*time.Millisecond, 10*time.Millisecond) + require.NotZero(t, getDelayStart()) + // Trigger a manual compaction. require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0))) - require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)) + require.Equal(t, 4.0, compactorRanCount(db)) + // Re-trigger an auto compaction. - db.compactc <- struct{}{} + triggerAutoCompaction() // That made auto compaction stop waiting for the delay. - require.Eventually(t, func() bool { - return getTimeWhenCompactionDelayStarted().IsZero() - }, 100*time.Millisecond, 10*time.Millisecond) + require.Zero(t, getDelayStart()) }) - } + }) } // TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled,