prometheus/storage/fanout_test.go
Bartlomiej Plotka 9657c23c37
storage: optimized fanoutAppenderV2 (#17976)
* storage: add BenchmarkFanoutAppenderV2

Signed-off-by: bwplotka <bwplotka@gmail.com>

* fix: optimized fanoutAppenderV2

Signed-off-by: bwplotka <bwplotka@gmail.com>

* optimized more

Signed-off-by: bwplotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
2026-02-02 07:04:30 +00:00

605 lines
18 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage_test
import (
"context"
"errors"
"strconv"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
)
// TestFanout_SelectSorted appends three samples of the same series to each of
// a primary and two secondary test storages, then checks that a fanout over
// them returns all nine samples under the expected label set, through both
// Querier and ChunkQuerier.
func TestFanout_SelectSorted(t *testing.T) {
	inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
	outputLabel := labels.FromStrings(model.MetricNameLabel, "a")

	inputTotalSize := 0
	ctx := context.Background()

	// Primary storage: timestamps 0..2000.
	priStorage := teststorage.New(t)
	app1 := priStorage.Appender(ctx)
	_, err := app1.Append(0, inputLabel, 0, 0)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app1.Append(0, inputLabel, 1000, 1)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app1.Append(0, inputLabel, 2000, 2)
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app1.Commit())

	// First secondary storage: timestamps 3000..5000.
	remoteStorage1 := teststorage.New(t)
	app2 := remoteStorage1.Appender(ctx)
	_, err = app2.Append(0, inputLabel, 3000, 3)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app2.Append(0, inputLabel, 4000, 4)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app2.Append(0, inputLabel, 5000, 5)
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app2.Commit())

	// Second secondary storage: timestamps 6000..8000.
	remoteStorage2 := teststorage.New(t)
	app3 := remoteStorage2.Appender(ctx)
	_, err = app3.Append(0, inputLabel, 6000, 6)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app3.Append(0, inputLabel, 7000, 7)
	require.NoError(t, err)
	inputTotalSize++
	_, err = app3.Append(0, inputLabel, 8000, 8)
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app3.Commit())

	fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)

	t.Run("querier", func(t *testing.T) {
		querier, err := fanoutStorage.Querier(0, 8000)
		require.NoError(t, err)
		defer querier.Close()

		matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
		require.NoError(t, err)

		seriesSet := querier.Select(ctx, true, nil, matcher)

		result := make(map[int64]float64)
		var labelsResult labels.Labels
		var iterator chunkenc.Iterator
		for seriesSet.Next() {
			series := seriesSet.At()
			labelsResult = series.Labels()
			// Plain assignment on purpose: `:=` would declare a new inner
			// variable and always pass the nil outer one to Iterator.
			iterator = series.Iterator(iterator)
			for iterator.Next() == chunkenc.ValFloat {
				timestamp, value := iterator.At()
				result[timestamp] = value
			}
			require.NoError(t, iterator.Err())
		}

		require.NoError(t, seriesSet.Err())
		require.Equal(t, outputLabel, labelsResult)
		require.Len(t, result, inputTotalSize)
	})

	t.Run("chunk querier", func(t *testing.T) {
		querier, err := fanoutStorage.ChunkQuerier(0, 8000)
		require.NoError(t, err)
		defer querier.Close()

		matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
		require.NoError(t, err)

		// Adapt the chunk series set so it can be iterated sample by sample.
		seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(ctx, true, nil, matcher))

		result := make(map[int64]float64)
		var labelsResult labels.Labels
		var iterator chunkenc.Iterator
		for seriesSet.Next() {
			series := seriesSet.At()
			labelsResult = series.Labels()
			// Plain assignment on purpose: `:=` would declare a new inner
			// variable and always pass the nil outer one to Iterator.
			iterator = series.Iterator(iterator)
			for iterator.Next() == chunkenc.ValFloat {
				timestamp, value := iterator.At()
				result[timestamp] = value
			}
			require.NoError(t, iterator.Err())
		}

		require.NoError(t, seriesSet.Err())
		require.Equal(t, outputLabel, labelsResult)
		require.Len(t, result, inputTotalSize)
	})
}
// TestFanout_SelectSorted_AppenderV2 is the AppenderV2 variant of the sorted
// select test: three samples of one series are written to each of a primary
// and two secondary test storages, and a fanout over them must return all nine
// samples under the expected label set via both Querier and ChunkQuerier.
func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
	inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
	outputLabel := labels.FromStrings(model.MetricNameLabel, "a")

	inputTotalSize := 0

	// Primary storage: timestamps 0..2000.
	priStorage := teststorage.New(t)
	app1 := priStorage.AppenderV2(t.Context())
	_, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app1.Commit())

	// First secondary storage: timestamps 3000..5000.
	remoteStorage1 := teststorage.New(t)
	app2 := remoteStorage1.AppenderV2(t.Context())
	_, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app2.Commit())

	// Second secondary storage: timestamps 6000..8000.
	remoteStorage2 := teststorage.New(t)
	app3 := remoteStorage2.AppenderV2(t.Context())
	_, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	_, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{})
	require.NoError(t, err)
	inputTotalSize++
	require.NoError(t, app3.Commit())

	fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)

	t.Run("querier", func(t *testing.T) {
		querier, err := fanoutStorage.Querier(0, 8000)
		require.NoError(t, err)
		defer querier.Close()

		matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
		require.NoError(t, err)

		seriesSet := querier.Select(t.Context(), true, nil, matcher)

		result := make(map[int64]float64)
		var labelsResult labels.Labels
		var iterator chunkenc.Iterator
		for seriesSet.Next() {
			series := seriesSet.At()
			labelsResult = series.Labels()
			// Plain assignment on purpose: `:=` would declare a new inner
			// variable and always pass the nil outer one to Iterator.
			iterator = series.Iterator(iterator)
			for iterator.Next() == chunkenc.ValFloat {
				timestamp, value := iterator.At()
				result[timestamp] = value
			}
			require.NoError(t, iterator.Err())
		}

		require.NoError(t, seriesSet.Err())
		require.Equal(t, outputLabel, labelsResult)
		require.Len(t, result, inputTotalSize)
	})

	t.Run("chunk querier", func(t *testing.T) {
		querier, err := fanoutStorage.ChunkQuerier(0, 8000)
		require.NoError(t, err)
		defer querier.Close()

		matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
		require.NoError(t, err)

		// Adapt the chunk series set so it can be iterated sample by sample.
		seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher))

		result := make(map[int64]float64)
		var labelsResult labels.Labels
		var iterator chunkenc.Iterator
		for seriesSet.Next() {
			series := seriesSet.At()
			labelsResult = series.Labels()
			// Plain assignment on purpose: `:=` would declare a new inner
			// variable and always pass the nil outer one to Iterator.
			iterator = series.Iterator(iterator)
			for iterator.Next() == chunkenc.ValFloat {
				timestamp, value := iterator.At()
				result[timestamp] = value
			}
			require.NoError(t, iterator.Err())
		}

		require.NoError(t, seriesSet.Err())
		require.Equal(t, outputLabel, labelsResult)
		require.Len(t, result, inputTotalSize)
	})
}
// TestFanoutErrors checks how a fanout surfaces a failing Select depending on
// which side fails: each case lists the error and/or warning the merged series
// set is expected to report.
func TestFanoutErrors(t *testing.T) {
	workingStorage := teststorage.New(t)

	cases := []struct {
		// name keeps subtest names unique across cases.
		name      string
		primary   storage.Storage
		secondary storage.Storage
		warning   error
		err       error
	}{
		{
			name:      "failing secondary",
			primary:   workingStorage,
			secondary: errStorage{},
			warning:   errSelect,
			err:       nil,
		},
		{
			name:      "failing primary",
			primary:   errStorage{},
			secondary: workingStorage,
			warning:   nil,
			err:       errSelect,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)

			t.Run("samples", func(t *testing.T) {
				querier, err := fanoutStorage.Querier(0, 8000)
				require.NoError(t, err)
				defer querier.Close()

				matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
				ss := querier.Select(context.Background(), true, nil, matcher)

				// Exhaust.
				for ss.Next() {
					ss.At()
				}

				if tc.err != nil {
					require.EqualError(t, ss.Err(), tc.err.Error())
				} else {
					require.NoError(t, ss.Err())
				}

				if tc.warning != nil {
					w := ss.Warnings()
					require.NotEmpty(t, w, "warnings expected")
					require.EqualError(t, w.AsErrors()[0], tc.warning.Error())
				}
			})

			t.Run("chunks", func(t *testing.T) {
				t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")

				querier, err := fanoutStorage.ChunkQuerier(0, 8000)
				require.NoError(t, err)
				defer querier.Close()

				matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
				ss := querier.Select(context.Background(), true, nil, matcher)

				// Exhaust.
				for ss.Next() {
					ss.At()
				}

				if tc.err != nil {
					require.EqualError(t, ss.Err(), tc.err.Error())
				} else {
					require.NoError(t, ss.Err())
				}

				if tc.warning != nil {
					w := ss.Warnings()
					require.NotEmpty(t, w, "warnings expected")
					require.EqualError(t, w.AsErrors()[0], tc.warning.Error())
				}
			})
		})
	}
}
// errSelect is the error carried by every series set the stub queriers below
// return from Select.
var errSelect = errors.New("select error")

type (
	// errStorage is a stub storage: its queriers always fail, its appenders
	// are nil, and StartTime/Close succeed trivially.
	errStorage struct{}

	// errQuerier is a stub querier whose Select, LabelValues and LabelNames
	// all report an error.
	errQuerier struct{}

	// errChunkQuerier embeds errQuerier for LabelValues, LabelNames and Close,
	// and supplies its own Select returning a chunk series set.
	errChunkQuerier struct{ errQuerier }
)

// Querier returns an errQuerier regardless of the requested time range.
func (errStorage) Querier(_, _ int64) (storage.Querier, error) {
	var q errQuerier
	return q, nil
}

// ChunkQuerier returns an errChunkQuerier regardless of the requested time range.
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
	var q errChunkQuerier
	return q, nil
}

// Appender always returns nil.
func (errStorage) Appender(context.Context) storage.Appender {
	return nil
}

// AppenderV2 always returns nil.
func (errStorage) AppenderV2(context.Context) storage.AppenderV2 {
	return nil
}

// StartTime reports 0 and no error.
func (errStorage) StartTime() (int64, error) {
	return 0, nil
}

// Close is a no-op.
func (errStorage) Close() error {
	return nil
}

// Select returns a series set that carries errSelect.
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
	return storage.ErrSeriesSet(errSelect)
}

// LabelValues always fails.
func (errQuerier) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	err := errors.New("label values error")
	return nil, nil, err
}

// LabelNames always fails.
func (errQuerier) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	err := errors.New("label names error")
	return nil, nil, err
}

// Close is a no-op.
func (errQuerier) Close() error {
	return nil
}

// Select returns a chunk series set that carries errSelect.
func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
	return storage.ErrChunkSeriesSet(errSelect)
}
// mockStorage embeds a storage.Storage and overrides its two appender
// constructors so that they delegate to the injected appendables instead.
// Only the appendable matching the API under test needs to be set; the
// embedded Storage may be left nil as long as none of its other methods are
// called.
type mockStorage struct {
	app   storage.Appendable
	appV2 storage.AppendableV2
	storage.Storage
}

// Appender hands out an appender from the injected V1 appendable.
func (ms mockStorage) Appender(ctx context.Context) storage.Appender {
	appendable := ms.app
	return appendable.Appender(ctx)
}

// AppenderV2 hands out an appender from the injected V2 appendable.
func (ms mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
	appendable := ms.appV2
	return appendable.AppenderV2(ctx)
}
// sample is a local shorthand for the sample type recorded by the test appendable.
type sample = teststorage.Sample

// withoutExemplars returns a new slice holding a copy of every element of s
// with its ES (exemplars) field cleared. The input slice is left untouched.
func withoutExemplars(s []sample) (ret []sample) {
	ret = make([]sample, 0, len(s))
	for _, smpl := range s {
		// smpl is a per-iteration copy, so clearing ES does not affect s.
		smpl.ES = nil
		ret = append(ret, smpl)
	}
	return ret
}
// fanoutAppenderTestCase describes one fanout appender scenario shared by the
// V1 and V2 appender tests and the benchmark.
type fanoutAppenderTestCase struct {
	// name is used as the subtest / sub-benchmark name.
	name string

	// primary and secondary are the appendables placed behind the fanout.
	primary   *teststorage.Appendable
	secondary *teststorage.Appendable

	// Which fanout calls are expected to return an error.
	expectAppendErr     bool
	expectExemplarError bool
	expectCommitError   bool

	// Samples each appendable is expected to report via ResultSamples once
	// the test has called Commit.
	expectPrimarySamples   []sample
	expectSecondarySamples []sample
}
// fanoutAppenderTestCases builds the four shared fanout scenarios: both sides
// healthy, primary failing, exemplars failing on both sides, and secondary
// failing. expected is the sample set a healthy appendable should end up with.
func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
	var (
		appErr    = errors.New("append test error")
		exErr     = errors.New("exemplar test error")
		commitErr = errors.New("commit test error")
	)
	// Per-append error hooks passed to WithErrs.
	failAppend := func(labels.Labels) error { return appErr }
	passAppend := func(labels.Labels) error { return nil }

	cases := make([]fanoutAppenderTestCase, 0, 4)

	cases = append(cases, fanoutAppenderTestCase{
		name:                   "both works",
		primary:                teststorage.NewAppendable(),
		secondary:              teststorage.NewAppendable(),
		expectPrimarySamples:   expected,
		expectSecondarySamples: expected,
	})

	// Primary fails on every operation: no samples expected on either side.
	cases = append(cases, fanoutAppenderTestCase{
		name:                "primary errors",
		primary:             teststorage.NewAppendable().WithErrs(failAppend, exErr, commitErr),
		secondary:           teststorage.NewAppendable(),
		expectAppendErr:     true,
		expectExemplarError: true,
		expectCommitError:   true,
	})

	// Only exemplar appends fail: samples are expected, minus their exemplars.
	cases = append(cases, fanoutAppenderTestCase{
		name:                   "exemplar errors",
		primary:                teststorage.NewAppendable().WithErrs(passAppend, exErr, nil),
		secondary:              teststorage.NewAppendable().WithErrs(passAppend, exErr, nil),
		expectExemplarError:    true,
		expectPrimarySamples:   withoutExemplars(expected),
		expectSecondarySamples: withoutExemplars(expected),
	})

	// Secondary fails on every operation: only the primary is expected to hold samples.
	cases = append(cases, fanoutAppenderTestCase{
		name:                 "secondary errors",
		primary:              teststorage.NewAppendable(),
		secondary:            teststorage.NewAppendable().WithErrs(failAppend, exErr, commitErr),
		expectAppendErr:      true,
		expectExemplarError:  true,
		expectCommitError:    true,
		expectPrimarySamples: expected,
	})

	return cases
}
// TestFanoutAppender drives the V1 fanout appender through a float sample, an
// exemplar, an integer histogram and a float histogram, commits, and then
// compares what each underlying appendable recorded with the expectations of
// every shared scenario.
func TestFanoutAppender(t *testing.T) {
	var (
		hist      = tsdbutil.GenerateTestHistogram(0)
		floatHist = tsdbutil.GenerateTestFloatHistogram(0)
		ex        = exemplar.Exemplar{Value: 1}
	)
	// metric builds a fresh single-label set for the given metric name.
	metric := func(name string) labels.Labels {
		return labels.FromStrings(model.MetricNameLabel, name)
	}
	// checkErr asserts that err is present exactly when wantErr is set.
	checkErr := func(t *testing.T, wantErr bool, err error) {
		t.Helper()
		if wantErr {
			require.Error(t, err)
			return
		}
		require.NoError(t, err)
	}

	expected := []sample{
		{L: metric("metric1"), V: 1, ES: []exemplar.Exemplar{ex}},
		{L: metric("metric2"), T: 1, H: hist},
		{L: metric("metric3"), T: 2, FH: floatHist},
	}

	for _, tt := range fanoutAppenderTestCases(expected) {
		t.Run(tt.name, func(t *testing.T) {
			fanout := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary})
			app := fanout.Appender(t.Context())

			ref, err := app.Append(0, metric("metric1"), 0, 1)
			checkErr(t, tt.expectAppendErr, err)

			_, err = app.AppendExemplar(ref, metric("metric1"), ex)
			checkErr(t, tt.expectExemplarError, err)

			_, err = app.AppendHistogram(0, metric("metric2"), 1, hist, nil)
			checkErr(t, tt.expectAppendErr, err)

			_, err = app.AppendHistogram(0, metric("metric3"), 2, nil, floatHist)
			checkErr(t, tt.expectAppendErr, err)

			checkErr(t, tt.expectCommitError, app.Commit())

			// After Commit nothing may remain pending or rolled back on either side.
			primary, secondary := tt.primary, tt.secondary

			require.Nil(t, primary.PendingSamples())
			testutil.RequireEqual(t, tt.expectPrimarySamples, primary.ResultSamples())
			require.Nil(t, primary.RolledbackSamples())

			require.Nil(t, secondary.PendingSamples())
			testutil.RequireEqual(t, tt.expectSecondarySamples, secondary.ResultSamples())
			require.Nil(t, secondary.RolledbackSamples())
		})
	}
}
// TestFanoutAppenderV2 drives the V2 fanout appender through a float sample
// with an exemplar, an integer histogram and a float histogram, commits, and
// then compares what each underlying appendable recorded with the
// expectations of every shared scenario.
func TestFanoutAppenderV2(t *testing.T) {
	var (
		hist      = tsdbutil.GenerateTestHistogram(0)
		floatHist = tsdbutil.GenerateTestFloatHistogram(0)
		ex        = exemplar.Exemplar{Value: 1}
	)
	// metric builds a fresh single-label set for the given metric name.
	metric := func(name string) labels.Labels {
		return labels.FromStrings(model.MetricNameLabel, name)
	}
	// checkErr asserts that err is present exactly when wantErr is set.
	checkErr := func(t *testing.T, wantErr bool, err error) {
		t.Helper()
		if wantErr {
			require.Error(t, err)
			return
		}
		require.NoError(t, err)
	}

	expected := []sample{
		{L: metric("metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}},
		{L: metric("metric2"), ST: -2, T: 1, H: hist},
		{L: metric("metric3"), ST: -3, T: 2, FH: floatHist},
	}

	for _, tt := range fanoutAppenderTestCases(expected) {
		t.Run(tt.name, func(t *testing.T) {
			fanout := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
			app := fanout.AppenderV2(t.Context())

			withExemplar := storage.AOptions{
				Exemplars: []exemplar.Exemplar{ex},
			}
			_, err := app.Append(0, metric("metric1"), -1, 0, 1, nil, nil, withExemplar)
			// Case order matters: a full append failure takes precedence over
			// an exemplar-only failure.
			switch {
			case tt.expectAppendErr:
				require.Error(t, err)
			case tt.expectExemplarError:
				var partial *storage.AppendPartialError
				require.ErrorAs(t, err, &partial)
				// Two exemplar errors: one from the primary, one from the secondary.
				// In the V2 flow the sample must still be appended when the first
				// append only partially failed on exemplars; filtering the
				// exemplars out is neither feasible nor important.
				require.Len(t, partial.ExemplarErrors, 2)
			default:
				require.NoError(t, err)
			}

			_, err = app.Append(0, metric("metric2"), -2, 1, 0, hist, nil, storage.AOptions{})
			checkErr(t, tt.expectAppendErr, err)

			_, err = app.Append(0, metric("metric3"), -3, 2, 0, nil, floatHist, storage.AOptions{})
			checkErr(t, tt.expectAppendErr, err)

			checkErr(t, tt.expectCommitError, app.Commit())

			// After Commit nothing may remain pending or rolled back on either side.
			primary, secondary := tt.primary, tt.secondary

			require.Nil(t, primary.PendingSamples())
			testutil.RequireEqual(t, tt.expectPrimarySamples, primary.ResultSamples())
			require.Nil(t, primary.RolledbackSamples())

			require.Nil(t, secondary.PendingSamples())
			testutil.RequireEqual(t, tt.expectSecondarySamples, secondary.ResultSamples())
			require.Nil(t, secondary.RolledbackSamples())
		})
	}
}
// BenchmarkFanoutAppenderV2 measures appending 1000 series (each with one
// exemplar) through the V2 fanout appender followed by a Rollback, once per
// shared fanout scenario, including the ones where appends fail.
//
// Recommended CLI invocation:
/*
	export bench=fanoutAppender && go test ./storage/... \
		-run '^$' -bench '^BenchmarkFanoutAppenderV2' \
		-benchtime 2s -count 6 -cpu 2 -timeout 999m \
		| tee ${bench}.txt
*/
func BenchmarkFanoutAppenderV2(b *testing.B) {
	const numSeries = 1000

	ex := []exemplar.Exemplar{{Value: 1}}

	// The series count is known up front, so size the slice once.
	series := make([]labels.Labels, 0, numSeries)
	for i := range numSeries {
		series = append(series, labels.FromStrings(model.MetricNameLabel, "metric1", "i", strconv.Itoa(i)))
	}

	for _, tt := range fanoutAppenderTestCases(nil) {
		// Turn our mock appender into ~noop for no allocs.
		tt.primary.SkipRecording(true)
		tt.secondary.SkipRecording(true)

		b.Run(tt.name, func(b *testing.B) {
			f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})

			b.ReportAllocs()
			// No explicit b.ResetTimer(): the first b.Loop() call resets the
			// timer, so the setup above is already excluded.
			for b.Loop() {
				app := f.AppenderV2(b.Context())
				for _, s := range series {
					// Purposefully skip errors as we want to benchmark error cases too (majority of the fanout logic).
					_, _ = app.Append(0, s, 0, 0, 1, nil, nil, storage.AOptions{
						Exemplars: ex,
					})
				}
				require.NoError(b, app.Rollback())
			}
		})
	}
}