From b880cea613577f50377832efcf801e0172d2ad1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Mon, 2 Oct 2023 14:47:00 +0100 Subject: [PATCH 1/5] Fix locks in db.reloadBlocks() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This partially reverts ae3d392aa9c3a5c5f92f8116738c5b32c98b09a7. ae3d392aa9c3a5c5f92f8116738c5b32c98b09a7 added a call to db.mtx.Lock() that lasts for the entire duration of db.reloadBlocks(), previous db.mtx would be locked only during critical part of db.reloadBlocks(). The motivation was to protect against races: https://github.com/prometheus/prometheus/pull/8007/commits/9e0351e161dcf60cd6d06437b6a2221805910498#r555699794 The 'reloads' being mentioned are (I think) reloadBlocks() calls, rather than db.reload() or other methods. TestTombstoneCleanRetentionLimitsRace was added to catch this but I wasn't able to ever get any error out of it, even after disabling all calls to db.mtx in reloadBlocks() and CleanTombstones(). To make things more complicated CleanupTombstones() itself calls reloadBlocks(), so it seems that the real issue is that we might have concurrent calls to reloadBlocks(). The problem with this change is that db.reloadBlocks() can take a very long time, that's because it might need to load very large blocks from disk, which is slow. While db.mtx is locked a large chunk of the db is locked, including queries, since db.mtx read lock is needed for db.Querier() call. One of the issues this manifests itself as is a gap in all metrics and blocked queries just after a large block compaction happens. When compaction merges multiple day-or-more blocks into a week-or-more block it create a single very big block. After that block is written it needs to be loaded and that seems to be taking many seconds (30-45), during which mtx is held and everything is blocked. Turns out that there is another lock that is more fine grained and aimed at this specific use case: // cmtx ensures that compactions and deletions don't run simultaneously. cmtx sync.Mutex All calls to reloadBlocks() are wrapped inside cmtx lock. The only exception is db.reload() which this change fixes. We can't add cmtx lock inside reloadBlocks() itself because it's called by a number of functions, some of which are already holding cmtx. Looking at the code I think it is sufficient to hold cmtx and skip a reloadBlocks() wide mtx call. Signed-off-by: Łukasz Mierzwa --- tsdb/db.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 2d35e3fb00..ea8cbfff0b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -992,9 +992,14 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.metrics.maxBytes.Set(float64(maxBytes)) db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds()) + // Calling db.reload() calls db.reloadBlocks() which requires cmtx to be locked. + db.cmtx.Lock() if err := db.reload(); err != nil { + db.cmtx.Unlock() return nil, err } + db.cmtx.Unlock() + // Set the min valid time for the ingested samples // to be no lower than the maxt of the last block. minValidTime := int64(math.MinInt64) @@ -1568,13 +1573,9 @@ func (db *DB) reloadBlocks() (err error) { db.metrics.reloads.Inc() }() - // Now that we reload TSDB every minute, there is a high chance for a race condition with a reload - // triggered by CleanTombstones(). We need to lock the reload to avoid the situation where - // a normal reload and CleanTombstones try to delete the same block. - db.mtx.Lock() - defer db.mtx.Unlock() - + db.mtx.RLock() loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory) + db.mtx.RUnlock() if err != nil { return err } @@ -1643,8 +1644,10 @@ func (db *DB) reloadBlocks() (err error) { }) // Swap new blocks first for subsequently created readers to be seen. + db.mtx.Lock() oldBlocks := db.blocks db.blocks = toLoad + db.mtx.Unlock() // Only check overlapping blocks when overlapping compaction is enabled. if db.opts.EnableOverlappingCompaction { From 92788d313ad15cf80190eb6113bb01ff5e4f9828 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Mon, 2 Oct 2023 16:05:34 +0100 Subject: [PATCH 2/5] Remove TestTombstoneCleanRetentionLimitsRace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This test ensures that running db.reloadBlocks() and db.CleanTombstones() at the same time doesn't race. The problem is that CleanTombstones() is a public method while reloadBlocks() is internal. CleanTombstones() sets db.cmtx lock while reloadBlocks() is not protected by any locks at all, it expects the public method through which it was called to do it. So having a race between these two is not unexpected and we shouldn't really be testing this. db.cmtx ensures that no other function can be modifying the list of open blocks and so the scenario tested here cannot happen. If it would happen it would be only because some other method doesn't aquire db.ctmx lock, something this test cannot detect. Signed-off-by: Łukasz Mierzwa --- tsdb/db_test.go | 55 ------------------------------------------------- 1 file changed, 55 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b858e6f524..a2861b7bf0 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1350,61 +1350,6 @@ func TestTombstoneCleanFail(t *testing.T) { require.Len(t, intersection(oldBlockDirs, actualBlockDirs), len(actualBlockDirs)-1) } -// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation -// and retention limit policies, when triggered at the same time, -// won't race against each other. -func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - opts := DefaultOptions() - var wg sync.WaitGroup - - // We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones() - // reload try to delete the same block. Without the correct lock placement, it can happen if a - // block is marked for deletion due to retention limits and also has tombstones to be cleaned at - // the same time. - // - // That is something tricky to trigger, so let's try several times just to make sure. - for i := 0; i < 20; i++ { - t.Run(fmt.Sprintf("iteration%d", i), func(t *testing.T) { - db := openTestDB(t, opts, nil) - totalBlocks := 20 - dbDir := db.Dir() - // Generate some blocks with old mint (near epoch). - for j := 0; j < totalBlocks; j++ { - blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) - block, err := OpenBlock(nil, blockDir, nil, nil) - require.NoError(t, err) - // Cover block with tombstones so it can be deleted with CleanTombstones() as well. - tomb := tombstones.NewMemTombstones() - tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1}) - block.tombstones = tomb - - db.blocks = append(db.blocks, block) - } - - wg.Add(2) - // Run reload and CleanTombstones together, with a small time window randomization - go func() { - defer wg.Done() - time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) - require.NoError(t, db.reloadBlocks()) - }() - go func() { - defer wg.Done() - time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) - require.NoError(t, db.CleanTombstones()) - }() - - wg.Wait() - - require.NoError(t, db.Close()) - }) - } -} - func intersection(oldBlocks, actualBlocks []string) (intersection []string) { hash := make(map[string]bool) for _, e := range oldBlocks { From d106b3beb7f9069bbf00c63e7b6fcccceb97ffb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Wed, 18 Oct 2023 09:29:59 +0100 Subject: [PATCH 3/5] Wrap db.blocks read in a read lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We don't hold db.mtx lock when trying to read db.blocks here so we need a read lock around this loop. Signed-off-by: Łukasz Mierzwa --- tsdb/db.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tsdb/db.go b/tsdb/db.go index ea8cbfff0b..001515081d 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1518,7 +1518,9 @@ func (db *DB) compactBlocks() (err error) { default: } + db.mtx.RLock() uids, err := db.compactor.Compact(db.dir, plan, db.blocks) + db.mtx.RUnlock() if err != nil { return fmt.Errorf("compact %s: %w", plan, err) } @@ -1601,11 +1603,13 @@ func (db *DB) reloadBlocks() (err error) { if len(corrupted) > 0 { // Corrupted but no child loaded for it. // Close all new blocks to release the lock for windows. + db.mtx.RLock() for _, block := range loadable { if _, open := getBlock(db.blocks, block.Meta().ULID); !open { block.Close() } } + db.mtx.RUnlock() errs := tsdb_errors.NewMulti() for ulid, err := range corrupted { if err != nil { From a1740cd2e799807922d50f60b71d96aa6e3a1f09 Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Wed, 28 Aug 2024 10:47:40 +0100 Subject: [PATCH 4/5] Remove unnecessary locks Compact() is an uppercase function that deals with locks on its own, so we shouldn't have a lock around it. Signed-off-by: Lukasz Mierzwa --- tsdb/db.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 001515081d..ccb0fa62bd 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1518,9 +1518,7 @@ func (db *DB) compactBlocks() (err error) { default: } - db.mtx.RLock() uids, err := db.compactor.Compact(db.dir, plan, db.blocks) - db.mtx.RUnlock() if err != nil { return fmt.Errorf("compact %s: %w", plan, err) } From e3728122b2ae1f35f4a7f9322f1bac0aadd44fcc Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Thu, 9 Jan 2025 17:20:10 +0000 Subject: [PATCH 5/5] Update comments for methods that require a lock Signed-off-by: Lukasz Mierzwa --- tsdb/db.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index ccb0fa62bd..9ab150c5b4 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1368,6 +1368,7 @@ func (db *DB) CompactOOOHead(ctx context.Context) error { // Callback for testing. var compactOOOHeadTestingCallback func() +// The db.cmtx mutex should be held before calling this method. func (db *DB) compactOOOHead(ctx context.Context) error { if !db.oooWasEnabled.Load() { return nil @@ -1422,6 +1423,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error { // compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given. // Each ULID in the result corresponds to a block in a unique time range. +// The db.cmtx mutex should be held before calling this method. func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) { start := time.Now() @@ -1466,7 +1468,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID } // compactHead compacts the given RangeHead. -// The compaction mutex should be held before calling this method. +// The db.cmtx should be held before calling this method. func (db *DB) compactHead(head *RangeHead) error { uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) if err != nil { @@ -1492,7 +1494,7 @@ func (db *DB) compactHead(head *RangeHead) error { } // compactBlocks compacts all the eligible on-disk blocks. -// The compaction mutex should be held before calling this method. +// The db.cmtx should be held before calling this method. func (db *DB) compactBlocks() (err error) { // Check for compactions of multiple blocks. for { @@ -1549,6 +1551,7 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { } // reload reloads blocks and truncates the head and its WAL. +// The db.cmtx mutex should be held before calling this method. func (db *DB) reload() error { if err := db.reloadBlocks(); err != nil { return fmt.Errorf("reloadBlocks: %w", err) @@ -1565,6 +1568,7 @@ func (db *DB) reload() error { // reloadBlocks reloads blocks without touching head. // Blocks that are obsolete due to replacement or retention will be deleted. +// The db.cmtx mutex should be held before calling this method. func (db *DB) reloadBlocks() (err error) { defer func() { if err != nil {