fix interworking with real TSDB

Appending metadata and exemplars need non zero series reference to work.

Add test with some simple test cases for appending samples and
histograms and exemplars.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2025-08-05 18:56:06 +02:00
parent 7a82d77c4c
commit 358410fbf6
No known key found for this signature in database
GPG Key ID: 47A8F9CE80FD7C7F
6 changed files with 441 additions and 15 deletions

View File

@ -38,6 +38,8 @@ type CombinedAppender interface {
// AppendSample appends a histogram and related exemplars, metadata, and
// created timestamp to the storage.
AppendHistogram(metricFamilyName string, ls labels.Labels, meta metadata.Metadata, t, ct int64, h *histogram.Histogram, es []exemplar.Exemplar) error
// Commit finalizes the ongoing transaction in storage.
Commit() error
}
// NewCombinedAppender creates a combined appender that sets start times and
@ -91,11 +93,6 @@ func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta meta
exists = false
}
if !exists {
ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil {
b.samplesAppendedWithoutMetadata.Add(1)
b.logger.Debug("Error while updating metadata from OTLP", "err", err)
}
if ct != 0 && b.ingestCTZeroSample {
ref, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
@ -116,13 +113,27 @@ func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta meta
b.logger.Error("Error when appending float sample from OTLP", "err", err.Error(), "series", ls.String(), "timestamp", t)
}
}
ref = b.appendExemplars(ref, ls, es)
if ref == 0 {
// We cannot update metadata or add exemplars on non existent series.
return
}
if !exists {
b.refs[hash] = labelsRef{
ref: ref,
ls: ls,
}
// If this is the first time we see this series, set the metadata.
ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil {
b.samplesAppendedWithoutMetadata.Add(1)
b.logger.Debug("Error while updating metadata from OTLP", "err", err)
}
}
b.appendExemplars(ref, ls, es)
return
}
@ -136,12 +147,7 @@ func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta m
exists = false
}
if !exists {
ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil {
b.samplesAppendedWithoutMetadata.Add(1)
b.logger.Debug("Error while updating metadata from OTLP", "err", err)
}
if b.ingestCTZeroSample {
if ct != 0 && b.ingestCTZeroSample {
ref, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
// Even for the first sample OOO is a common scenario because
@ -151,6 +157,7 @@ func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta m
}
}
}
ref, err = b.app.AppendHistogram(ref, ls, t, h, nil)
if err != nil {
// Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
@ -161,16 +168,34 @@ func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta m
b.logger.Error("Error when appending histogram sample from OTLP", "err", err.Error(), "series", ls.String(), "timestamp", t)
}
}
ref = b.appendExemplars(ref, ls, es)
if ref == 0 {
// We cannot update metadata or add exemplars on non existent series.
return
}
if !exists {
b.refs[hash] = labelsRef{
ref: ref,
ls: ls,
}
// If this is the first time we see this series, set the metadata.
ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil {
b.samplesAppendedWithoutMetadata.Add(1)
b.logger.Debug("Error while updating metadata from OTLP", "err", err)
}
}
b.appendExemplars(ref, ls, es)
return
}
func (b *combinedAppender) Commit() error {
return b.app.Commit()
}
func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls modelLabels.Labels, es []exemplar.Exemplar) storage.SeriesRef {
var err error
for _, e := range es {

View File

@ -14,18 +14,33 @@
package prometheusremotewrite
import (
"context"
"math"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
modelLabels "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
type mockCombinedAppender struct {
pendingSamples []combinedSample
pendingHistograms []combinedHistogram
samples []combinedSample
histograms []combinedHistogram
}
@ -51,7 +66,7 @@ type combinedHistogram struct {
}
func (m *mockCombinedAppender) AppendSample(metricFamilyName string, ls labels.Labels, meta metadata.Metadata, t, ct int64, v float64, es []exemplar.Exemplar) error {
m.samples = append(m.samples, combinedSample{
m.pendingSamples = append(m.pendingSamples, combinedSample{
metricFamilyName: metricFamilyName,
ls: ls,
meta: meta,
@ -64,7 +79,7 @@ func (m *mockCombinedAppender) AppendSample(metricFamilyName string, ls labels.L
}
func (m *mockCombinedAppender) AppendHistogram(metricFamilyName string, ls labels.Labels, meta metadata.Metadata, t, ct int64, h *histogram.Histogram, es []exemplar.Exemplar) error {
m.histograms = append(m.histograms, combinedHistogram{
m.pendingHistograms = append(m.pendingHistograms, combinedHistogram{
metricFamilyName: metricFamilyName,
ls: ls,
meta: meta,
@ -76,6 +91,381 @@ func (m *mockCombinedAppender) AppendHistogram(metricFamilyName string, ls label
return nil
}
func (m *mockCombinedAppender) Commit() error {
m.samples = append(m.samples, m.pendingSamples...)
m.pendingSamples = m.pendingSamples[:0]
m.histograms = append(m.histograms, m.pendingHistograms...)
m.pendingHistograms = m.pendingHistograms[:0]
return nil
}
func requireEqual(t testing.TB, expected, actual interface{}, msgAndArgs ...interface{}) {
testutil.RequireEqualWithOptions(t, expected, actual, []cmp.Option{cmp.AllowUnexported(combinedSample{}, combinedHistogram{})}, msgAndArgs...)
}
// TestCombinedAppenderOnTSDB runs some basic tests on a real TSDB to check
// that the combinedAppender works on a real TSDB.
func TestCombinedAppenderOnTSDB(t *testing.T) {
t.Run("ingestCTZeroSample=false", func(t *testing.T) { testCombinedAppenderOnTSDB(t, false) })
t.Run("ingestCTZeroSample=true", func(t *testing.T) { testCombinedAppenderOnTSDB(t, true) })
}
func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
t.Helper()
now := time.Now()
testExemplars := []exemplar.Exemplar{
{
Labels: modelLabels.FromStrings("tracid", "122"),
Value: 1337,
},
{
Labels: modelLabels.FromStrings("tracid", "132"),
Value: 7777,
},
}
expectedExemplars := []exemplar.QueryResult{
{
SeriesLabels: modelLabels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
),
Exemplars: testExemplars,
},
}
testCases := map[string]struct {
appendFunc func(CombinedAppender) error
expectedSamples []sample
expectedExemplars []exemplar.QueryResult
}{
"single float sample, zero CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 0, 42.0, testExemplars)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
},
expectedExemplars: expectedExemplars,
},
"single float sample, very old CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 1, 42.0, nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
},
},
"single float sample, normal CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.Add(-2*time.Minute).UnixMilli(), 42.0, nil)
},
expectedSamples: []sample{
{
ctZero: true,
t: now.Add(-2 * time.Minute).UnixMilli(),
},
{
t: now.UnixMilli(),
f: 42.0,
},
},
},
"single float sample, CT name time as sample": {
appendFunc: func(app CombinedAppender) error {
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.UnixMilli(), 42.0, nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
},
},
"single float sample, CT in the future of the sample": {
appendFunc: func(app CombinedAppender) error {
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.Add(time.Minute).UnixMilli(), 42.0, nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
},
},
"single histogram sample, zero CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 0, tsdbutil.GenerateTestHistogram(42), testExemplars)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
},
expectedExemplars: expectedExemplars,
},
"single histogram sample, very old CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 1, tsdbutil.GenerateTestHistogram(42), nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
},
},
"single histogram sample, normal CT": {
appendFunc: func(app CombinedAppender) error {
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.Add(-2*time.Minute).UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)
},
expectedSamples: []sample{
{
ctZero: true,
t: now.Add(-2 * time.Minute).UnixMilli(),
h: &histogram.Histogram{},
},
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
},
},
"single histogram sample, CT name time as sample": {
appendFunc: func(app CombinedAppender) error {
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
},
},
"single histogram sample, CT in the future of the sample": {
appendFunc: func(app CombinedAppender) error {
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), now.Add(time.Minute).UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
},
},
"multiple float samples": {
appendFunc: func(app CombinedAppender) error {
err := app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 0, 42.0, nil)
if err != nil {
return err
}
return app.AppendSample("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.Add(15*time.Second).UnixMilli(), 0, 62.0, nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
{
t: now.Add(15 * time.Second).UnixMilli(),
f: 62.0,
},
},
},
"multiple histogram samples": {
appendFunc: func(app CombinedAppender) error {
err := app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.UnixMilli(), 0, tsdbutil.GenerateTestHistogram(42), nil)
if err != nil {
return err
}
return app.AppendHistogram("test_bytes_total", labels.FromStrings(
model.MetricNameLabel, "test_bytes_total",
"foo", "bar",
), metadata.Metadata{
Type: model.MetricTypeCounter,
Unit: "bytes",
Help: "some help",
}, now.Add(15*time.Second).UnixMilli(), 0, tsdbutil.GenerateTestHistogram(62), nil)
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
{
t: now.Add(15 * time.Second).UnixMilli(),
h: tsdbutil.GenerateTestHistogram(62),
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
dir := t.TempDir()
opts := tsdb.DefaultOptions()
opts.EnableExemplarStorage = true
opts.MaxExemplars = 100
opts.EnableNativeHistograms = true
db, err := tsdb.Open(dir, promslog.NewNopLogger(), prometheus.NewRegistry(), opts, nil)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
ctx := context.Background()
capp := NewCombinedAppender(db.Appender(ctx), promslog.NewNopLogger(), prometheus.NewRegistry(), ingestCTZeroSample)
require.NoError(t, tc.appendFunc(capp))
require.NoError(t, capp.Commit())
q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64))
require.NoError(t, err)
ss := q.Select(ctx, false, &storage.SelectHints{
Start: int64(math.MinInt64),
End: int64(math.MaxInt64),
}, modelLabels.MustNewMatcher(modelLabels.MatchEqual, model.MetricNameLabel, "test_bytes_total"))
require.NoError(t, ss.Err())
require.True(t, ss.Next())
series := ss.At()
it := series.Iterator(nil)
for _, sample := range tc.expectedSamples {
if !ingestCTZeroSample && sample.ctZero {
continue
}
if sample.h == nil {
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v := it.At()
require.Equal(t, sample.t, ts)
require.Equal(t, sample.f, v)
} else {
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h := it.AtHistogram(nil)
require.Equal(t, sample.t, ts)
require.Equal(t, sample.h.Count, h.Count)
}
}
require.False(t, ss.Next())
eq, err := db.ExemplarQuerier(ctx)
require.NoError(t, err)
exResult, err := eq.Select(int64(math.MinInt64), int64(math.MaxInt64), []*modelLabels.Matcher{modelLabels.MustNewMatcher(modelLabels.MatchEqual, model.MetricNameLabel, "test_bytes_total")})
require.NoError(t, err)
if tc.expectedExemplars == nil {
tc.expectedExemplars = []exemplar.QueryResult{}
}
require.Equal(t, tc.expectedExemplars, exResult)
})
}
}
type sample struct {
ctZero bool
t int64
f float64
h *histogram.Histogram
}

View File

@ -520,6 +520,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
tt.scope,
metadata.Metadata{},
)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.want(), mockAppender.samples)
require.Empty(t, converter.conflicts)
@ -698,6 +699,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
tt.scope,
metadata.Metadata{},
)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.want(), mockAppender.samples)
require.Empty(t, converter.conflicts)

View File

@ -877,6 +877,8 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
require.NoError(t, err)
require.Empty(t, annots)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.wantSeries(), mockAppender.histograms)
require.Empty(t, converter.conflicts)
})
@ -1351,6 +1353,8 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) {
require.NoError(t, err)
require.Empty(t, annots)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.wantSeries(), mockAppender.histograms)
require.Empty(t, converter.conflicts)
})

View File

@ -288,6 +288,9 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
}
}
err := c.appender.Commit()
errs = multierr.Append(errs, err)
return
}

View File

@ -127,6 +127,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) {
tt.scope,
metadata.Metadata{},
)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.want(), mockAppender.samples)
require.Empty(t, converter.conflicts)
@ -368,6 +369,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
tt.scope,
metadata.Metadata{},
)
require.NoError(t, mockAppender.Commit())
requireEqual(t, tt.want(), mockAppender.samples)
require.Empty(t, converter.conflicts)