bwplotka 652ea5541b feat(storage): switch to AppenderV2 (to split)
Signed-off-by: bwplotka <bwplotka@gmail.com>
2025-12-01 10:58:28 +00:00

255 lines
6.1 KiB
Go

package teststorage
import (
"bytes"
"context"
"fmt"
"math"
"strings"
"sync"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)
// Sample represents test, combined sample for mocking storage.AppenderV2.
type Sample struct {
MF string
L labels.Labels
M metadata.Metadata
ST, T int64
V float64
H *histogram.Histogram
FH *histogram.FloatHistogram
ES []exemplar.Exemplar
}
func (s Sample) String() string {
b := bytes.Buffer{}
if s.M.Help != "" {
_, _ = fmt.Fprintf(&b, "HELP %s\n", s.M.Help)
}
if s.M.Type != model.MetricTypeUnknown && s.M.Type != "" {
_, _ = fmt.Fprintf(&b, "type@%s ", s.M.Type)
}
if s.M.Unit != "" {
_, _ = fmt.Fprintf(&b, "unit@%s ", s.M.Unit)
}
h := ""
if s.H != nil {
h = s.H.String()
}
fh := ""
if s.FH != nil {
fh = s.FH.String()
}
_, _ = fmt.Fprintf(&b, "%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T)
return b.String()
}
func (s Sample) exemplarsEqual(other []exemplar.Exemplar) bool {
if len(s.ES) != len(other) {
return false
}
for i := range s.ES {
if !s.ES[i].Equals(other[i]) {
return false
}
}
return true
}
func (s Sample) Equal(other Sample) bool {
return strings.Compare(s.MF, other.MF) == 0 &&
labels.Equal(s.L, other.L) &&
s.M.Equals(other.M) &&
s.ST == other.ST &&
s.T == other.T &&
math.Float64bits(s.V) == math.Float64bits(s.V) && // Compare Float64bits so NaN values which are exactly the same will compare equal.
s.H.Equals(other.H) &&
s.FH.Equals(other.FH) &&
s.exemplarsEqual(other.ES)
}
// Appender is a storage.AppenderV2 mock.
// It allows:
// * recording all samples that were added through the appender.
// * optionally backed by another appender it writes samples through (Next).
// * optionally runs another appender before result recording e.g. to simulate chained validation (Prev)
// TODO(bwplotka): Move to storage/interface/mock or something?
type Appender struct {
Prev storage.AppendableV2 // Optional appender to run before the result collection.
Next storage.AppendableV2 // Optional appender to run after results are collected (e.g. TestStorage).
AppendErr error // Inject appender error on every Append run.
AppendAllExemplarsError error // Inject storage.AppendPartialError for all exemplars.
CommitErr error // Inject commit error.
mtx sync.Mutex // mutex for result writes and ResultSamplesGreaterThan read.
// Recorded results.
PendingSamples []Sample
ResultSamples []Sample
RolledbackSamples []Sample
}
func (a *Appender) ResultReset() {
a.PendingSamples = a.PendingSamples[:0]
a.ResultSamples = a.ResultSamples[:0]
a.RolledbackSamples = a.RolledbackSamples[:0]
}
func (a *Appender) ResultSamplesGreaterThan(than int) bool {
a.mtx.Lock()
defer a.mtx.Unlock()
return len(a.ResultSamples) > than
}
// ResultMetadata returns ResultSamples with samples only containing L and M.
// This is for compatibility with old tests that only focus on metadata.
//
// Deprecated: Rewrite tests to test metadata on ResultSamples instead.
func (a *Appender) ResultMetadata() []Sample {
var ret []Sample
for _, s := range a.ResultSamples {
ret = append(ret, Sample{L: s.L, M: s.M})
}
return ret
}
func (a *Appender) String() string {
var sb strings.Builder
sb.WriteString("committed:\n")
for _, s := range a.ResultSamples {
sb.WriteString("\n")
sb.WriteString(s.String())
}
sb.WriteString("pending:\n")
for _, s := range a.PendingSamples {
sb.WriteString("\n")
sb.WriteString(s.String())
}
sb.WriteString("rolledback:\n")
for _, s := range a.RolledbackSamples {
sb.WriteString("\n")
sb.WriteString(s.String())
}
return sb.String()
}
func NewAppender() *Appender {
return &Appender{}
}
type appender struct {
prev storage.AppenderV2
next storage.AppenderV2
*Appender
}
func (a *Appender) AppenderV2(ctx context.Context) storage.AppenderV2 {
ret := &appender{Appender: a}
if a.Prev != nil {
ret.prev = a.Prev.AppenderV2(ctx)
}
if a.Next != nil {
ret.next = a.Next.AppenderV2(ctx)
}
return ret
}
func (a *appender) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
if a.Prev != nil {
if _, err := a.prev.Append(ref, ls, st, t, v, h, fh, opts); err != nil {
return 0, err
}
}
if a.AppendErr != nil {
return 0, a.AppendErr
}
a.mtx.Lock()
a.PendingSamples = append(a.PendingSamples, Sample{
MF: opts.MetricFamilyName,
M: opts.Metadata,
L: ls,
ST: st, T: t,
V: v, H: h, FH: fh,
ES: opts.Exemplars,
})
a.mtx.Unlock()
var err error
if a.AppendAllExemplarsError != nil {
var exErrs []error
for range opts.Exemplars {
exErrs = append(exErrs, a.AppendAllExemplarsError)
}
if len(exErrs) > 0 {
err = &storage.AppendPartialError{ExemplarErrors: exErrs}
}
if ref == 0 {
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
ref = storage.SeriesRef(ls.Hash())
}
return ref, err
}
if a.next != nil {
return a.next.Append(ref, ls, st, t, v, h, fh, opts)
}
if ref == 0 {
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
ref = storage.SeriesRef(ls.Hash())
}
return ref, nil
}
func (a *appender) Commit() error {
if a.Prev != nil {
if err := a.prev.Commit(); err != nil {
return err
}
}
if a.CommitErr != nil {
return a.CommitErr
}
a.mtx.Lock()
a.ResultSamples = append(a.ResultSamples, a.PendingSamples...)
a.PendingSamples = a.PendingSamples[:0]
a.mtx.Unlock()
if a.next != nil {
return a.next.Commit()
}
return nil
}
func (a *appender) Rollback() error {
if a.prev != nil {
if err := a.prev.Rollback(); err != nil {
return err
}
}
a.mtx.Lock()
a.RolledbackSamples = append(a.RolledbackSamples, a.PendingSamples...)
a.PendingSamples = a.PendingSamples[:0]
a.mtx.Unlock()
if a.next != nil {
return a.next.Rollback()
}
return nil
}