prometheus/tsdb/head_bench_test.go
Arve Knudsen 98809e40c6
tsdb: Skip clean series during periodic head chunk mmap (#18272)
tsdb: Skip clean series during periodic head chunk mmap

The periodic mmapHeadChunks cycle previously acquired a per-series
lock on every series, even though typically >99% have nothing to
mmap. This was identified as a CPU bottleneck in Grafana Mimir.

Add a headChunkCount field (sync/atomic.Uint32) to memSeries that
tracks the number of head chunks. It is incremented in
cutNewHeadChunk and the histogram new-chunk paths, and reset by
mmapChunks and truncateChunksBefore. mmapHeadChunks uses a lock-free
Load to skip series with fewer than 2 head chunks, avoiding the
per-series lock for clean series.

sync/atomic.Uint32 (4 bytes) is used instead of go.uber.org/atomic
(8 bytes) to fit in existing struct padding without growing
memSeries. Chunk counts are bounded by the 3-byte field in
HeadChunkRef, so cannot overflow uint32.

Also fix pre-existing comment inaccuracies in the touched code:
headChunks.next -> headChunks.prev, mmapHeadChunks() -> mmapChunks()
in the doc comment, and a grammar error.

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
2026-04-14 17:11:35 +02:00

386 lines
12 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"errors"
"fmt"
"math/rand"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/compression"
)
// benchAppendFunc abstracts over the v1/v2 appender implementations so one
// benchmark body can exercise either: it appends samplesPerAppend samples per
// series starting at timestamp ts and returns the open, uncommitted transaction.
type benchAppendFunc func(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction
// appendV1Float appends samplesPerAppend float samples per series via the v1
// appender, starting at timestamp ts, and returns the open, uncommitted
// transaction for the caller to commit.
func appendV1Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
	app := h.Appender(b.Context())
	for _, s := range series {
		ref := storage.SeriesRef(0)
		for i := int64(0); i < samplesPerAppend; i++ {
			t := ts + i
			var err error
			// Sample value mirrors its timestamp for easy debugging.
			ref, err = app.Append(ref, s.Labels(), t, float64(t))
			require.NoError(b, err)
		}
	}
	return app
}
// appendV2Float appends samplesPerAppend float samples per series via the v2
// appender (AppenderV2), starting at timestamp ts, and returns the open,
// uncommitted transaction for the caller to commit.
func appendV2Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
	app := h.AppenderV2(b.Context())
	for _, s := range series {
		ref := storage.SeriesRef(0)
		for i := int64(0); i < samplesPerAppend; i++ {
			t := ts + i
			var err error
			// Sample value mirrors its timestamp for easy debugging.
			ref, err = app.Append(ref, s.Labels(), 0, t, float64(t), nil, nil, storage.AOptions{})
			require.NoError(b, err)
		}
	}
	return app
}
// appendV1FloatOrHistogramWithExemplars appends, via the v1 appender, float
// samples for even-indexed series and native histogram samples for odd-indexed
// series, with exemplars attached: one per float sample, three per histogram
// sample. It returns the open, uncommitted transaction.
func appendV1FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
	app := h.Appender(b.Context())
	// newExemplar builds a fresh exemplar with a random trace ID and value.
	newExemplar := func(t int64) exemplar.Exemplar {
		return exemplar.Exemplar{
			Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
			Value:  rand.Float64(),
			Ts:     t,
		}
	}
	for i, s := range series {
		var ref storage.SeriesRef
		for sampleIndex := int64(0); sampleIndex < samplesPerAppend; sampleIndex++ {
			var err error
			if i%2 == 0 {
				// Even series index: float sample with a single exemplar.
				ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
				require.NoError(b, err)
				_, err = app.AppendExemplar(ref, s.Labels(), newExemplar(ts+sampleIndex))
				require.NoError(b, err)
				continue
			}
			// Odd series index: native histogram sample.
			hist := &histogram.Histogram{
				Count:         7 + uint64(ts*5),
				ZeroCount:     2 + uint64(ts),
				ZeroThreshold: 0.001,
				Sum:           18.4 * rand.Float64(),
				Schema:        1,
				PositiveSpans: []histogram.Span{
					{Offset: 0, Length: 2},
					{Offset: 1, Length: 2},
				},
				PositiveBuckets: []int64{ts + 1, 1, -1, 0},
			}
			// NOTE(review): the histogram is appended at ts, not ts+sampleIndex,
			// so every inner iteration reuses the same timestamp — confirm this
			// is intended for samplesPerAppend > 1.
			ref, err = app.AppendHistogram(ref, s.Labels(), ts, hist, nil)
			require.NoError(b, err)
			// Every histogram sample also has 3 exemplars attached.
			for range 3 {
				_, err = app.AppendExemplar(ref, s.Labels(), newExemplar(ts+sampleIndex))
				require.NoError(b, err)
			}
		}
	}
	return app
}
// appendV2FloatOrHistogramWithExemplars is the AppenderV2 counterpart of
// appendV1FloatOrHistogramWithExemplars: even-indexed series get float samples
// with one exemplar each, odd-indexed series get native histogram samples with
// three exemplars each. It returns the open, uncommitted transaction.
func appendV2FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
	var (
		err error
		// Exemplar buffer reused across samples to avoid per-sample allocations.
		ex = make([]exemplar.Exemplar, 3)
	)
	app := h.AppenderV2(b.Context())
	for i, s := range series {
		var ref storage.SeriesRef
		for sampleIndex := range samplesPerAppend {
			aOpts := storage.AOptions{Exemplars: ex[:0]}
			// If i is even, append a float sample, else append a histogram.
			if i%2 == 0 {
				// Every float sample also has an exemplar attached.
				aOpts.Exemplars = append(aOpts.Exemplars, exemplar.Exemplar{
					Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
					Value:  rand.Float64(),
					Ts:     ts + sampleIndex,
				})
				// Append at ts+sampleIndex so each inner iteration produces a
				// fresh timestamp, matching appendV1FloatOrHistogramWithExemplars.
				// Previously a fixed ts was used, which appended duplicate
				// timestamps and made the v1 vs v2 results incomparable.
				ref, err = app.Append(ref, s.Labels(), 0, ts+sampleIndex, float64(ts+sampleIndex), nil, nil, aOpts)
				require.NoError(b, err)
				continue
			}
			hist := &histogram.Histogram{
				Count:         7 + uint64(ts*5),
				ZeroCount:     2 + uint64(ts),
				ZeroThreshold: 0.001,
				Sum:           18.4 * rand.Float64(),
				Schema:        1,
				PositiveSpans: []histogram.Span{
					{Offset: 0, Length: 2},
					{Offset: 1, Length: 2},
				},
				PositiveBuckets: []int64{ts + 1, 1, -1, 0},
			}
			// Every histogram sample also has 3 exemplars attached.
			for range 3 {
				aOpts.Exemplars = append(aOpts.Exemplars, exemplar.Exemplar{
					Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
					Value:  rand.Float64(),
					Ts:     ts + sampleIndex,
				})
			}
			// Histogram samples are appended at ts, mirroring the v1 variant.
			ref, err = app.Append(ref, s.Labels(), 0, ts, 0, hist, nil, aOpts)
			require.NoError(b, err)
		}
	}
	return app
}
// appendCase names one benchmarked combination of appender implementation
// (v1/v2) and sample mix (floats only, or floats+histograms+exemplars).
type appendCase struct {
	// name is used as the b.Run sub-benchmark name prefix.
	name string
	// appendFunc performs the appends and returns the open transaction.
	appendFunc benchAppendFunc
}
// appendCases enumerates the appender-implementation x sample-type
// combinations exercised by BenchmarkHeadAppender_AppendCommit.
func appendCases() []appendCase {
	cases := make([]appendCase, 0, 4)
	cases = append(cases,
		appendCase{name: "appender=v1/case=floats", appendFunc: appendV1Float},
		appendCase{name: "appender=v2/case=floats", appendFunc: appendV2Float},
		appendCase{name: "appender=v1/case=floatsHistogramsExemplars", appendFunc: appendV1FloatOrHistogramWithExemplars},
		appendCase{name: "appender=v2/case=floatsHistogramsExemplars", appendFunc: appendV2FloatOrHistogramWithExemplars},
	)
	return cases
}
/*
export bench=append && go test \
-run '^$' -bench '^BenchmarkHeadAppender_AppendCommit$' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
// BenchmarkHeadAppender_AppendCommit measures one full append+commit cycle
// against a pre-populated head, across appender versions, sample mixes,
// series counts and samples-per-append.
func BenchmarkHeadAppender_AppendCommit(b *testing.B) {
	// NOTE(bwplotka): Previously we also had 1k and 10k series cases. There is
	// nothing special happening in 100 vs 1k vs 10k, so let's save a
	// considerable amount of benchmark time for quicker feedback. In return,
	// we add more sample type cases. Similarly, we removed the 2-sample append
	// case.
	//
	// TODO(bwplotka): This still takes ~6500s (~2h) for -benchtime 5s -count 6
	// to complete. We might want to reduce the time a bit more. 5s is really
	// important as the slowest case
	// (appender=v1/case=floatsHistogramsExemplars/series=100/samples_per_append=100-2)
	// in 5s yields only 255 iters 23184892 ns/op. Perhaps -benchtime=300x would be better?
	series := genSeries(100, 10, 0, 0) // Only using the generated labels.
	for _, ac := range appendCases() {
		for _, numSeries := range []int{10, 100} {
			for _, samplesPerAppend := range []int64{1, 5, 100} {
				name := fmt.Sprintf("%s/series=%d/samples_per_append=%d", ac.name, numSeries, samplesPerAppend)
				b.Run(name, func(b *testing.B) {
					opts := newTestHeadDefaultOptions(10000, false)
					opts.EnableExemplarStorage = true // Exemplars are part of the benchmarked work.
					h, _ := newTestHeadWithOptions(b, compression.None, opts)
					ts := int64(1000)
					// Warm up: series creation is not what we benchmark here.
					app := ac.appendFunc(b, h, ts, series[:numSeries], samplesPerAppend)
					require.NoError(b, app.Commit())
					ts += 1000 // Must exceed the largest samplesPerAppend.
					b.ReportAllocs()
					b.ResetTimer()
					for b.Loop() {
						app := ac.appendFunc(b, h, ts, series[:numSeries], samplesPerAppend)
						require.NoError(b, app.Commit())
						ts += 1000 // Must exceed the largest samplesPerAppend.
					}
				})
			}
		}
	}
}
// BenchmarkHeadStripeSeriesCreate measures sequential creation of new series
// in the head's stripe series map, one unique label set per iteration.
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
	opts := DefaultHeadOptions()
	opts.ChunkRange = 1000
	opts.ChunkDirRoot = b.TempDir()
	h, err := NewHead(nil, nil, nil, nil, opts, nil)
	require.NoError(b, err)
	defer h.Close()
	n := 0
	for b.Loop() {
		// Each iteration creates a distinct series keyed by its index.
		h.getOrCreate(uint64(n), labels.FromStrings("a", strconv.Itoa(n)), false)
		n++
	}
}
// BenchmarkHeadStripeSeriesCreateParallel measures concurrent creation of new
// series in the head's stripe series map, using an atomic counter to hand a
// distinct index to each parallel goroutine.
func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
	opts := DefaultHeadOptions()
	opts.ChunkRange = 1000
	opts.ChunkDirRoot = b.TempDir()
	h, err := NewHead(nil, nil, nil, nil, opts, nil)
	require.NoError(b, err)
	defer h.Close()
	var count atomic.Int64
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// Inc hands out a unique series index across goroutines.
			n := count.Inc()
			h.getOrCreate(uint64(n), labels.FromStrings("a", strconv.Itoa(int(n))), false)
		}
	})
}
// BenchmarkHeadStripeSeriesCreate_PreCreationFailure measures the series
// creation path when the PreCreation lifecycle callback rejects every series.
func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
	opts := DefaultHeadOptions()
	opts.ChunkRange = 1000
	opts.ChunkDirRoot = b.TempDir()
	// Make the PreCreation() callback fail for every series.
	opts.SeriesCallback = failingSeriesLifecycleCallback{}
	h, err := NewHead(nil, nil, nil, nil, opts, nil)
	require.NoError(b, err)
	defer h.Close()
	n := 0
	for b.Loop() {
		// Each attempt fails in PreCreation, exercising the rejection path.
		h.getOrCreate(uint64(n), labels.FromStrings("a", strconv.Itoa(n)), false)
		n++
	}
}
// failingSeriesLifecycleCallback is a SeriesLifecycleCallback whose
// PreCreation always fails, used to benchmark the series-creation
// rejection path.
type failingSeriesLifecycleCallback struct{}

// PreCreation rejects every series, so getOrCreate never creates one.
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }

// PostCreation is a no-op.
func (failingSeriesLifecycleCallback) PostCreation(labels.Labels) {}

// PostDeletion is a no-op.
func (failingSeriesLifecycleCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}
// BenchmarkMmapHeadChunks measures one head chunk m-map pass (ForceHeadMMap)
// across varying total series counts and varying fractions of series that
// actually have a chunk ready to m-map ("ready"). The sparse-ready cases
// (1% and 10%) exercise how cheaply series with nothing to m-map are skipped.
func BenchmarkMmapHeadChunks(b *testing.B) {
	for _, seriesCount := range []int{1000, 10000, 100000} {
		for _, readyPct := range []float64{0.01, 0.1, 1.0} {
			// At least one series must be ready, even for tiny percentages.
			readyCount := max(int(float64(seriesCount)*readyPct), 1)
			b.Run(fmt.Sprintf("series=%d/ready=%d", seriesCount, readyCount), func(b *testing.B) {
				db := newTestDB(b)
				// Keep the head stable: compaction would move chunks out from
				// under the benchmark.
				db.DisableCompactions()
				chunkRange := DefaultBlockDuration
				// Create all series in batches.
				refs := make([]storage.SeriesRef, seriesCount)
				lblsPerSeries := make([]labels.Labels, seriesCount)
				ts := int64(0)
				const batchSize = 1000
				for batchStart := 0; batchStart < seriesCount; batchStart += batchSize {
					app := db.Appender(b.Context())
					batchEnd := min(batchStart+batchSize, seriesCount)
					for i := batchStart; i < batchEnd; i++ {
						lbls := labels.FromStrings("__name__", "bench", "i", strconv.Itoa(i))
						lblsPerSeries[i] = lbls
						ref, err := app.Append(0, lbls, ts, float64(i))
						require.NoError(b, err)
						// Remember the ref so later appends skip the label lookup.
						refs[i] = ref
					}
					require.NoError(b, app.Commit())
				}
				// Pick a random, spread-out set of series to make ready each iteration.
				// Using a Fisher-Yates shuffle prefix ensures coverage across stripes.
				rng := rand.New(rand.NewSource(42)) // fixed seed for reproducibility
				perm := rng.Perm(seriesCount)
				readyIndices := perm[:readyCount]
				// makeReady appends a sample past the chunk range boundary to
				// trigger a new head chunk on each ready series. Two appends
				// suffice: one near the current ts, one past nextAt.
				makeReady := func() {
					ts++ // small increment for first sample
					app := db.Appender(b.Context())
					for _, idx := range readyIndices {
						var err error
						refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts))
						require.NoError(b, err)
					}
					require.NoError(b, app.Commit())
					ts += chunkRange // jump past nextAt to force chunk cut
					app = db.Appender(b.Context())
					for _, idx := range readyIndices {
						var err error
						refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts))
						require.NoError(b, err)
					}
					require.NoError(b, app.Commit())
				}
				// Arm the ready series once before timing starts.
				makeReady()
				b.ResetTimer()
				b.ReportAllocs()
				for range b.N {
					// Only the m-map pass itself is timed; re-arming the ready
					// series between iterations happens with the timer stopped.
					db.ForceHeadMMap()
					b.StopTimer()
					makeReady()
					b.StartTimer()
				}
			})
		}
	}
}