mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-04 20:06:12 +02:00
Add unit test to detect race in Head.deleteSeriesByID
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent
5504d39b3e
commit
daeef8b7b6
@ -7246,3 +7246,77 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestWALReplayRaceWithStaleSeriesCompaction verifies that deleteSeriesByID correctly locks the
|
||||
// hash shard (not only the ref shard) when deleting from the hashes map.
|
||||
// The race only occurs when Prometheus restarts after having done a stale series compaction because
|
||||
// deleteSeriesByID is not used otherwise.
|
||||
func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(1000, false)
|
||||
// A small stripe size ensures many series share hash shards, increasing
|
||||
// the likelihood that deleteSeriesByID and getOrCreateWithOptionalID
|
||||
// contend on the same shard during WAL replay.
|
||||
opts.StripeSize = 32
|
||||
head, _ := newTestHeadWithOptions(t, compression.None, opts)
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
appendSample := func(lbls labels.Labels, ts int64, val float64) {
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, lbls, ts, val)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// Step 1: Create a batch of series and make them stale.
|
||||
const numStaleSeries = 500
|
||||
staleLbls := make([]labels.Labels, numStaleSeries)
|
||||
for i := range numStaleSeries {
|
||||
staleLbls[i] = labels.FromStrings("__name__", "stale_metric", "i", strconv.Itoa(i))
|
||||
appendSample(staleLbls[i], 100, float64(i))
|
||||
}
|
||||
for _, lbl := range staleLbls {
|
||||
appendSample(lbl, 200, math.Float64frombits(value.StaleNaN))
|
||||
}
|
||||
require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
|
||||
|
||||
// Step 2: Truncate stale series. This removes them from the Head and
|
||||
// writes tombstone records (with Mint=MinInt64, Maxt=MaxInt64) to the WAL.
|
||||
staleRefs := make([]storage.SeriesRef, 0, numStaleSeries)
|
||||
for i := range numStaleSeries {
|
||||
ms := head.series.getByHash(staleLbls[i].Hash(), staleLbls[i])
|
||||
require.NotNil(t, ms)
|
||||
staleRefs = append(staleRefs, storage.SeriesRef(ms.ref))
|
||||
}
|
||||
require.NoError(t, head.truncateStaleSeries(staleRefs, 300))
|
||||
require.Equal(t, uint64(0), head.NumStaleSeries())
|
||||
require.Equal(t, uint64(0), head.NumSeries())
|
||||
|
||||
// Step 3: Add new series AFTER the truncation. In the WAL, these series
|
||||
// records appear after the tombstone records. During replay, the main
|
||||
// goroutine will create these series (via getOrCreateWithOptionalID, which
|
||||
// accesses hashes[hashShard] under locks[hashShard]) concurrently with
|
||||
// the walSubsetProcessor goroutines deleting the stale series (via
|
||||
// deleteSeriesByID, which must also lock the correct hashShard).
|
||||
const numNewSeries = 500
|
||||
for i := range numNewSeries {
|
||||
lbl := labels.FromStrings("__name__", "new_metric", "i", strconv.Itoa(i))
|
||||
appendSample(lbl, 300, float64(i))
|
||||
}
|
||||
require.Equal(t, uint64(numNewSeries), head.NumSeries())
|
||||
|
||||
// Step 4: Close and re-open the Head to trigger WAL replay.
|
||||
// With the buggy locking, the race detector should catch the data race
|
||||
// between the main goroutine (creating series) and worker goroutines
|
||||
// (deleting stale series) during replay.
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(head.opts.ChunkDirRoot, "wal"), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, wal, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0)) // Should not cause a race here.
|
||||
|
||||
require.Equal(t, uint64(0), head.NumStaleSeries())
|
||||
require.Equal(t, uint64(numNewSeries), head.NumSeries())
|
||||
require.NoError(t, head.Close())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user