diff --git a/tsdb/head.go b/tsdb/head.go
index 6fe42c8cf2..e6d75e109f 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -2156,15 +2156,24 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
 	)
 
 	for _, ref := range refs {
+		// Delete the reference from the series map.
+		// Copying getByID here to avoid locking and unlocking twice.
 		refShard := int(ref) & (h.series.size - 1)
 		h.series.locks[refShard].Lock()
-
-		// Copying getByID here to avoid locking and unlocking twice.
 		series := h.series.series[refShard][ref]
 		if series == nil {
 			h.series.locks[refShard].Unlock()
 			continue
 		}
+		delete(h.series.series[refShard], series.ref)
+		h.series.locks[refShard].Unlock()
+
+		// Delete the reference from the hashes map.
+		hash := series.lset.Hash()
+		hashShard := int(hash) & (h.series.size - 1)
+		h.series.locks[hashShard].Lock()
+		h.series.hashes[hashShard].del(hash, series.ref)
+		h.series.locks[hashShard].Unlock()
 
 		if value.IsStaleNaN(series.lastValue) ||
 			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
@@ -2172,9 +2181,6 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
 			staleSeriesDeleted++
 		}
 
-		hash := series.lset.Hash()
-		hashShard := int(hash) & (h.series.size - 1)
-
 		chunksRemoved += len(series.mmappedChunks)
 		if series.headChunks != nil {
 			chunksRemoved += series.headChunks.len()
@@ -2182,10 +2188,6 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
 
 		deleted[storage.SeriesRef(series.ref)] = struct{}{}
 		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
-		h.series.hashes[hashShard].del(hash, series.ref)
-		delete(h.series.series[refShard], series.ref)
-
-		h.series.locks[refShard].Unlock()
 	}
 
 	h.metrics.seriesRemoved.Add(float64(len(deleted)))
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 7b8ae0ecbd..6dbe3599e4 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -7246,3 +7246,77 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
 		})
 	}
 }
+
+// TestWALReplayRaceWithStaleSeriesCompaction verifies that deleteSeriesByID correctly locks the
+// hash shard (not only the ref shard) when deleting from the hashes map.
+// The race only occurs when Prometheus restarts after a stale series compaction, because
+// deleteSeriesByID is not used otherwise.
+func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
+	opts := newTestHeadDefaultOptions(1000, false)
+	// A small stripe size ensures many series share hash shards, increasing
+	// the likelihood that deleteSeriesByID and getOrCreateWithOptionalID
+	// contend on the same shard during WAL replay.
+	opts.StripeSize = 32
+	head, _ := newTestHeadWithOptions(t, compression.None, opts)
+	require.NoError(t, head.Init(0))
+
+	appendSample := func(lbls labels.Labels, ts int64, val float64) {
+		app := head.Appender(context.Background())
+		_, err := app.Append(0, lbls, ts, val)
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+	}
+
+	// Step 1: Create a batch of series and make them stale.
+	const numStaleSeries = 500
+	staleLbls := make([]labels.Labels, numStaleSeries)
+	for i := range numStaleSeries {
+		staleLbls[i] = labels.FromStrings("__name__", "stale_metric", "i", strconv.Itoa(i))
+		appendSample(staleLbls[i], 100, float64(i))
+	}
+	for _, lbl := range staleLbls {
+		appendSample(lbl, 200, math.Float64frombits(value.StaleNaN))
+	}
+	require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
+
+	// Step 2: Truncate stale series. This removes them from the Head and
+	// writes tombstone records (with Mint=MinInt64, Maxt=MaxInt64) to the WAL.
+	staleRefs := make([]storage.SeriesRef, 0, numStaleSeries)
+	for i := range numStaleSeries {
+		ms := head.series.getByHash(staleLbls[i].Hash(), staleLbls[i])
+		require.NotNil(t, ms)
+		staleRefs = append(staleRefs, storage.SeriesRef(ms.ref))
+	}
+	require.NoError(t, head.truncateStaleSeries(staleRefs, 300))
+	require.Equal(t, uint64(0), head.NumStaleSeries())
+	require.Equal(t, uint64(0), head.NumSeries())
+
+	// Step 3: Add new series AFTER the truncation. In the WAL, these series
+	// records appear after the tombstone records. During replay, the main
+	// goroutine will create these series (via getOrCreateWithOptionalID, which
+	// accesses hashes[hashShard] under locks[hashShard]) concurrently with
+	// the walSubsetProcessor goroutines deleting the stale series (via
+	// deleteSeriesByID, which must also lock the correct hashShard).
+	const numNewSeries = 500
+	for i := range numNewSeries {
+		lbl := labels.FromStrings("__name__", "new_metric", "i", strconv.Itoa(i))
+		appendSample(lbl, 300, float64(i))
+	}
+	require.Equal(t, uint64(numNewSeries), head.NumSeries())
+
+	// Step 4: Close and re-open the Head to trigger WAL replay.
+	// With the buggy locking, the race detector should catch the data race
+	// between the main goroutine (creating series) and worker goroutines
+	// (deleting stale series) during replay.
+	require.NoError(t, head.Close())
+
+	wal, err := wlog.NewSize(nil, nil, filepath.Join(head.opts.ChunkDirRoot, "wal"), 32768, compression.None)
+	require.NoError(t, err)
+	head, err = NewHead(nil, nil, wal, nil, head.opts, nil)
+	require.NoError(t, err)
+	require.NoError(t, head.Init(0)) // Should not cause a race here.
+
+	require.Equal(t, uint64(0), head.NumStaleSeries())
+	require.Equal(t, uint64(numNewSeries), head.NumSeries())
+	require.NoError(t, head.Close())
+}
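
For reviewers unfamiliar with the stripeSeries layout, here is a minimal, self-contained sketch of the striped-lock invariant this patch restores. The names (`stripeMap`, `byRef`, `byHash`, `newStripeMap`) are simplified stand-ins for the stripeSeries internals, not the real Prometheus types; only the shard-selection masks and the locking order mirror the actual change, and stripeSize 32 echoes the test's opts.StripeSize.

```go
// Sketch of the invariant: every map access must hold the lock of the shard
// that entry lives in. byRef entries are sharded by series ref, byHash
// entries by label hash, so the two deletes usually hit different shards
// and therefore need different locks.
package stripe

import "sync"

const stripeSize = 32 // a power of two, so the &-mask below selects a shard

type stripeMap struct {
	locks  [stripeSize]sync.Mutex
	byRef  [stripeSize]map[uint64]uint64 // series ref -> label hash
	byHash [stripeSize]map[uint64]uint64 // label hash -> series ref
}

func newStripeMap() *stripeMap {
	s := &stripeMap{}
	for i := range s.byRef {
		s.byRef[i] = map[uint64]uint64{}
		s.byHash[i] = map[uint64]uint64{}
	}
	return s
}

func (s *stripeMap) del(ref, hash uint64) {
	// Remove the ref entry under the ref shard's lock, then release it.
	refShard := int(ref) & (stripeSize - 1)
	s.locks[refShard].Lock()
	delete(s.byRef[refShard], ref)
	s.locks[refShard].Unlock()

	// Remove the hash entry under the hash shard's own lock. The buggy code
	// performed this delete while holding only locks[refShard], racing with
	// writers that (correctly) locked locks[hashShard].
	hashShard := int(hash) & (stripeSize - 1)
	s.locks[hashShard].Lock()
	delete(s.byHash[hashShard], hash)
	s.locks[hashShard].Unlock()
}
```

Note that, like the patched deleteSeriesByID, the sketch releases the ref shard's lock before taking the hash shard's: holding at most one stripe lock at a time sidesteps both self-deadlock when refShard and hashShard coincide and lock-ordering deadlocks when two goroutines take the shards in opposite orders.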