refactor(appenderV2): add TSDB AppenderV2 implementation

Signed-off-by: bwplotka <bwplotka@gmail.com>

tmp

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-11-28 12:43:12 +00:00
parent 50b101bb46
commit a1d088eee8
11 changed files with 1033 additions and 5595 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 The Prometheus Authors // Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
@ -13,7 +13,11 @@
package metadata package metadata
import "github.com/prometheus/common/model" import (
"strings"
"github.com/prometheus/common/model"
)
// Metadata stores a series' metadata information. // Metadata stores a series' metadata information.
type Metadata struct { type Metadata struct {
@ -21,3 +25,22 @@ type Metadata struct {
Unit string `json:"unit"` Unit string `json:"unit"`
Help string `json:"help"` Help string `json:"help"`
} }
// IsEmpty returns true if metadata structure is empty, including unknown type case.
func (m Metadata) IsEmpty() bool {
return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == ""
}
// Equals returns true if m is semantically the same as other metadata.
func (m Metadata) Equals(other Metadata) bool {
if strings.Compare(m.Unit, other.Unit) != 0 || strings.Compare(m.Help, other.Help) != 0 {
return false
}
if m.Type != "" && m.Type != model.MetricTypeUnknown {
if m.Type != other.Type {
return false
}
}
return true
}

View File

@ -49,6 +49,7 @@ var (
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected // NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine this. As a result // behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now. // it's recommended to ignore this error for now.
// TODO(bwplotka): Remove with appender v1 flow; not used in v2.
ErrOutOfOrderST = errors.New("start timestamp out of order, ignoring") ErrOutOfOrderST = errors.New("start timestamp out of order, ignoring")
ErrSTNewerThanSample = errors.New("ST is newer or the same as sample's timestamp, ignoring") ErrSTNewerThanSample = errors.New("ST is newer or the same as sample's timestamp, ignoring")
) )

View File

@ -1131,11 +1131,16 @@ func (db *DB) run(ctx context.Context) {
} }
} }
// Appender opens a new appender against the database. // Appender opens a new Appender against the database.
func (db *DB) Appender(ctx context.Context) storage.Appender { func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender(ctx)} return dbAppender{db: db, Appender: db.head.Appender(ctx)}
} }
// AppenderV2 opens a new AppenderV2 against the database.
func (db *DB) AppenderV2(ctx context.Context) storage.AppenderV2 {
return dbAppenderV2{db: db, AppenderV2: db.head.AppenderV2(ctx)}
}
// ApplyConfig applies a new config to the DB. // ApplyConfig applies a new config to the DB.
// Behaviour of 'OutOfOrderTimeWindow' is as follows: // Behaviour of 'OutOfOrderTimeWindow' is as follows:
// OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0. // OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0.
@ -1249,6 +1254,36 @@ func (a dbAppender) Commit() error {
return err return err
} }
// dbAppenderV2 wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppenderV2 struct {
storage.AppenderV2
db *DB
}
var _ storage.GetRef = dbAppenderV2{}
func (a dbAppenderV2) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
if g, ok := a.AppenderV2.(storage.GetRef); ok {
return g.GetRef(lset, hash)
}
return 0, labels.EmptyLabels()
}
func (a dbAppenderV2) Commit() error {
err := a.AppenderV2.Commit()
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
if a.db.head.compactable() {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
return err
}
// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay. // waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
// This doesn't guarantee that the Head is really compactable. // This doesn't guarantee that the Head is really compactable.
func (db *DB) waitingForCompactionDelay() bool { func (db *DB) waitingForCompactionDelay() bool {

View File

@ -187,6 +187,20 @@ type HeadOptions struct {
// EnableSharding enables ShardedPostings() support in the Head. // EnableSharding enables ShardedPostings() support in the Head.
EnableSharding bool EnableSharding bool
// EnableSTAsZeroSample represents 'created-timestamp-zero-ingestion' feature flag.
// If true, ST, if non-empty and earlier than sample timestamp, will be stored
// as a zero sample before the actual sample.
//
// The zero sample is best-effort, only debug log on failure is emitted.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableSTAsZeroSample bool
// EnableMetadataWALRecords represents 'metadata-wal-records' feature flag.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableMetadataWALRecords bool
} }
const ( const (

View File

@ -165,17 +165,19 @@ func (h *Head) appender() *headAppender {
minValidTime := h.appendableMinValidTime() minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback. appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
return &headAppender{ return &headAppender{
head: h, headAppenderBase: headAppenderBase{
minValidTime: minValidTime, head: h,
mint: math.MaxInt64, minValidTime: minValidTime,
maxt: math.MinInt64, mint: math.MaxInt64,
headMaxt: h.MaxTime(), maxt: math.MinInt64,
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(), headMaxt: h.MaxTime(),
seriesRefs: h.getRefSeriesBuffer(), oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
series: h.getSeriesBuffer(), seriesRefs: h.getRefSeriesBuffer(),
typesInBatch: h.getTypeMap(), series: h.getSeriesBuffer(),
appendID: appendID, typesInBatch: h.getTypeMap(),
cleanupAppendIDsBelow: cleanupAppendIDsBelow, appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
},
} }
} }
@ -382,7 +384,7 @@ func (b *appendBatch) close(h *Head) {
b.exemplars = nil b.exemplars = nil
} }
type headAppender struct { type headAppenderBase struct {
head *Head head *Head
minValidTime int64 // No samples below this timestamp are allowed. minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64 mint, maxt int64
@ -397,7 +399,10 @@ type headAppender struct {
appendID, cleanupAppendIDsBelow uint64 appendID, cleanupAppendIDsBelow uint64
closed bool closed bool
hints *storage.AppendOptions }
type headAppender struct {
headAppenderBase
hints *storage.AppendOptions
} }
func (a *headAppender) SetOptions(opts *storage.AppendOptions) { func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
@ -525,7 +530,7 @@ func (a *headAppender) AppendSTZeroSample(ref storage.SeriesRef, lset labels.Lab
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) { func (a *headAppenderBase) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
// Ensure no empty labels have gotten through. // Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty() lset = lset.WithoutEmpty()
if lset.IsEmpty() { if lset.IsEmpty() {
@ -550,7 +555,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
// getCurrentBatch returns the current batch if it fits the provided sampleType // getCurrentBatch returns the current batch if it fits the provided sampleType
// for the provided series. Otherwise, it adds a new batch and returns it. // for the provided series. Otherwise, it adds a new batch and returns it.
func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch { func (a *headAppenderBase) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
h := a.head h := a.head
newBatch := func() *appendBatch { newBatch := func() *appendBatch {
@ -1043,7 +1048,7 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels,
var _ storage.GetRef = &headAppender{} var _ storage.GetRef = &headAppender{}
func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) { func (a *headAppenderBase) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
s := a.head.series.getByHash(hash, lset) s := a.head.series.getByHash(hash, lset)
if s == nil { if s == nil {
return 0, labels.EmptyLabels() return 0, labels.EmptyLabels()
@ -1053,7 +1058,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
} }
// log writes all headAppender's data to the WAL. // log writes all headAppender's data to the WAL.
func (a *headAppender) log() error { func (a *headAppenderBase) log() error {
if a.head.wal == nil { if a.head.wal == nil {
return nil return nil
} }
@ -1185,7 +1190,7 @@ type appenderCommitContext struct {
} }
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage. // commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
func (a *headAppender) commitExemplars(b *appendBatch) { func (a *headAppenderBase) commitExemplars(b *appendBatch) {
// No errors logging to WAL, so pass the exemplars along to the in memory storage. // No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range b.exemplars { for _, e := range b.exemplars {
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
@ -1205,7 +1210,7 @@ func (a *headAppender) commitExemplars(b *appendBatch) {
} }
} }
func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { func (acc *appenderCommitContext) collectOOORecords(a *headAppenderBase) {
if a.head.wbl == nil { if a.head.wbl == nil {
// WBL is not enabled. So no need to collect. // WBL is not enabled. So no need to collect.
acc.wblSamples = nil acc.wblSamples = nil
@ -1310,7 +1315,7 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
// operations on the series after appending the samples. // operations on the series after appending the samples.
// //
// There are also specific functions to commit histograms and float histograms. // There are also specific functions to commit histograms and float histograms.
func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) { func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool var ok, chunkCreated bool
var series *memSeries var series *memSeries
@ -1466,7 +1471,7 @@ func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext)
} }
// For details on the commitHistograms function, see the commitFloats docs. // For details on the commitHistograms function, see the commitFloats docs.
func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) { func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool var ok, chunkCreated bool
var series *memSeries var series *memSeries
@ -1575,7 +1580,7 @@ func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitConte
} }
// For details on the commitFloatHistograms function, see the commitFloats docs. // For details on the commitFloatHistograms function, see the commitFloats docs.
func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) { func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool var ok, chunkCreated bool
var series *memSeries var series *memSeries
@ -1697,7 +1702,7 @@ func commitMetadata(b *appendBatch) {
} }
} }
func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() { func (a *headAppenderBase) unmarkCreatedSeriesAsPendingCommit() {
for _, s := range a.series { for _, s := range a.series {
s.Lock() s.Lock()
s.pendingCommit = false s.pendingCommit = false
@ -1707,7 +1712,7 @@ func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
// Commit writes to the WAL and adds the data to the Head. // Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable. // TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) { func (a *headAppenderBase) Commit() (err error) {
if a.closed { if a.closed {
return ErrAppenderClosed return ErrAppenderClosed
} }
@ -2238,7 +2243,7 @@ func handleChunkWriteError(err error) {
} }
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL. // Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
func (a *headAppender) Rollback() (err error) { func (a *headAppenderBase) Rollback() (err error) {
if a.closed { if a.closed {
return ErrAppenderClosed return ErrAppenderClosed
} }

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -14,7 +14,6 @@
package tsdb package tsdb
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
@ -32,6 +31,223 @@ import (
"github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/compression"
) )
type benchAppendFunc func(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction
func appendV1Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.Appender(b.Context())
for _, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
require.NoError(b, err)
}
}
return app
}
func appendV2Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.AppenderV2(b.Context())
for _, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), 0, ts+sampleIndex, float64(ts+sampleIndex), nil, nil, storage.AOptions{})
require.NoError(b, err)
}
}
return app
}
func appendV1FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.Appender(b.Context())
for i, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
require.NoError(b, err)
// Every sample also has an exemplar attached.
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
continue
}
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
require.NoError(b, err)
// Every histogram sample also has 3 exemplars attached.
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
}
}
return app
}
func appendV2FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var (
err error
ex = make([]exemplar.Exemplar, 3)
)
app := h.AppenderV2(b.Context())
for i, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
aOpts := storage.AOptions{Exemplars: ex[:0]}
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
// Every sample also has an exemplar attached.
aOpts.Exemplars = append(aOpts.Exemplars, exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
ref, err = app.Append(ref, s.Labels(), 0, ts, float64(ts), nil, nil, aOpts)
require.NoError(b, err)
continue
}
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
// Every histogram sample also has 3 exemplars attached.
aOpts.Exemplars = append(aOpts.Exemplars,
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
)
ref, err = app.Append(ref, s.Labels(), 0, ts, 0, h, nil, aOpts)
require.NoError(b, err)
}
}
return app
}
type appendCase struct {
name string
appendFunc benchAppendFunc
}
func appendCases() []appendCase {
return []appendCase{
{
name: "appender=v1/case=floats",
appendFunc: appendV1Float,
},
{
name: "appender=v2/case=floats",
appendFunc: appendV2Float,
},
{
name: "appender=v1/case=floatsHistogramsExemplars",
appendFunc: appendV1FloatOrHistogramWithExemplars,
},
{
name: "appender=v2/case=floatsHistogramsExemplars",
appendFunc: appendV2FloatOrHistogramWithExemplars,
},
}
}
/*
export bench=append && go test \
-run '^$' -bench '^BenchmarkHeadAppender_AppendCommit$' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkHeadAppender_AppendCommit(b *testing.B) {
// NOTE(bwplotka): Previously we also had 1k and 10k series case. There is nothing
// special happening in 100 vs 1k vs 10k, so let's save considerable amount of benchmark time
// for quicker feedback. In return, we add more sample type cases.
// Similarly, we removed the 2 sample in append case.
seriesCounts := []int{10, 100}
series := genSeries(100, 10, 0, 0) // Only using the generated labels.
for _, appendCase := range appendCases() {
for _, seriesCount := range seriesCounts {
for _, samplesPerAppend := range []int64{1, 5, 100} {
b.Run(fmt.Sprintf("%s/series=%d/samples_per_append=%d", appendCase.name, seriesCount, samplesPerAppend), func(b *testing.B) {
opts := newTestHeadDefaultOptions(10000, false)
opts.EnableExemplarStorage = true // We benchmark with exemplars, benchmark with them.
h, _ := newTestHeadWithOptions(b, compression.None, opts)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
// Init series, that's not what we're benchmarking here.
app := appendCase.appendFunc(b, h, ts, series[:seriesCount], samplesPerAppend)
require.NoError(b, app.Commit())
ts += 1000 // should increment more than highest samplesPerAppend
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
app := appendCase.appendFunc(b, h, ts, series[:seriesCount], samplesPerAppend)
require.NoError(b, app.Commit())
ts += 1000 // should increment more than highest samplesPerAppend
}
})
}
}
}
}
func BenchmarkHeadStripeSeriesCreate(b *testing.B) { func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
chunkDir := b.TempDir() chunkDir := b.TempDir()
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
@ -86,86 +302,6 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
} }
} }
func BenchmarkHead_WalCommit(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0) // Only using the generated labels.
appendSamples := func(b *testing.B, app storage.Appender, seriesCount int, ts int64) {
var err error
for i, s := range series[:seriesCount] {
var ref storage.SeriesRef
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts, float64(ts))
} else {
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
}
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts,
})
require.NoError(b, err)
}
}
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, commits := range []int64{1, 2} { // To test commits that create new series and when the series already exists.
b.Run(fmt.Sprintf("%d commits", commits), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
b.StopTimer()
h, w := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
if h != nil {
h.Close()
}
if w != nil {
w.Close()
}
})
app := h.Appender(context.Background())
appendSamples(b, app, seriesCount, 0)
b.StartTimer()
require.NoError(b, app.Commit())
if commits == 2 {
b.StopTimer()
app = h.Appender(context.Background())
appendSamples(b, app, seriesCount, 1)
b.StartTimer()
require.NoError(b, app.Commit())
}
b.StopTimer()
h.Close()
h = nil
w.Close()
w = nil
}
})
}
})
}
}
type failingSeriesLifecycleCallback struct{} type failingSeriesLifecycleCallback struct{}
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") } func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }

View File

@ -1,173 +0,0 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/compression"
)
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
chunkDir := b.TempDir()
// Put a series, select it. GC it and then access it.
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
for i := 0; b.Loop(); i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
}
}
func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
chunkDir := b.TempDir()
// Put a series, select it. GC it and then access it.
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
var count atomic.Int64
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := count.Inc()
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))), false)
}
})
}
func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
chunkDir := b.TempDir()
// Put a series, select it. GC it and then access it.
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
// Mock the PreCreation() callback to fail on each series.
opts.SeriesCallback = failingSeriesLifecycleCallback{}
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
for i := 0; b.Loop(); i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
}
}
func BenchmarkHead_WalCommit(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0) // Only using the generated labels.
appendSamples := func(b *testing.B, app storage.Appender, seriesCount int, ts int64) {
var err error
for i, s := range series[:seriesCount] {
var ref storage.SeriesRef
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts, float64(ts))
} else {
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
}
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts,
})
require.NoError(b, err)
}
}
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, commits := range []int64{1, 2} { // To test commits that create new series and when the series already exists.
b.Run(fmt.Sprintf("%d commits", commits), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
b.StopTimer()
h, w := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
if h != nil {
h.Close()
}
if w != nil {
w.Close()
}
})
app := h.Appender(context.Background())
appendSamples(b, app, seriesCount, 0)
b.StartTimer()
require.NoError(b, app.Commit())
if commits == 2 {
b.StopTimer()
app = h.Appender(context.Background())
appendSamples(b, app, seriesCount, 1)
b.StartTimer()
require.NoError(b, app.Commit())
}
b.StopTimer()
h.Close()
h = nil
w.Close()
w = nil
}
})
}
})
}
}
type failingSeriesLifecycleCallback struct{}
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }
func (failingSeriesLifecycleCallback) PostCreation(labels.Labels) {}
func (failingSeriesLifecycleCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}

View File

@ -107,49 +107,6 @@ func BenchmarkCreateSeries(b *testing.B) {
} }
} }
func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0)
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
appendSamples := func() error {
var err error
app := h.Appender(context.Background())
for _, s := range series[:seriesCount] {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
if err != nil {
return err
}
}
}
ts += 1000 // should increment more than highest samplesPerAppend
return app.Commit()
}
// Init series, that's not what we're benchmarking here.
require.NoError(b, appendSamples())
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
require.NoError(b, appendSamples())
}
})
}
})
}
}
func populateTestWL(t testing.TB, w *wlog.WL, recs []any, buf []byte) []byte { func populateTestWL(t testing.TB, w *wlog.WL, recs []any, buf []byte) []byte {
var enc record.Encoder var enc record.Encoder
for _, r := range recs { for _, r := range recs {
@ -5941,7 +5898,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
} }
} }
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) { func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
dir := t.TempDir() dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err) require.NoError(t, err)
@ -6284,6 +6241,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
} }
// TODO(bwplotka): Bad benchmark (no b.Loop/b.N), fix or remove.
func BenchmarkCuttingHeadHistogramChunks(b *testing.B) { func BenchmarkCuttingHeadHistogramChunks(b *testing.B) {
const ( const (
numSamples = 50000 numSamples = 50000

View File

@ -44,14 +44,14 @@ type testValue struct {
type sampleTypeScenario struct { type sampleTypeScenario struct {
sampleType string sampleType string
appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) appendFunc func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)
sampleFunc func(ts, value int64) sample sampleFunc func(ts, value int64) sample
} }
var sampleTypeScenarios = map[string]sampleTypeScenario{ var sampleTypeScenarios = map[string]sampleTypeScenario{
float: { float: {
sampleType: sampleMetricTypeFloat, sampleType: sampleMetricTypeFloat,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, f: float64(value)} s := sample{t: ts, f: float64(value)}
ref, err := appender.Append(0, lbls, ts, s.f) ref, err := appender.Append(0, lbls, ts, s.f)
return ref, s, err return ref, s, err
@ -62,7 +62,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
intHistogram: { intHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(value)} s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err return ref, s, err
@ -73,7 +73,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
floatHistogram: { floatHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(value)} s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err return ref, s, err
@ -84,7 +84,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
customBucketsIntHistogram: { customBucketsIntHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(value)} s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err return ref, s, err
@ -95,7 +95,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
customBucketsFloatHistogram: { customBucketsFloatHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(value)} s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err return ref, s, err
@ -106,7 +106,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
gaugeIntHistogram: { gaugeIntHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(value)} s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err return ref, s, err
@ -117,7 +117,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
}, },
gaugeFloatHistogram: { gaugeFloatHistogram: {
sampleType: sampleMetricTypeHistogram, sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(value)} s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err return ref, s, err