mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-11 02:41:03 +01:00
* storage: add BenchmarkFanoutAppenderV2 Signed-off-by: bwplotka <bwplotka@gmail.com> * fix: optimized fanoutAppenderV2 Signed-off-by: bwplotka <bwplotka@gmail.com> * optimized more Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com>
605 lines
18 KiB
Go
605 lines
18 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
|
"github.com/prometheus/prometheus/util/annotations"
|
|
"github.com/prometheus/prometheus/util/teststorage"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
func TestFanout_SelectSorted(t *testing.T) {
|
|
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
|
|
inputTotalSize := 0
|
|
ctx := context.Background()
|
|
|
|
priStorage := teststorage.New(t)
|
|
app1 := priStorage.Appender(ctx)
|
|
app1.Append(0, inputLabel, 0, 0)
|
|
inputTotalSize++
|
|
app1.Append(0, inputLabel, 1000, 1)
|
|
inputTotalSize++
|
|
app1.Append(0, inputLabel, 2000, 2)
|
|
inputTotalSize++
|
|
err := app1.Commit()
|
|
require.NoError(t, err)
|
|
|
|
remoteStorage1 := teststorage.New(t)
|
|
app2 := remoteStorage1.Appender(ctx)
|
|
app2.Append(0, inputLabel, 3000, 3)
|
|
inputTotalSize++
|
|
app2.Append(0, inputLabel, 4000, 4)
|
|
inputTotalSize++
|
|
app2.Append(0, inputLabel, 5000, 5)
|
|
inputTotalSize++
|
|
err = app2.Commit()
|
|
require.NoError(t, err)
|
|
|
|
remoteStorage2 := teststorage.New(t)
|
|
|
|
app3 := remoteStorage2.Appender(ctx)
|
|
app3.Append(0, inputLabel, 6000, 6)
|
|
inputTotalSize++
|
|
app3.Append(0, inputLabel, 7000, 7)
|
|
inputTotalSize++
|
|
app3.Append(0, inputLabel, 8000, 8)
|
|
inputTotalSize++
|
|
|
|
err = app3.Commit()
|
|
require.NoError(t, err)
|
|
|
|
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
|
|
|
|
t.Run("querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.Querier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := querier.Select(ctx, true, nil, matcher)
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
var iterator chunkenc.Iterator
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator(iterator)
|
|
for iterator.Next() == chunkenc.ValFloat {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Len(t, result, inputTotalSize)
|
|
})
|
|
t.Run("chunk querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(ctx, true, nil, matcher))
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
var iterator chunkenc.Iterator
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator(iterator)
|
|
for iterator.Next() == chunkenc.ValFloat {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.NoError(t, seriesSet.Err())
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Len(t, result, inputTotalSize)
|
|
})
|
|
}
|
|
|
|
func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
|
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
|
|
|
inputTotalSize := 0
|
|
|
|
priStorage := teststorage.New(t)
|
|
app1 := priStorage.AppenderV2(t.Context())
|
|
_, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
require.NoError(t, app1.Commit())
|
|
|
|
remoteStorage1 := teststorage.New(t)
|
|
app2 := remoteStorage1.AppenderV2(t.Context())
|
|
_, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
require.NoError(t, app2.Commit())
|
|
|
|
remoteStorage2 := teststorage.New(t)
|
|
app3 := remoteStorage2.AppenderV2(t.Context())
|
|
_, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
_, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{})
|
|
require.NoError(t, err)
|
|
inputTotalSize++
|
|
|
|
require.NoError(t, app3.Commit())
|
|
|
|
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
|
|
|
|
t.Run("querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.Querier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := querier.Select(t.Context(), true, nil, matcher)
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
var iterator chunkenc.Iterator
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator(iterator)
|
|
for iterator.Next() == chunkenc.ValFloat {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Len(t, result, inputTotalSize)
|
|
})
|
|
t.Run("chunk querier", func(t *testing.T) {
|
|
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
|
require.NoError(t, err)
|
|
|
|
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher))
|
|
|
|
result := make(map[int64]float64)
|
|
var labelsResult labels.Labels
|
|
var iterator chunkenc.Iterator
|
|
for seriesSet.Next() {
|
|
series := seriesSet.At()
|
|
seriesLabels := series.Labels()
|
|
labelsResult = seriesLabels
|
|
iterator := series.Iterator(iterator)
|
|
for iterator.Next() == chunkenc.ValFloat {
|
|
timestamp, value := iterator.At()
|
|
result[timestamp] = value
|
|
}
|
|
}
|
|
|
|
require.NoError(t, seriesSet.Err())
|
|
require.Equal(t, labelsResult, outputLabel)
|
|
require.Len(t, result, inputTotalSize)
|
|
})
|
|
}
|
|
|
|
func TestFanoutErrors(t *testing.T) {
|
|
workingStorage := teststorage.New(t)
|
|
|
|
cases := []struct {
|
|
primary storage.Storage
|
|
secondary storage.Storage
|
|
warning error
|
|
err error
|
|
}{
|
|
{
|
|
primary: workingStorage,
|
|
secondary: errStorage{},
|
|
warning: errSelect,
|
|
err: nil,
|
|
},
|
|
{
|
|
primary: errStorage{},
|
|
secondary: workingStorage,
|
|
warning: nil,
|
|
err: errSelect,
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)
|
|
|
|
t.Run("samples", func(t *testing.T) {
|
|
querier, err := fanoutStorage.Querier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
|
ss := querier.Select(context.Background(), true, nil, matcher)
|
|
|
|
// Exhaust.
|
|
for ss.Next() {
|
|
ss.At()
|
|
}
|
|
|
|
if tc.err != nil {
|
|
require.EqualError(t, ss.Err(), tc.err.Error())
|
|
}
|
|
|
|
if tc.warning != nil {
|
|
w := ss.Warnings()
|
|
require.NotEmpty(t, w, "warnings expected")
|
|
require.EqualError(t, w.AsErrors()[0], tc.warning.Error())
|
|
}
|
|
})
|
|
t.Run("chunks", func(t *testing.T) {
|
|
t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")
|
|
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
|
require.NoError(t, err)
|
|
defer querier.Close()
|
|
|
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
|
ss := querier.Select(context.Background(), true, nil, matcher)
|
|
|
|
// Exhaust.
|
|
for ss.Next() {
|
|
ss.At()
|
|
}
|
|
|
|
if tc.err != nil {
|
|
require.EqualError(t, ss.Err(), tc.err.Error())
|
|
}
|
|
|
|
if tc.warning != nil {
|
|
w := ss.Warnings()
|
|
require.NotEmpty(t, w, "warnings expected")
|
|
require.EqualError(t, w.AsErrors()[0], tc.warning.Error())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// errSelect is the error carried by every series set that errQuerier.Select
// and errChunkQuerier.Select return.
var errSelect = errors.New("select error")
|
|
|
|
// errStorage is a stateless storage.Storage test double: its queriers fail
// every Select with errSelect and its appenders are nil.
type errStorage struct{}
|
|
|
|
// errQuerier is a querier test double whose Select, LabelValues and
// LabelNames always report an error.
type errQuerier struct{}
|
|
|
|
func (errStorage) Querier(_, _ int64) (storage.Querier, error) {
|
|
return errQuerier{}, nil
|
|
}
|
|
|
|
// errChunkQuerier is the chunk flavour of errQuerier. It embeds errQuerier for
// the label and Close methods and provides its own failing Select.
type errChunkQuerier struct{ errQuerier }
|
|
|
|
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
|
|
return errChunkQuerier{}, nil
|
|
}
|
|
// Appender always returns nil; errStorage does not support appending.
func (errStorage) Appender(context.Context) storage.Appender { return nil }
|
|
// AppenderV2 always returns nil; errStorage does not support appending.
func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil }
|
|
// StartTime always reports 0 and no error.
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
|
// Close is a no-op that always succeeds.
func (errStorage) Close() error { return nil }
|
|
|
|
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
|
return storage.ErrSeriesSet(errSelect)
|
|
}
|
|
|
|
func (errQuerier) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
|
return nil, nil, errors.New("label values error")
|
|
}
|
|
|
|
func (errQuerier) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
|
return nil, nil, errors.New("label names error")
|
|
}
|
|
|
|
// Close is a no-op that always succeeds.
func (errQuerier) Close() error { return nil }
|
|
|
|
func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
|
|
return storage.ErrChunkSeriesSet(errSelect)
|
|
}
|
|
|
|
type mockStorage struct {
|
|
app storage.Appendable
|
|
appV2 storage.AppendableV2
|
|
storage.Storage
|
|
}
|
|
|
|
func (m mockStorage) Appender(ctx context.Context) storage.Appender {
|
|
return m.app.Appender(ctx)
|
|
}
|
|
|
|
func (m mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
|
return m.appV2.AppenderV2(ctx)
|
|
}
|
|
|
|
// sample is a local alias for teststorage.Sample.
type sample = teststorage.Sample
|
|
|
|
func withoutExemplars(s []sample) (ret []sample) {
|
|
ret = make([]sample, len(s))
|
|
copy(ret, s)
|
|
for i := range ret {
|
|
ret[i].ES = nil
|
|
}
|
|
return ret
|
|
}
|
|
|
|
type fanoutAppenderTestCase struct {
|
|
name string
|
|
primary *teststorage.Appendable
|
|
secondary *teststorage.Appendable
|
|
|
|
expectAppendErr bool
|
|
expectExemplarError bool
|
|
expectCommitError bool
|
|
|
|
expectPrimarySamples []sample
|
|
expectSecondarySamples []sample
|
|
}
|
|
|
|
func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
|
|
appErr := errors.New("append test error")
|
|
exErr := errors.New("exemplar test error")
|
|
commitErr := errors.New("commit test error")
|
|
|
|
return []fanoutAppenderTestCase{
|
|
{
|
|
name: "both works",
|
|
primary: teststorage.NewAppendable(),
|
|
secondary: teststorage.NewAppendable(),
|
|
|
|
expectPrimarySamples: expected,
|
|
expectSecondarySamples: expected,
|
|
},
|
|
{
|
|
name: "primary errors",
|
|
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
|
secondary: teststorage.NewAppendable(),
|
|
|
|
expectAppendErr: true,
|
|
expectExemplarError: true,
|
|
expectCommitError: true,
|
|
},
|
|
{
|
|
name: "exemplar errors",
|
|
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
|
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
|
|
|
expectAppendErr: false,
|
|
expectExemplarError: true,
|
|
expectCommitError: false,
|
|
|
|
expectPrimarySamples: withoutExemplars(expected),
|
|
expectSecondarySamples: withoutExemplars(expected),
|
|
},
|
|
{
|
|
name: "secondary errors",
|
|
primary: teststorage.NewAppendable(),
|
|
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
|
|
|
expectAppendErr: true,
|
|
expectExemplarError: true,
|
|
expectCommitError: true,
|
|
|
|
expectPrimarySamples: expected,
|
|
},
|
|
}
|
|
}
|
|
|
|
func TestFanoutAppender(t *testing.T) {
|
|
h := tsdbutil.GenerateTestHistogram(0)
|
|
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
|
ex := exemplar.Exemplar{Value: 1}
|
|
|
|
expected := []sample{
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), V: 1, ES: []exemplar.Exemplar{ex}},
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), T: 1, H: h},
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), T: 2, FH: fh},
|
|
}
|
|
for _, tt := range fanoutAppenderTestCases(expected) {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
f := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary})
|
|
|
|
app := f.Appender(t.Context())
|
|
ref, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), 0, 1)
|
|
if tt.expectAppendErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, err = app.AppendExemplar(ref, labels.FromStrings(model.MetricNameLabel, "metric1"), ex)
|
|
if tt.expectExemplarError {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric2"), 1, h, nil)
|
|
if tt.expectAppendErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric3"), 2, nil, fh)
|
|
if tt.expectAppendErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
err = app.Commit()
|
|
if tt.expectCommitError {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.Nil(t, tt.primary.PendingSamples())
|
|
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
|
|
require.Nil(t, tt.primary.RolledbackSamples())
|
|
|
|
require.Nil(t, tt.secondary.PendingSamples())
|
|
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
|
|
require.Nil(t, tt.secondary.RolledbackSamples())
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFanoutAppenderV2(t *testing.T) {
|
|
h := tsdbutil.GenerateTestHistogram(0)
|
|
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
|
ex := exemplar.Exemplar{Value: 1}
|
|
|
|
expected := []sample{
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}},
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), ST: -2, T: 1, H: h},
|
|
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), ST: -3, T: 2, FH: fh},
|
|
}
|
|
|
|
for _, tt := range fanoutAppenderTestCases(expected) {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
|
|
|
|
app := f.AppenderV2(t.Context())
|
|
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), -1, 0, 1, nil, nil, storage.AOptions{
|
|
Exemplars: []exemplar.Exemplar{ex},
|
|
})
|
|
switch {
|
|
case tt.expectAppendErr:
|
|
require.Error(t, err)
|
|
case tt.expectExemplarError:
|
|
var pErr *storage.AppendPartialError
|
|
require.ErrorAs(t, err, &pErr)
|
|
// One for primary, one for secondary.
|
|
// This is because in V2 flow we must append sample even when first append partially failed with exemplars.
|
|
// Filtering out exemplars is neither feasible, nor important.
|
|
require.Len(t, pErr.ExemplarErrors, 2)
|
|
default:
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric2"), -2, 1, 0, h, nil, storage.AOptions{})
|
|
if tt.expectAppendErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric3"), -3, 2, 0, nil, fh, storage.AOptions{})
|
|
if tt.expectAppendErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
err = app.Commit()
|
|
if tt.expectCommitError {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.Nil(t, tt.primary.PendingSamples())
|
|
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
|
|
require.Nil(t, tt.primary.RolledbackSamples())
|
|
|
|
require.Nil(t, tt.secondary.PendingSamples())
|
|
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
|
|
require.Nil(t, tt.secondary.RolledbackSamples())
|
|
})
|
|
}
|
|
}
|
|
|
|
// Recommended CLI invocation:
|
|
/*
|
|
export bench=fanoutAppender && go test ./storage/... \
|
|
-run '^$' -bench '^BenchmarkFanoutAppenderV2' \
|
|
-benchtime 2s -count 6 -cpu 2 -timeout 999m \
|
|
| tee ${bench}.txt
|
|
*/
|
|
func BenchmarkFanoutAppenderV2(b *testing.B) {
|
|
ex := []exemplar.Exemplar{{Value: 1}}
|
|
|
|
var series []labels.Labels
|
|
for i := range 1000 {
|
|
series = append(series, labels.FromStrings(model.MetricNameLabel, "metric1", "i", strconv.Itoa(i)))
|
|
}
|
|
for _, tt := range fanoutAppenderTestCases(nil) {
|
|
// Turn our mock appender into ~noop for no allocs.
|
|
tt.primary.SkipRecording(true)
|
|
tt.secondary.SkipRecording(true)
|
|
|
|
b.Run(tt.name, func(b *testing.B) {
|
|
f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
|
|
|
|
b.ReportAllocs()
|
|
b.ResetTimer()
|
|
for b.Loop() {
|
|
app := f.AppenderV2(b.Context())
|
|
for _, s := range series {
|
|
// Purposefully skip errors as we want to benchmark error cases too (majority of the fanout logic).
|
|
_, _ = app.Append(0, s, 0, 0, 1, nil, nil, storage.AOptions{
|
|
Exemplars: ex,
|
|
})
|
|
}
|
|
require.NoError(b, app.Rollback())
|
|
}
|
|
})
|
|
}
|
|
}
|