feat(storage)[PART4b]: add AppenderV2 to the rest of storage.Storage impl

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-01-12 08:45:26 +00:00
parent 0e2569ad33
commit 49ea7b05ab
8 changed files with 267 additions and 18 deletions

View File

@ -1746,6 +1746,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
return notReadyAppender{}
}
// AppenderV2 implements the Storage interface.
func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
if x := s.get(); x != nil {
return x.AppenderV2(ctx)
}
return notReadyAppenderV2{}
}
type notReadyAppender struct{}
// SetOptions does nothing in this appender implementation.
@ -1779,6 +1787,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady }
func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
type notReadyAppenderV2 struct{}
func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady }
func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady }
// Close implements the Storage interface.
func (s *readyStorage) Close() error {
if x := s.get(); x != nil {

View File

@ -130,6 +130,19 @@ func (f *fanout) Appender(ctx context.Context) Appender {
}
}
func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 {
primary := f.primary.AppenderV2(ctx)
secondaries := make([]AppenderV2, 0, len(f.secondaries))
for _, storage := range f.secondaries {
secondaries = append(secondaries, storage.AppenderV2(ctx))
}
return &fanoutAppenderV2{
logger: f.logger,
primary: primary,
secondaries: secondaries,
}
}
// Close closes the storage and all its underlying resources.
func (f *fanout) Close() error {
errs := tsdb_errors.NewMulti(f.primary.Close())
@ -270,3 +283,54 @@ func (f *fanoutAppender) Rollback() (err error) {
}
return nil
}
type fanoutAppenderV2 struct {
logger *slog.Logger
primary AppenderV2
secondaries []AppenderV2
}
func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) {
ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppenderV2) Commit() (err error) {
err = f.primary.Commit()
for _, appender := range f.secondaries {
if err == nil {
err = appender.Commit()
} else {
if rollbackErr := appender.Rollback(); rollbackErr != nil {
f.logger.Error("Squashed rollback error on commit", "err", rollbackErr)
}
}
}
return err
}
func (f *fanoutAppenderV2) Rollback() (err error) {
err = f.primary.Rollback()
for _, appender := range f.secondaries {
rollbackErr := appender.Rollback()
switch {
case err == nil:
err = rollbackErr
case rollbackErr != nil:
f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr)
}
}
return nil
}

View File

@ -132,6 +132,115 @@ func TestFanout_SelectSorted(t *testing.T) {
})
}
func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
inputTotalSize := 0
priStorage := teststorage.New(t)
defer priStorage.Close()
app1 := priStorage.AppenderV2(t.Context())
_, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
require.NoError(t, app1.Commit())
remoteStorage1 := teststorage.New(t)
defer remoteStorage1.Close()
app2 := remoteStorage1.AppenderV2(t.Context())
_, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
require.NoError(t, app2.Commit())
remoteStorage2 := teststorage.New(t)
defer remoteStorage2.Close()
app3 := remoteStorage2.AppenderV2(t.Context())
_, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
_, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{})
require.NoError(t, err)
inputTotalSize++
require.NoError(t, app3.Commit())
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
t.Run("querier", func(t *testing.T) {
querier, err := fanoutStorage.Querier(0, 8000)
require.NoError(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
require.NoError(t, err)
seriesSet := querier.Select(t.Context(), true, nil, matcher)
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value
}
}
require.Equal(t, labelsResult, outputLabel)
require.Len(t, result, inputTotalSize)
})
t.Run("chunk querier", func(t *testing.T) {
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
require.NoError(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
require.NoError(t, err)
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher))
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value
}
}
require.NoError(t, seriesSet.Err())
require.Equal(t, labelsResult, outputLabel)
require.Len(t, result, inputTotalSize)
})
}
func TestFanoutErrors(t *testing.T) {
workingStorage := teststorage.New(t)
defer workingStorage.Close()
@ -224,9 +333,10 @@ type errChunkQuerier struct{ errQuerier }
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
return errChunkQuerier{}, nil
}
func (errStorage) Appender(context.Context) storage.Appender { return nil }
func (errStorage) StartTime() (int64, error) { return 0, nil }
func (errStorage) Close() error { return nil }
func (errStorage) Appender(context.Context) storage.Appender { return nil }
func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil }
func (errStorage) StartTime() (int64, error) { return 0, nil }
func (errStorage) Close() error { return nil }
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errSelect)

View File

@ -61,7 +61,8 @@ type SeriesRef uint64
// Appendable allows creating Appender.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// Appendable will be removed soon (ETA: Q2 2026).
type Appendable interface {
// Appender returns a new appender for the storage.
//
@ -77,10 +78,16 @@ type SampleAndChunkQueryable interface {
}
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.Appender.
// are goroutine-safe.
type Storage interface {
SampleAndChunkQueryable
// Appendable allows appending to storage.
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// Appendable will be removed soon (ETA: Q2 2026).
Appendable
// AppendableV2 allows appending to storage.
AppendableV2
// StartTime returns the oldest timestamp stored in the storage.
StartTime() (int64, error)
@ -261,7 +268,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
// AppendOptions provides options for implementations of the Appender interface.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// AppendOptions will be removed soon (ETA: Q2 2026).
type AppendOptions struct {
// DiscardOutOfOrder tells implementation that this append should not be out
// of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample
@ -278,7 +286,8 @@ type AppendOptions struct {
// I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram
// type.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// Appender will be removed soon (ETA: Q2 2026).
type Appender interface {
AppenderTransaction
@ -315,7 +324,8 @@ type GetRef interface {
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
// within Prometheus is in-memory only.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// ExemplarAppender will be removed soon (ETA: Q2 2026).
type ExemplarAppender interface {
// AppendExemplar adds an exemplar for the given series labels.
// An optional reference number can be provided to accelerate calls.
@ -333,7 +343,8 @@ type ExemplarAppender interface {
// HistogramAppender provides an interface for appending histograms to the storage.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// HistogramAppender will be removed soon (ETA: Q2 2026).
type HistogramAppender interface {
// AppendHistogram adds a histogram for the given series labels. An
// optional reference number can be provided to accelerate calls. A
@ -365,7 +376,8 @@ type HistogramAppender interface {
// MetadataUpdater provides an interface for associating metadata to stored series.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// MetadataUpdater will be removed soon (ETA: Q2 2026).
type MetadataUpdater interface {
// UpdateMetadata updates a metadata entry for the given series and labels.
// A series reference number is returned which can be used to modify the
@ -379,7 +391,8 @@ type MetadataUpdater interface {
// StartTimestampAppender provides an interface for appending ST to storage.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// StartTimestampAppender will be removed soon (ETA: Q2 2026).
type StartTimestampAppender interface {
// AppendSTZeroSample adds synthetic zero sample for the given st timestamp,
// which will be associated with given series, labels and the incoming

View File

@ -69,6 +69,7 @@ type AppendV2Options struct {
// Exemplars (optional) attached to the appended sample.
// Exemplar slice MUST be sorted by Exemplar.TS.
// Exemplar slice is unsafe for reuse.
// Duplicate exemplars errors MUST be ignored by implementations.
Exemplars []exemplar.Exemplar
// RejectOutOfOrder tells implementation that this append should not be out

View File

@ -63,6 +63,8 @@ type Storage struct {
localStartTimeCallback startTimeCallback
}
var _ storage.Storage = &Storage{}
// NewStorage returns a remote.Storage.
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
if l == nil {
@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender {
return s.rws.Appender(ctx)
}
// AppenderV2 implements storage.Storage.
func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 {
return s.rws.AppenderV2(ctx)
}
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
func (s *Storage) LowestSentTimestamp() int64 {
return s.rws.LowestSentTimestamp()

View File

@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
// Appender implements storage.Storage.
func (rws *WriteStorage) Appender(context.Context) storage.Appender {
return &timestampTracker{
writeStorage: rws,
highestRecvTimestamp: rws.highestTimestamp,
baseTimestampTracker: baseTimestampTracker{
writeStorage: rws,
highestRecvTimestamp: rws.highestTimestamp,
},
}
}
// AppenderV2 implements storage.Storage.
func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 {
return &timestampTrackerV2{
baseTimestampTracker: baseTimestampTracker{
writeStorage: rws,
highestRecvTimestamp: rws.highestTimestamp,
},
}
}
@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error {
return nil
}
type timestampTracker struct {
writeStorage *WriteStorage
appendOptions *storage.AppendOptions
type baseTimestampTracker struct {
writeStorage *WriteStorage
samples int64
exemplars int64
histograms int64
@ -292,6 +304,12 @@ type timestampTracker struct {
highestRecvTimestamp *maxTimestamp
}
type timestampTracker struct {
baseTimestampTracker
appendOptions *storage.AppendOptions
}
func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) {
t.appendOptions = opts
}
@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada
}
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
func (t *baseTimestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
samplesIn.Add(float64(t.samples))
@ -356,6 +374,24 @@ func (t *timestampTracker) Commit() error {
}
// Rollback implements storage.Appender.
func (*timestampTracker) Rollback() error {
func (*baseTimestampTracker) Rollback() error {
return nil
}
type timestampTrackerV2 struct {
baseTimestampTracker
}
func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
switch {
case fh != nil, h != nil:
t.histograms++
default:
t.samples++
}
if ts > t.highestTimestamp {
t.highestTimestamp = ts
}
t.exemplars += int64(len(opts.Exemplars))
return ref, nil
}

View File

@ -323,6 +323,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
if err := a.head.exemplars.ValidateExemplar(s.labels(), e); err != nil {
if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
// Except duplicates, return partial errors.
// TODO(bwplotka): Add exemplar info into error.
errs = append(errs, err)
continue
}