diff --git a/tsdb/db.go b/tsdb/db.go index 2d35e3fb00..9ab150c5b4 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -992,9 +992,14 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.metrics.maxBytes.Set(float64(maxBytes)) db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds()) + // Calling db.reload() calls db.reloadBlocks() which requires cmtx to be locked. + db.cmtx.Lock() if err := db.reload(); err != nil { + db.cmtx.Unlock() return nil, err } + db.cmtx.Unlock() + // Set the min valid time for the ingested samples // to be no lower than the maxt of the last block. minValidTime := int64(math.MinInt64) @@ -1363,6 +1368,7 @@ func (db *DB) CompactOOOHead(ctx context.Context) error { // Callback for testing. var compactOOOHeadTestingCallback func() +// The db.cmtx mutex should be held before calling this method. func (db *DB) compactOOOHead(ctx context.Context) error { if !db.oooWasEnabled.Load() { return nil @@ -1417,6 +1423,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error { // compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given. // Each ULID in the result corresponds to a block in a unique time range. +// The db.cmtx mutex should be held before calling this method. func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) { start := time.Now() @@ -1461,7 +1468,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID } // compactHead compacts the given RangeHead. -// The compaction mutex should be held before calling this method. +// The db.cmtx should be held before calling this method. func (db *DB) compactHead(head *RangeHead) error { uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) if err != nil { @@ -1487,7 +1494,7 @@ func (db *DB) compactHead(head *RangeHead) error { } // compactBlocks compacts all the eligible on-disk blocks. -// The compaction mutex should be held before calling this method. +// The db.cmtx should be held before calling this method. func (db *DB) compactBlocks() (err error) { // Check for compactions of multiple blocks. for { @@ -1544,6 +1551,7 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { } // reload reloads blocks and truncates the head and its WAL. +// The db.cmtx mutex should be held before calling this method. func (db *DB) reload() error { if err := db.reloadBlocks(); err != nil { return fmt.Errorf("reloadBlocks: %w", err) @@ -1560,6 +1568,7 @@ func (db *DB) reload() error { // reloadBlocks reloads blocks without touching head. // Blocks that are obsolete due to replacement or retention will be deleted. +// The db.cmtx mutex should be held before calling this method. func (db *DB) reloadBlocks() (err error) { defer func() { if err != nil { @@ -1568,13 +1577,9 @@ func (db *DB) reloadBlocks() (err error) { db.metrics.reloads.Inc() }() - // Now that we reload TSDB every minute, there is a high chance for a race condition with a reload - // triggered by CleanTombstones(). We need to lock the reload to avoid the situation where - // a normal reload and CleanTombstones try to delete the same block. - db.mtx.Lock() - defer db.mtx.Unlock() - + db.mtx.RLock() loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory) + db.mtx.RUnlock() if err != nil { return err } @@ -1600,11 +1605,13 @@ func (db *DB) reloadBlocks() (err error) { if len(corrupted) > 0 { // Corrupted but no child loaded for it. // Close all new blocks to release the lock for windows. + db.mtx.RLock() for _, block := range loadable { if _, open := getBlock(db.blocks, block.Meta().ULID); !open { block.Close() } } + db.mtx.RUnlock() errs := tsdb_errors.NewMulti() for ulid, err := range corrupted { if err != nil { @@ -1643,8 +1650,10 @@ func (db *DB) reloadBlocks() (err error) { }) // Swap new blocks first for subsequently created readers to be seen. + db.mtx.Lock() oldBlocks := db.blocks db.blocks = toLoad + db.mtx.Unlock() // Only check overlapping blocks when overlapping compaction is enabled. if db.opts.EnableOverlappingCompaction { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 7dc60a7304..826be61a42 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1352,61 +1352,6 @@ func TestTombstoneCleanFail(t *testing.T) { require.Len(t, intersection(oldBlockDirs, actualBlockDirs), len(actualBlockDirs)-1) } -// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation -// and retention limit policies, when triggered at the same time, -// won't race against each other. -func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - opts := DefaultOptions() - var wg sync.WaitGroup - - // We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones() - // reload try to delete the same block. Without the correct lock placement, it can happen if a - // block is marked for deletion due to retention limits and also has tombstones to be cleaned at - // the same time. - // - // That is something tricky to trigger, so let's try several times just to make sure. - for i := 0; i < 20; i++ { - t.Run(fmt.Sprintf("iteration%d", i), func(t *testing.T) { - db := openTestDB(t, opts, nil) - totalBlocks := 20 - dbDir := db.Dir() - // Generate some blocks with old mint (near epoch). - for j := 0; j < totalBlocks; j++ { - blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) - block, err := OpenBlock(nil, blockDir, nil, nil) - require.NoError(t, err) - // Cover block with tombstones so it can be deleted with CleanTombstones() as well. - tomb := tombstones.NewMemTombstones() - tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1}) - block.tombstones = tomb - - db.blocks = append(db.blocks, block) - } - - wg.Add(2) - // Run reload and CleanTombstones together, with a small time window randomization - go func() { - defer wg.Done() - time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) - require.NoError(t, db.reloadBlocks()) - }() - go func() { - defer wg.Done() - time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) - require.NoError(t, db.CleanTombstones()) - }() - - wg.Wait() - - require.NoError(t, db.Close()) - }) - } -} - func intersection(oldBlocks, actualBlocks []string) (intersection []string) { hash := make(map[string]bool) for _, e := range oldBlocks {