From 2fb680a229ee907e71e332066ed41f84f7714319 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Fri, 29 Aug 2025 07:58:27 +0100 Subject: [PATCH] feat(storage): add new CombinedAppender interface and compatibility layer Signed-off-by: bwplotka --- model/metadata/metadata.go | 4 + storage/compatibility.go | 147 +++++++++++++++++++++++++++++++++++++ storage/interface.go | 40 +++++++++- 3 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 storage/compatibility.go diff --git a/model/metadata/metadata.go b/model/metadata/metadata.go index 1b7e63e0f3..3c95472df4 100644 --- a/model/metadata/metadata.go +++ b/model/metadata/metadata.go @@ -21,3 +21,7 @@ type Metadata struct { Unit string `json:"unit"` Help string `json:"help"` } + +func (m Metadata) IsEmpty() bool { + return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == "" +} diff --git a/storage/compatibility.go b/storage/compatibility.go new file mode 100644 index 0000000000..f7f639eb73 --- /dev/null +++ b/storage/compatibility.go @@ -0,0 +1,147 @@ +package storage + +import ( + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" +) + +// AsAppender exposes new combined appender implementation as old appender. +// +// Because old appender interface was not combined, you need extra appender pieces +// to handle old behaviour of disconnected exemplars, metadata and sample cts. +func AsAppender(appender CombinedAppender, exemplar ExemplarAppender, metadata MetadataUpdater, ct CreatedTimestampAppender) Appender { + return &asAppender{app: appender, exemplar: exemplar, metadata: metadata, ct: ct} +} + +type asAppender struct { + opts CombinedAppendOptions + app CombinedAppender + + exemplar ExemplarAppender + metadata MetadataUpdater + ct CreatedTimestampAppender +} + +func (a *asAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) { + return a.app.AppendSample(ref, l, metadata.Metadata{}, 0, t, v, nil) +} + +func (a *asAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { + return a.app.AppendHistogram(ref, l, metadata.Metadata{}, 0, t, h, fh, nil) +} + +func (a *asAppender) Commit() error { + return a.app.Commit() +} + +func (a *asAppender) Rollback() error { + return a.app.Rollback() +} + +func (a *asAppender) SetOptions(opts *AppendOptions) { + if opts == nil { + return + } + + a.opts.DiscardOutOfOrder = opts.DiscardOutOfOrder + a.app.SetOptions(&a.opts) +} + +func (a *asAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error) { + return a.ct.AppendCTZeroSample(ref, l, t, ct) +} + +func (a *asAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { + a.opts.AppendCTAsZero = true + a.app.SetOptions(&a.opts) + return a.app.AppendHistogram(ref, l, metadata.Metadata{}, ct, t, h, fh, nil) +} + +func (a *asAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) { + return a.exemplar.AppendExemplar(ref, l, e) +} + +func (a *asAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) { + return a.metadata.UpdateMetadata(ref, l, m) +} + +// AsCombinedAppender exposes old appender implementation as a new combined appender. +func AsCombinedAppender(appender Appender) CombinedAppender { + return &asCombinedAppender{app: appender} +} + +type asCombinedAppender struct { + app Appender + opts CombinedAppendOptions +} + +func (a *asCombinedAppender) AppendSample(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ SeriesRef, err error) { + if ct != 0 && a.opts.AppendCTAsZero { + ref, err = a.app.AppendCTZeroSample(ref, ls, t, ct) + if err != nil { + return ref, err + } + } + ref, err = a.app.Append(ref, ls, t, v) + if err != nil { + return ref, err + } + + // Check feature flag? + for _, e := range es { + ref, err = a.app.AppendExemplar(ref, ls, e) + if err != nil { + return ref, err + } + } + // Check feature flag? + if !meta.IsEmpty() { + ref, err = a.app.UpdateMetadata(ref, ls, meta) + if err != nil { + return ref, err + } + } + return ref, err +} + +func (a *asCombinedAppender) AppendHistogram(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ SeriesRef, err error) { + if ct != 0 && a.opts.AppendCTAsZero { + ref, err = a.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, fh) + if err != nil { + return ref, err + } + } + ref, err = a.app.AppendHistogram(ref, ls, t, h, fh) + if err != nil { + return ref, err + } + // Check feature flag? + for _, e := range es { + ref, err = a.app.AppendExemplar(ref, ls, e) + if err != nil { + return ref, err + } + } + // Check feature flag? + if !meta.IsEmpty() { + ref, err = a.app.UpdateMetadata(ref, ls, meta) + if err != nil { + return ref, err + } + } + return ref, err +} + +func (a *asCombinedAppender) Commit() error { return a.app.Commit() } + +func (a *asCombinedAppender) Rollback() error { return a.app.Rollback() } + +func (a *asCombinedAppender) SetOptions(opts *CombinedAppendOptions) { + if opts == nil { + return + } + a.opts = *opts + a.app.SetOptions(&AppendOptions{DiscardOutOfOrder: opts.DiscardOutOfOrder}) +} diff --git a/storage/interface.go b/storage/interface.go index 5684460db0..86f5e8072c 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -255,6 +255,42 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { return f(mint, maxt) } +type CombinedAppendOptions struct { + DiscardOutOfOrder bool + + // AppendCTAsZero ensures that CT is appended as fake zero sample on all append methods. + AppendCTAsZero bool +} + +// CombinedAppender provides batched appends against a storage. +// It must be completed with a call to Commit or Rollback and must not be reused afterwards. +// +// Operations on the Appender interface are not goroutine-safe. +// +// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender. +// The behaviour is undefined if samples of different types are appended to the same series in a single Commit(). +type CombinedAppender interface { + // AppendSample appends a sample and related exemplars, metadata, and created timestamp to the storage. + AppendSample(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (SeriesRef, error) + + // AppendHistogram appends a histogram and related exemplars, metadata, and created timestamp to the storage. + AppendHistogram(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (SeriesRef, error) + + // Commit submits the collected samples and purges the batch. If Commit + // returns a non-nil error, it also rolls back all modifications made in + // the appender so far, as Rollback would do. In any case, an Appender + // must not be used anymore after Commit has been called. + Commit() error + + // Rollback rolls back all modifications made in the appender so far. + // Appender has to be discarded after rollback. + Rollback() error + + // SetOptions configures the appender with specific append options such as + // discarding out-of-order samples even if out-of-order is enabled in the TSDB. + SetOptions(opts *CombinedAppendOptions) +} + type AppendOptions struct { DiscardOutOfOrder bool } @@ -266,6 +302,8 @@ type AppendOptions struct { // // The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender. // The behaviour is undefined if samples of different types are appended to the same series in a single Commit(). +// +// DEPRECATED: Use CombinedAppender instead, this interface and TSDB support for it will be removed at one point. type Appender interface { // Append adds a sample pair for the given series. // An optional series reference can be provided to accelerate calls. @@ -300,7 +338,7 @@ type Appender interface { // GetRef is an extra interface on Appenders used by downstream projects // (e.g. Cortex) to avoid maintaining a parallel set of references. type GetRef interface { - // Returns reference number that can be used to pass to Appender.Append(), + // GetRef returns reference number that can be used to pass to Appender.Append(), // and a set of labels that will not cause another copy when passed to Appender.Append(). // 0 means the appender does not have a reference to this series. // hash should be a hash of lset.