From 49ea7b05ab88149a790581fa61daee4243975758 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Mon, 12 Jan 2026 08:45:26 +0000 Subject: [PATCH] feat(storage)[PART4b]: add AppenderV2 to the rest of storage.Storage impl Signed-off-by: bwplotka --- cmd/prometheus/main.go | 17 ++++++ storage/fanout.go | 64 ++++++++++++++++++++ storage/fanout_test.go | 116 +++++++++++++++++++++++++++++++++++- storage/interface.go | 29 ++++++--- storage/interface_append.go | 1 + storage/remote/storage.go | 7 +++ storage/remote/write.go | 50 +++++++++++++--- tsdb/head_append_v2.go | 1 + 8 files changed, 267 insertions(+), 18 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ee60e58b2e..8b82049f50 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1746,6 +1746,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender { return notReadyAppender{} } +// AppenderV2 implements the Storage interface. +func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 { + if x := s.get(); x != nil { + return x.AppenderV2(ctx) + } + return notReadyAppenderV2{} +} + type notReadyAppender struct{} // SetOptions does nothing in this appender implementation. @@ -1779,6 +1787,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady } +type notReadyAppenderV2 struct{} + +func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} +func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady } + +func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady } + // Close implements the Storage interface. 
func (s *readyStorage) Close() error { if x := s.get(); x != nil { diff --git a/storage/fanout.go b/storage/fanout.go index 246a955b73..33c609029b 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -130,6 +130,19 @@ func (f *fanout) Appender(ctx context.Context) Appender { } } +func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 { + primary := f.primary.AppenderV2(ctx) + secondaries := make([]AppenderV2, 0, len(f.secondaries)) + for _, storage := range f.secondaries { + secondaries = append(secondaries, storage.AppenderV2(ctx)) + } + return &fanoutAppenderV2{ + logger: f.logger, + primary: primary, + secondaries: secondaries, + } +} + // Close closes the storage and all its underlying resources. func (f *fanout) Close() error { errs := tsdb_errors.NewMulti(f.primary.Close()) @@ -270,3 +283,54 @@ func (f *fanoutAppender) Rollback() (err error) { } return nil } + +type fanoutAppenderV2 struct { + logger *slog.Logger + + primary AppenderV2 + secondaries []AppenderV2 +} + +func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { + ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { + return 0, err + } + } + return ref, nil +} + +func (f *fanoutAppenderV2) Commit() (err error) { + err = f.primary.Commit() + + for _, appender := range f.secondaries { + if err == nil { + err = appender.Commit() + } else { + if rollbackErr := appender.Rollback(); rollbackErr != nil { + f.logger.Error("Squashed rollback error on commit", "err", rollbackErr) + } + } + } + return err +} + +func (f *fanoutAppenderV2) Rollback() (err error) { + err = f.primary.Rollback() + + for _, appender := range f.secondaries { + rollbackErr := appender.Rollback() + switch { + case err == nil: + err = 
rollbackErr + case rollbackErr != nil: + f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr) + } + } + return err +} diff --git a/storage/fanout_test.go b/storage/fanout_test.go index ed4cf17696..fb2f8dd553 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -132,6 +132,115 @@ func TestFanout_SelectSorted(t *testing.T) { }) } +func TestFanout_SelectSorted_AppenderV2(t *testing.T) { + inputLabel := labels.FromStrings(model.MetricNameLabel, "a") + outputLabel := labels.FromStrings(model.MetricNameLabel, "a") + + inputTotalSize := 0 + + priStorage := teststorage.New(t) + defer priStorage.Close() + app1 := priStorage.AppenderV2(t.Context()) + _, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + require.NoError(t, app1.Commit()) + + remoteStorage1 := teststorage.New(t) + defer remoteStorage1.Close() + app2 := remoteStorage1.AppenderV2(t.Context()) + _, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + require.NoError(t, app2.Commit()) + + remoteStorage2 := teststorage.New(t) + defer remoteStorage2.Close() + + app3 := remoteStorage2.AppenderV2(t.Context()) + _, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ +
_, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + + require.NoError(t, app3.Commit()) + + fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) + + t.Run("querier", func(t *testing.T) { + querier, err := fanoutStorage.Querier(0, 8000) + require.NoError(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + require.NoError(t, err) + + seriesSet := querier.Select(t.Context(), true, nil, matcher) + + result := make(map[int64]float64) + var labelsResult labels.Labels + var iterator chunkenc.Iterator + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator(iterator) + for iterator.Next() == chunkenc.ValFloat { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + require.Equal(t, labelsResult, outputLabel) + require.Len(t, result, inputTotalSize) + }) + t.Run("chunk querier", func(t *testing.T) { + querier, err := fanoutStorage.ChunkQuerier(0, 8000) + require.NoError(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + require.NoError(t, err) + + seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher)) + + result := make(map[int64]float64) + var labelsResult labels.Labels + var iterator chunkenc.Iterator + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator(iterator) + for iterator.Next() == chunkenc.ValFloat { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + require.NoError(t, seriesSet.Err()) + require.Equal(t, labelsResult, outputLabel) + require.Len(t, result, inputTotalSize) + }) +} + func TestFanoutErrors(t *testing.T) { workingStorage := teststorage.New(t) defer 
workingStorage.Close() @@ -224,9 +333,10 @@ type errChunkQuerier struct{ errQuerier } func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) { return errChunkQuerier{}, nil } -func (errStorage) Appender(context.Context) storage.Appender { return nil } -func (errStorage) StartTime() (int64, error) { return 0, nil } -func (errStorage) Close() error { return nil } +func (errStorage) Appender(context.Context) storage.Appender { return nil } +func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil } +func (errStorage) StartTime() (int64, error) { return 0, nil } +func (errStorage) Close() error { return nil } func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errSelect) diff --git a/storage/interface.go b/storage/interface.go index 23b8b48a0c..d6ce895d58 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -61,7 +61,8 @@ type SeriesRef uint64 // Appendable allows creating Appender. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type Appendable interface { // Appender returns a new appender for the storage. // @@ -77,10 +78,16 @@ type SampleAndChunkQueryable interface { } // Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. Storage implements storage.Appender. +// are goroutine-safe. type Storage interface { SampleAndChunkQueryable + + // Appendable allows appending to storage. + // WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). + // Appendable will be removed soon (ETA: Q2 2026). Appendable + // AppendableV2 allows appending to storage. 
+ AppendableV2 // StartTime returns the oldest timestamp stored in the storage. StartTime() (int64, error) @@ -261,7 +268,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { // AppendOptions provides options for implementations of the Appender interface. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// AppendOptions will be removed soon (ETA: Q2 2026). type AppendOptions struct { // DiscardOutOfOrder tells implementation that this append should not be out // of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample @@ -278,7 +286,8 @@ type AppendOptions struct { // I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram // type. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appender will be removed soon (ETA: Q2 2026). type Appender interface { AppenderTransaction @@ -315,7 +324,8 @@ type GetRef interface { // ExemplarAppender provides an interface for adding samples to exemplar storage, which // within Prometheus is in-memory only. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// ExemplarAppender will be removed soon (ETA: Q2 2026). type ExemplarAppender interface { // AppendExemplar adds an exemplar for the given series labels. // An optional reference number can be provided to accelerate calls. @@ -333,7 +343,8 @@ type ExemplarAppender interface { // HistogramAppender provides an interface for appending histograms to the storage. 
// -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// HistogramAppender will be removed soon (ETA: Q2 2026). type HistogramAppender interface { // AppendHistogram adds a histogram for the given series labels. An // optional reference number can be provided to accelerate calls. A @@ -365,7 +376,8 @@ type HistogramAppender interface { // MetadataUpdater provides an interface for associating metadata to stored series. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// MetadataUpdater will be removed soon (ETA: Q2 2026). type MetadataUpdater interface { // UpdateMetadata updates a metadata entry for the given series and labels. // A series reference number is returned which can be used to modify the @@ -379,7 +391,8 @@ type MetadataUpdater interface { // StartTimestampAppender provides an interface for appending ST to storage. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// StartTimestampAppender will be removed soon (ETA: Q2 2026). type StartTimestampAppender interface { // AppendSTZeroSample adds synthetic zero sample for the given st timestamp, // which will be associated with given series, labels and the incoming diff --git a/storage/interface_append.go b/storage/interface_append.go index cc7045dbd5..f2dce8e52e 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -69,6 +69,7 @@ type AppendV2Options struct { // Exemplars (optional) attached to the appended sample. // Exemplar slice MUST be sorted by Exemplar.TS. 
// Exemplar slice is unsafe for reuse. + // Duplicate exemplar errors MUST be ignored by implementations. Exemplars []exemplar.Exemplar // RejectOutOfOrder tells implementation that this append should not be out diff --git a/storage/remote/storage.go b/storage/remote/storage.go index f482597249..be75d23383 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -63,6 +63,8 @@ type Storage struct { localStartTimeCallback startTimeCallback } +var _ storage.Storage = &Storage{} + // NewStorage returns a remote.Storage. func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { if l == nil { @@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender { return s.rws.Appender(ctx) } +// AppenderV2 implements storage.Storage. +func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 { + return s.rws.AppenderV2(ctx) +} + // LowestSentTimestamp returns the lowest sent timestamp across all queues. func (s *Storage) LowestSentTimestamp() int64 { return s.rws.LowestSentTimestamp() diff --git a/storage/remote/write.go b/storage/remote/write.go index 92f447d624..91000a1d25 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // Appender implements storage.Storage. func (rws *WriteStorage) Appender(context.Context) storage.Appender { return &timestampTracker{ - writeStorage: rws, - highestRecvTimestamp: rws.highestTimestamp, + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, + } +} + +// AppenderV2 implements storage.Storage.
+func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 { + return &timestampTrackerV2{ + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, } } @@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error { return nil } -type timestampTracker struct { - writeStorage *WriteStorage - appendOptions *storage.AppendOptions +type baseTimestampTracker struct { + writeStorage *WriteStorage + samples int64 exemplars int64 histograms int64 @@ -292,6 +304,12 @@ type timestampTracker struct { highestRecvTimestamp *maxTimestamp } +type timestampTracker struct { + baseTimestampTracker + + appendOptions *storage.AppendOptions +} + func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) { t.appendOptions = opts } @@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada } // Commit implements storage.Appender. -func (t *timestampTracker) Commit() error { +func (t *baseTimestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) samplesIn.Add(float64(t.samples)) @@ -356,6 +374,24 @@ func (t *timestampTracker) Commit() error { } // Rollback implements storage.Appender.
-func (*timestampTracker) Rollback() error { +func (*baseTimestampTracker) Rollback() error { return nil } + +type timestampTrackerV2 struct { + baseTimestampTracker +} + +func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + switch { + case fh != nil, h != nil: + t.histograms++ + default: + t.samples++ + } + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + t.exemplars += int64(len(opts.Exemplars)) + return ref, nil +} diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go index 241fb42e97..4a62d56741 100644 --- a/tsdb/head_append_v2.go +++ b/tsdb/head_append_v2.go @@ -323,6 +323,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp if err := a.head.exemplars.ValidateExemplar(s.labels(), e); err != nil { if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) { // Except duplicates, return partial errors. + // TODO(bwplotka): Add exemplar info into error. errs = append(errs, err) continue }