mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-04 12:01:06 +02:00
Merge pull request #18068 from prometheus/codesome/stale-race
tsdb: Fix locking in Head.deleteSeriesByID
This commit is contained in:
commit
05fac7cba1
20
tsdb/head.go
20
tsdb/head.go
@@ -2156,15 +2156,24 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||
)
|
||||
|
||||
for _, ref := range refs {
|
||||
// Delete the reference from the series map.
|
||||
// Copying getByID here to avoid locking and unlocking twice.
|
||||
refShard := int(ref) & (h.series.size - 1)
|
||||
h.series.locks[refShard].Lock()
|
||||
|
||||
// Copying getByID here to avoid locking and unlocking twice.
|
||||
series := h.series.series[refShard][ref]
|
||||
if series == nil {
|
||||
h.series.locks[refShard].Unlock()
|
||||
continue
|
||||
}
|
||||
delete(h.series.series[refShard], series.ref)
|
||||
h.series.locks[refShard].Unlock()
|
||||
|
||||
// Delete the reference from the hash.
|
||||
hash := series.lset.Hash()
|
||||
hashShard := int(hash) & (h.series.size - 1)
|
||||
h.series.locks[hashShard].Lock()
|
||||
h.series.hashes[hashShard].del(hash, series.ref)
|
||||
h.series.locks[hashShard].Unlock()
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
@@ -2172,9 +2181,6 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||
staleSeriesDeleted++
|
||||
}
|
||||
|
||||
hash := series.lset.Hash()
|
||||
hashShard := int(hash) & (h.series.size - 1)
|
||||
|
||||
chunksRemoved += len(series.mmappedChunks)
|
||||
if series.headChunks != nil {
|
||||
chunksRemoved += series.headChunks.len()
|
||||
@@ -2182,10 +2188,6 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
|
||||
h.series.hashes[hashShard].del(hash, series.ref)
|
||||
delete(h.series.series[refShard], series.ref)
|
||||
|
||||
h.series.locks[refShard].Unlock()
|
||||
}
|
||||
|
||||
h.metrics.seriesRemoved.Add(float64(len(deleted)))
|
||||
|
||||
@@ -7246,3 +7246,77 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestWALReplayRaceWithStaleSeriesCompaction verifies that deleteSeriesByID correctly locks the
|
||||
// hash shard (not only the ref shard) when deleting from the hashes map.
|
||||
// The race only occurs when Prometheus restarts after having done a stale series compaction because
|
||||
// deleteSeriesByID is not used otherwise.
|
||||
func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(1000, false)
|
||||
// A small stripe size ensures many series share hash shards, increasing
|
||||
// the likelihood that deleteSeriesByID and getOrCreateWithOptionalID
|
||||
// contend on the same shard during WAL replay.
|
||||
opts.StripeSize = 32
|
||||
head, _ := newTestHeadWithOptions(t, compression.None, opts)
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
appendSample := func(lbls labels.Labels, ts int64, val float64) {
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, lbls, ts, val)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// Step 1: Create a batch of series and make them stale.
|
||||
const numStaleSeries = 500
|
||||
staleLbls := make([]labels.Labels, numStaleSeries)
|
||||
for i := range numStaleSeries {
|
||||
staleLbls[i] = labels.FromStrings("__name__", "stale_metric", "i", strconv.Itoa(i))
|
||||
appendSample(staleLbls[i], 100, float64(i))
|
||||
}
|
||||
for _, lbl := range staleLbls {
|
||||
appendSample(lbl, 200, math.Float64frombits(value.StaleNaN))
|
||||
}
|
||||
require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
|
||||
|
||||
// Step 2: Truncate stale series. This removes them from the Head and
|
||||
// writes tombstone records (with Mint=MinInt64, Maxt=MaxInt64) to the WAL.
|
||||
staleRefs := make([]storage.SeriesRef, 0, numStaleSeries)
|
||||
for i := range numStaleSeries {
|
||||
ms := head.series.getByHash(staleLbls[i].Hash(), staleLbls[i])
|
||||
require.NotNil(t, ms)
|
||||
staleRefs = append(staleRefs, storage.SeriesRef(ms.ref))
|
||||
}
|
||||
require.NoError(t, head.truncateStaleSeries(staleRefs, 300))
|
||||
require.Equal(t, uint64(0), head.NumStaleSeries())
|
||||
require.Equal(t, uint64(0), head.NumSeries())
|
||||
|
||||
// Step 3: Add new series AFTER the truncation. In the WAL, these series
|
||||
// records appear after the tombstone records. During replay, the main
|
||||
// goroutine will create these series (via getOrCreateWithOptionalID, which
|
||||
// accesses hashes[hashShard] under locks[hashShard]) concurrently with
|
||||
// the walSubsetProcessor goroutines deleting the stale series (via
|
||||
// deleteSeriesByID, which must also lock the correct hashShard).
|
||||
const numNewSeries = 500
|
||||
for i := range numNewSeries {
|
||||
lbl := labels.FromStrings("__name__", "new_metric", "i", strconv.Itoa(i))
|
||||
appendSample(lbl, 300, float64(i))
|
||||
}
|
||||
require.Equal(t, uint64(numNewSeries), head.NumSeries())
|
||||
|
||||
// Step 4: Close and re-open the Head to trigger WAL replay.
|
||||
// With the buggy locking, the race detector should catch the data race
|
||||
// between the main goroutine (creating series) and worker goroutines
|
||||
// (deleting stale series) during replay.
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(head.opts.ChunkDirRoot, "wal"), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, wal, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0)) // Should not cause a race here.
|
||||
|
||||
require.Equal(t, uint64(0), head.NumStaleSeries())
|
||||
require.Equal(t, uint64(numNewSeries), head.NumSeries())
|
||||
require.NoError(t, head.Close())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user