mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-04 20:06:12 +02:00
feat(teststorage)[PART4a]: Add AppendableV2 support for mock Appendable (#17834)
* feat(teststorage)[PART4a]: Add AppendableV2 support for mock Appendable Signed-off-by: bwplotka <bwplotka@gmail.com> * fix: adjusted AppenderV1 flow for reliability Found in https://github.com/prometheus/prometheus/pull/17838 and by Krajo comment Signed-off-by: bwplotka <bwplotka@gmail.com> * addressed comments Signed-off-by: bwplotka <bwplotka@gmail.com> * fix broken appV2 commit and rollback; added tests Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
c7bc56cf6c
commit
3374d2e56f
@ -65,13 +65,17 @@ func (s Sample) String() string {
|
||||
// Print all value types on purpose, to catch bugs for appending multiple sample types at once.
|
||||
h := ""
|
||||
if s.H != nil {
|
||||
h = s.H.String()
|
||||
h = " " + s.H.String()
|
||||
}
|
||||
fh := ""
|
||||
if s.FH != nil {
|
||||
fh = s.FH.String()
|
||||
fh = " " + s.FH.String()
|
||||
}
|
||||
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T))
|
||||
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v", s.L.String(), s.V, h, fh, s.ST, s.T))
|
||||
if len(s.ES) > 0 {
|
||||
b.WriteString(fmt.Sprintf(" %v", s.ES))
|
||||
}
|
||||
b.WriteString("\n")
|
||||
return b.String()
|
||||
}
|
||||
|
||||
@ -104,7 +108,8 @@ type Appendable struct {
|
||||
rolledbackSamples []Sample
|
||||
|
||||
// Optional chain (Appender will collect samples, then run next).
|
||||
next storage.Appendable
|
||||
next storage.Appendable
|
||||
nextV2 storage.AppendableV2
|
||||
}
|
||||
|
||||
// NewAppendable returns mock Appendable.
|
||||
@ -112,12 +117,18 @@ func NewAppendable() *Appendable {
|
||||
return &Appendable{}
|
||||
}
|
||||
|
||||
// Then chains another appender from the provided appendable for the Appender calls.
|
||||
// Then chains another appender from the provided Appendable for the Appender calls.
|
||||
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
|
||||
a.next = appendable
|
||||
return a
|
||||
}
|
||||
|
||||
// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls.
|
||||
func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable {
|
||||
a.nextV2 = appendable
|
||||
return a
|
||||
}
|
||||
|
||||
// WithErrs allows injecting errors to the appender.
|
||||
func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable {
|
||||
a.appendErrFn = appendErrFn
|
||||
@ -130,6 +141,9 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx
|
||||
func (a *Appendable) PendingSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.pendingSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.pendingSamples))
|
||||
copy(ret, a.pendingSamples)
|
||||
@ -140,6 +154,9 @@ func (a *Appendable) PendingSamples() []Sample {
|
||||
func (a *Appendable) ResultSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.resultSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.resultSamples))
|
||||
copy(ret, a.resultSamples)
|
||||
@ -150,6 +167,9 @@ func (a *Appendable) ResultSamples() []Sample {
|
||||
func (a *Appendable) RolledbackSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.rolledbackSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.rolledbackSamples))
|
||||
copy(ret, a.rolledbackSamples)
|
||||
@ -205,28 +225,77 @@ func (a *Appendable) String() string {
|
||||
|
||||
var errClosedAppender = errors.New("appender was already committed/rolledback")
|
||||
|
||||
type appender struct {
|
||||
err error
|
||||
next storage.Appender
|
||||
type baseAppender struct {
|
||||
err error
|
||||
|
||||
a *Appendable
|
||||
nextTr storage.AppenderTransaction
|
||||
a *Appendable
|
||||
}
|
||||
|
||||
func (a *appender) checkErr() error {
|
||||
func (a *baseAppender) checkErr() error {
|
||||
a.a.mtx.Lock()
|
||||
defer a.a.mtx.Unlock()
|
||||
|
||||
return a.err
|
||||
}
|
||||
|
||||
func (a *baseAppender) Commit() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
|
||||
if a.a.commitErr != nil {
|
||||
return a.a.commitErr
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.nextTr != nil {
|
||||
return a.nextTr.Commit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *baseAppender) Rollback() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.nextTr != nil {
|
||||
return a.nextTr.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type appender struct {
|
||||
baseAppender
|
||||
|
||||
next storage.Appender
|
||||
}
|
||||
|
||||
func (a *Appendable) Appender(ctx context.Context) storage.Appender {
|
||||
ret := &appender{a: a}
|
||||
ret := &appender{baseAppender: baseAppender{a: a}}
|
||||
if a.openAppenders.Inc() > 1 {
|
||||
ret.err = errors.New("teststorage.Appendable.Appender() concurrent use is not supported; attempted opening new Appender() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
ret.next = a.next.Appender(ctx)
|
||||
app := a.next.Appender(ctx)
|
||||
ret.next, ret.nextTr = app, app
|
||||
} else if a.nextV2 != nil {
|
||||
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug"))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
@ -264,7 +333,7 @@ func computeOrCheckRef(ref storage.SeriesRef, ls labels.Labels) (storage.SeriesR
|
||||
|
||||
if storage.SeriesRef(h) != ref {
|
||||
// Check for buggy ref while we at it.
|
||||
return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable user")
|
||||
return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable usage")
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
@ -297,6 +366,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
|
||||
if a.a.appendExemplarsError != nil {
|
||||
return 0, a.a.appendExemplarsError
|
||||
}
|
||||
var appended bool
|
||||
|
||||
a.a.mtx.Lock()
|
||||
// NOTE(bwplotka): Eventually exemplar has to be attached to a series and soon
|
||||
@ -306,11 +376,12 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
|
||||
for ; i >= 0; i-- { // Attach exemplars to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e)
|
||||
appended = true
|
||||
break
|
||||
}
|
||||
}
|
||||
a.a.mtx.Unlock()
|
||||
if i < 0 {
|
||||
if !appended {
|
||||
return 0, fmt.Errorf("teststorage.appender: exemplar appender without series; ref %v; l %v; exemplar: %v", ref, l, e)
|
||||
}
|
||||
|
||||
@ -336,6 +407,8 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var updated bool
|
||||
|
||||
a.a.mtx.Lock()
|
||||
// NOTE(bwplotka): Eventually metadata has to be attached to a series and soon
|
||||
// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
|
||||
@ -344,11 +417,12 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
|
||||
for ; i >= 0; i-- { // Attach metadata to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
a.a.pendingSamples[i].M = m
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
a.a.mtx.Unlock()
|
||||
if i < 0 {
|
||||
if !updated {
|
||||
return 0, fmt.Errorf("teststorage.appender: metadata update without series; ref %v; l %v; m: %v", ref, l, m)
|
||||
}
|
||||
|
||||
@ -358,42 +432,75 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
|
||||
return computeOrCheckRef(ref, l)
|
||||
}
|
||||
|
||||
func (a *appender) Commit() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
type appenderV2 struct {
|
||||
baseAppender
|
||||
|
||||
if a.a.commitErr != nil {
|
||||
return a.a.commitErr
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.a.next != nil {
|
||||
return a.next.Commit()
|
||||
}
|
||||
return nil
|
||||
next storage.AppenderV2
|
||||
}
|
||||
|
||||
func (a *appender) Rollback() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
ret := &appenderV2{baseAppender: baseAppender{a: a}}
|
||||
if a.openAppenders.Inc() > 1 {
|
||||
ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
|
||||
}
|
||||
|
||||
if a.nextV2 != nil {
|
||||
app := a.nextV2.AppenderV2(ctx)
|
||||
ret.next, ret.nextTr = app, app
|
||||
} else if a.next != nil {
|
||||
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug"))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if a.a.appendErrFn != nil {
|
||||
if err := a.a.appendErrFn(ls); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
var es []exemplar.Exemplar
|
||||
if len(opts.Exemplars) > 0 {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
L: ls,
|
||||
ST: st, T: t,
|
||||
V: v, H: h, FH: fh,
|
||||
ES: es,
|
||||
})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
var partialErr error
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.Rollback()
|
||||
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
ref, err = computeOrCheckRef(ref, ls)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
return ref, partialErr
|
||||
}
|
||||
|
||||
@ -19,62 +19,191 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func testAppendableV1(t *testing.T, appTest *Appendable, a storage.Appendable) {
|
||||
for _, commit := range []bool{true, false} {
|
||||
appTest.ResultReset()
|
||||
|
||||
app := a.Appender(t.Context())
|
||||
|
||||
ref1, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v1"), 1, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
h := tsdbutil.GenerateTestHistogram(0)
|
||||
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "test_metric2", "app", "v1"), 2, h, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
||||
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "test_metric3", "app", "v1"), 3, nil, fh)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Update meta of first series.
|
||||
m1 := metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}
|
||||
_, err = app.UpdateMetadata(ref1, labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v1"), m1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Add exemplars to the first series.
|
||||
e1 := exemplar.Exemplar{Labels: labels.FromStrings(model.MetricNameLabel, "yolo"), HasTs: true, Ts: 1}
|
||||
_, err = app.AppendExemplar(ref1, labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v1"), e1)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp := []Sample{
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v1"), M: m1, T: 1, V: 2, ES: []exemplar.Exemplar{e1}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "app", "v1"), T: 2, H: h},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric3", "app", "v1"), T: 3, FH: fh},
|
||||
}
|
||||
testutil.RequireEqual(t, exp, appTest.PendingSamples())
|
||||
require.Nil(t, appTest.ResultSamples())
|
||||
require.Nil(t, appTest.RolledbackSamples())
|
||||
|
||||
if commit {
|
||||
require.NoError(t, app.Commit())
|
||||
require.Nil(t, appTest.PendingSamples())
|
||||
testutil.RequireEqual(t, exp, appTest.ResultSamples())
|
||||
require.Nil(t, appTest.RolledbackSamples())
|
||||
break
|
||||
}
|
||||
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Nil(t, appTest.PendingSamples())
|
||||
require.Nil(t, appTest.ResultSamples())
|
||||
testutil.RequireEqual(t, exp, appTest.RolledbackSamples())
|
||||
}
|
||||
}
|
||||
|
||||
func testAppendableV2(t *testing.T, appTest *Appendable, a storage.AppendableV2) {
|
||||
for _, commit := range []bool{true, false} {
|
||||
appTest.ResultReset()
|
||||
|
||||
app := a.AppenderV2(t.Context())
|
||||
|
||||
m1 := metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}
|
||||
e1 := exemplar.Exemplar{Labels: labels.FromStrings(model.MetricNameLabel, "yolo"), HasTs: true, Ts: 1}
|
||||
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v2"), -1, 1, 2, nil, nil, storage.AOptions{
|
||||
MetricFamilyName: "test_metric1",
|
||||
Metadata: m1,
|
||||
Exemplars: []exemplar.Exemplar{e1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
h := tsdbutil.GenerateTestHistogram(0)
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "test_metric2", "app", "v2"), -2, 2, 0, h, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "test_metric3", "app", "v2"), -3, 3, 0, nil, fh, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
exp := []Sample{
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric1", "app", "v2"), MF: "test_metric1", M: m1, ST: -1, T: 1, V: 2, ES: []exemplar.Exemplar{e1}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "app", "v2"), ST: -2, T: 2, H: h},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric3", "app", "v2"), ST: -3, T: 3, FH: fh},
|
||||
}
|
||||
testutil.RequireEqual(t, exp, appTest.PendingSamples())
|
||||
require.Nil(t, appTest.ResultSamples())
|
||||
require.Nil(t, appTest.RolledbackSamples())
|
||||
|
||||
if commit {
|
||||
require.NoError(t, app.Commit())
|
||||
require.Nil(t, appTest.PendingSamples())
|
||||
testutil.RequireEqual(t, exp, appTest.ResultSamples())
|
||||
require.Nil(t, appTest.RolledbackSamples())
|
||||
break
|
||||
}
|
||||
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Nil(t, appTest.PendingSamples())
|
||||
require.Nil(t, appTest.ResultSamples())
|
||||
testutil.RequireEqual(t, exp, appTest.RolledbackSamples())
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendable(t *testing.T) {
|
||||
appTest := NewAppendable()
|
||||
testAppendableV1(t, appTest, appTest)
|
||||
testAppendableV2(t, appTest, appTest)
|
||||
}
|
||||
|
||||
func TestAppendable_Then(t *testing.T) {
|
||||
nextAppTest := NewAppendable()
|
||||
app := NewAppendable().Then(nextAppTest)
|
||||
|
||||
// Ensure next mock record all the appends when appending to app.
|
||||
testAppendableV1(t, nextAppTest, app)
|
||||
|
||||
// V2 should fail as Then was supplied with Appendable V1.
|
||||
require.Error(t, app.AppenderV2(t.Context()).Commit())
|
||||
}
|
||||
|
||||
func TestAppendable_ThenV2(t *testing.T) {
|
||||
nextAppTest := NewAppendable()
|
||||
app := NewAppendable().ThenV2(nextAppTest)
|
||||
|
||||
// Ensure next mock record all the appends when appending to app.
|
||||
testAppendableV2(t, nextAppTest, app)
|
||||
|
||||
// V1 should fail as ThenV2 was supplied with Appendable V2.
|
||||
require.Error(t, app.Appender(t.Context()).Commit())
|
||||
}
|
||||
|
||||
// TestSample_RequireEqual ensures standard testutil.RequireEqual is enough for comparisons.
|
||||
// This is thanks to the fact metadata has now Equals method.
|
||||
func TestSample_RequireEqual(t *testing.T) {
|
||||
a := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
|
||||
}
|
||||
testutil.RequireEqual(t, a, a)
|
||||
|
||||
b1 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b1)
|
||||
|
||||
b2 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo2")}}}, // exemplar is different.
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo2")}}}, // exemplar is different.
|
||||
}
|
||||
requireNotEqual(t, a, b2)
|
||||
|
||||
b3 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123, T: 123}, // Timestamp is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123, T: 123}, // Timestamp is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b3)
|
||||
|
||||
b4 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 456.456}, // Value is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 456.456}, // Value is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b4)
|
||||
|
||||
b5 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter2", Unit: "metric", Help: "some help text"}}, // Different type.
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter2", Unit: "metric", Help: "some help text"}}, // Different type.
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b5)
|
||||
}
|
||||
@ -129,3 +258,40 @@ func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {
|
||||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
}
|
||||
|
||||
func TestConcurrentAppenderV2_ReturnsErrAppender(t *testing.T) {
|
||||
a := NewAppendable()
|
||||
|
||||
// Non-concurrent multiple use if fine.
|
||||
app := a.AppenderV2(t.Context())
|
||||
require.Equal(t, int32(1), a.openAppenders.Load())
|
||||
require.NoError(t, app.Commit())
|
||||
// Repeated commit fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.NoError(t, app.Rollback())
|
||||
// Commit after rollback fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(
|
||||
nil,
|
||||
nil,
|
||||
errors.New("commit err"),
|
||||
)
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(nil, nil, nil)
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, int32(0), a.openAppenders.Load())
|
||||
|
||||
// Concurrent use should return appender that errors.
|
||||
_ = a.AppenderV2(t.Context())
|
||||
app = a.AppenderV2(t.Context())
|
||||
_, err := app.Append(0, labels.EmptyLabels(), 0, 0, 0, nil, nil, storage.AOptions{})
|
||||
require.Error(t, err)
|
||||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user