From 7ebff91cfd019fd30abbf2ae38a67b33968ffc42 Mon Sep 17 00:00:00 2001 From: Minh Nguyen <148210689+pipiland2612@users.noreply.github.com> Date: Thu, 13 Nov 2025 10:53:12 +0200 Subject: [PATCH] OTLP Receiver: Only update metadata to WAL when metadata-wal-records feature is enabled (#17472) OTLP Receiver: Only update metadata to WAL when metadata-wal-records feature is enabled. --------- Signed-off-by: pipiland2612 --- .../combined_appender.go | 28 +- .../combined_appender_test.go | 290 ++++++++++++------ .../metrics_to_prw_test.go | 2 +- storage/remote/write_handler.go | 6 +- storage/remote/write_test.go | 1 + web/api/v1/api.go | 1 + 6 files changed, 221 insertions(+), 107 deletions(-) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go index 1441aecb6d..9ed114567d 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go @@ -82,11 +82,12 @@ func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetri // NewCombinedAppender creates a combined appender that sets start times and // updates metadata for each series only once, and appends samples and // exemplars for each call. -func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender { +func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample, appendMetadata bool, metrics CombinedAppenderMetrics) CombinedAppender { return &combinedAppender{ app: app, logger: logger, ingestCTZeroSample: ingestCTZeroSample, + appendMetadata: appendMetadata, refs: make(map[uint64]seriesRef), samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata, outOfOrderExemplars: metrics.outOfOrderExemplars, @@ -106,6 +107,7 @@ type combinedAppender struct { samplesAppendedWithoutMetadata prometheus.Counter outOfOrderExemplars prometheus.Counter ingestCTZeroSample bool + appendMetadata bool // Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs. // To detect hash collision it also stores the labels. // There is no overflow/conflict list, the TSDB will handle that part. @@ -189,17 +191,10 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat return err } - if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit { - updateRefs = true - // If this is the first time we see this series, set the metadata. - _, err := b.app.UpdateMetadata(ref, ls, meta) - if err != nil { - b.samplesAppendedWithoutMetadata.Add(1) - b.logger.Warn("Error while updating metadata from OTLP", "err", err) - } - } + metadataChanged := exists && (series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit) - if updateRefs { + // Update cache if references changed or metadata changed. + if updateRefs || metadataChanged { b.refs[hash] = seriesRef{ ref: ref, ct: ct, @@ -208,6 +203,17 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat } } + // Update metadata in storage if enabled and needed. + if b.appendMetadata && (!exists || metadataChanged) { + // Only update metadata in WAL if the metadata-wal-records feature is enabled. + // Without this feature, metadata is not persisted to WAL. + _, err := b.app.UpdateMetadata(ref, ls, meta) + if err != nil { + b.samplesAppendedWithoutMetadata.Add(1) + b.logger.Warn("Error while updating metadata from OTLP", "err", err) + } + } + b.appendExemplars(ref, ls, es) return err diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go index a914277f92..7d79637803 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "errors" + "fmt" "math" "testing" "time" @@ -412,13 +413,13 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { reg := prometheus.NewRegistry() cappMetrics := NewCombinedAppenderMetrics(reg) app := db.Appender(ctx) - capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) + capp := NewCombinedAppender(app, logger, ingestCTZeroSample, false, cappMetrics) tc.appendFunc(t, capp) require.NoError(t, app.Commit()) if tc.extraAppendFunc != nil { app = db.Appender(ctx) - capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) + capp = NewCombinedAppender(app, logger, ingestCTZeroSample, false, cappMetrics) tc.extraAppendFunc(t, capp) require.NoError(t, app.Commit()) } @@ -501,7 +502,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { t.Run("happy case with CT zero, reference is passed and reused", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) @@ -512,109 +513,38 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { }, })) - require.Len(t, app.records, 6) + require.Len(t, app.records, 5) requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) ref := app.records[0].outRef require.NotZero(t, ref) requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[5]) + requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[2]) + requireEqualOpAndRef(t, "Append", ref, app.records[3]) + requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[4]) }) t.Run("error on second CT ingest doesn't update the reference", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) app.appendCTZeroSampleError = errors.New("test error") require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 3, 4, 62.0, nil)) - require.Len(t, app.records, 5) + require.Len(t, app.records, 4) requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) ref := app.records[0].outRef require.NotZero(t, ref) requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) - require.Zero(t, app.records[3].outRef, "the second AppendCTZeroSample returned 0") - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - }) - - t.Run("updateMetadata called when meta help changes", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - newMetadata := floatMetadata - newMetadata.Help = "some other help" - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil)) - - require.Len(t, app.records, 7) - requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) - requireEqualOpAndRef(t, "Append", ref, app.records[6]) - }) - - t.Run("updateMetadata called when meta unit changes", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - newMetadata := floatMetadata - newMetadata.Unit = "seconds" - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil)) - - require.Len(t, app.records, 7) - requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) - requireEqualOpAndRef(t, "Append", ref, app.records[6]) - }) - - t.Run("updateMetadata called when meta type changes", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - newMetadata := floatMetadata - newMetadata.Type = model.MetricTypeGauge - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil)) - - require.Len(t, app.records, 7) - requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) - requireEqualOpAndRef(t, "Append", ref, app.records[6]) + requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[2]) + require.Zero(t, app.records[2].outRef, "the second AppendCTZeroSample returned 0") + requireEqualOpAndRef(t, "Append", ref, app.records[3]) }) t.Run("metadata, exemplars are not updated if append failed", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) app.appendError = errors.New("test error") require.Error(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 0, 1, 42.0, []exemplar.Exemplar{ { @@ -632,7 +562,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { t.Run("metadata, exemplars are updated if append failed but reference is valid", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) newMetadata := floatMetadata newMetadata.Help = "some other help" @@ -661,7 +591,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { t.Run("simulate conflict with existing series", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) ls := labels.FromStrings( model.MetricNameLabel, "test_bytes_total", @@ -688,23 +618,21 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { }, })) - require.Len(t, app.records, 7) + require.Len(t, app.records, 5) requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) ref := app.records[0].outRef require.NotZero(t, ref) requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[3]) - newRef := app.records[3].outRef + requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[2]) + newRef := app.records[2].outRef require.NotEqual(t, ref, newRef, "the second AppendCTZeroSample returned a different reference") - requireEqualOpAndRef(t, "Append", newRef, app.records[4]) - requireEqualOpAndRef(t, "UpdateMetadata", newRef, app.records[5]) - requireEqualOpAndRef(t, "AppendExemplar", newRef, app.records[6]) + requireEqualOpAndRef(t, "Append", newRef, app.records[3]) + requireEqualOpAndRef(t, "AppendExemplar", newRef, app.records[4]) }) t.Run("check that invoking AppendHistogram returns an error for nil histogram", func(t *testing.T) { app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) ls := labels.FromStrings( model.MetricNameLabel, "test_bytes_total", @@ -713,6 +641,101 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) { err := capp.AppendHistogram(ls, Metadata{}, 4, 2, nil, nil) require.Error(t, err) }) + + for _, appendMetadata := range []bool{false, true} { + t.Run(fmt.Sprintf("appendMetadata=%t", appendMetadata), func(t *testing.T) { + app := &appenderRecorder{} + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) + + if appendMetadata { + require.Len(t, app.records, 3) + requireEqualOp(t, "AppendCTZeroSample", app.records[0]) + requireEqualOp(t, "Append", app.records[1]) + requireEqualOp(t, "UpdateMetadata", app.records[2]) + } else { + require.Len(t, app.records, 2) + requireEqualOp(t, "AppendCTZeroSample", app.records[0]) + requireEqualOp(t, "Append", app.records[1]) + } + }) + } +} + +// TestCombinedAppenderMetadataChanges verifies that UpdateMetadata is called +// when metadata fields change (help, unit, or type). +func TestCombinedAppenderMetadataChanges(t *testing.T) { + seriesLabels := labels.FromStrings( + model.MetricNameLabel, "test_metric", + "foo", "bar", + ) + + baseMetadata := Metadata{ + Metadata: metadata.Metadata{ + Type: model.MetricTypeCounter, + Unit: "bytes", + Help: "original help", + }, + MetricFamilyName: "test_metric", + } + + tests := []struct { + name string + modifyMetadata func(Metadata) Metadata + }{ + { + name: "help changes", + modifyMetadata: func(m Metadata) Metadata { + m.Help = "new help text" + return m + }, + }, + { + name: "unit changes", + modifyMetadata: func(m Metadata) Metadata { + m.Unit = "seconds" + return m + }, + }, + { + name: "type changes", + modifyMetadata: func(m Metadata) Metadata { + m.Type = model.MetricTypeGauge + return m + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := &appenderRecorder{} + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + + newMetadata := tt.modifyMetadata(baseMetadata) + + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil)) + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil)) + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil)) + + // Verify expected operations. + require.Len(t, app.records, 7) + requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0]) + ref := app.records[0].outRef + require.NotZero(t, ref) + requireEqualOpAndRef(t, "Append", ref, app.records[1]) + requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) + requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3]) + requireEqualOpAndRef(t, "Append", ref, app.records[4]) + requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) + requireEqualOpAndRef(t, "Append", ref, app.records[6]) + }) + } +} + +func requireEqualOp(t *testing.T, expectedOp string, actual appenderRecord) { + t.Helper() + require.Equal(t, expectedOp, actual.op) } func requireEqualOpAndRef(t *testing.T, expectedOp string, expectedRef storage.SeriesRef, actual appenderRecord) { @@ -833,3 +856,82 @@ func (a *appenderRecorder) Rollback() error { func (*appenderRecorder) SetOptions(_ *storage.AppendOptions) { panic("not implemented") } + +func TestMetadataChangedLogic(t *testing.T) { + seriesLabels := labels.FromStrings(model.MetricNameLabel, "test_metric", "foo", "bar") + baseMetadata := Metadata{ + Metadata: metadata.Metadata{Type: model.MetricTypeCounter, Unit: "bytes", Help: "original"}, + MetricFamilyName: "test_metric", + } + + tests := []struct { + name string + appendMetadata bool + modifyMetadata func(Metadata) Metadata + expectWALCall bool + verifyCached func(*testing.T, metadata.Metadata) + }{ + { + name: "appendMetadata=false, no change", + appendMetadata: false, + modifyMetadata: func(m Metadata) Metadata { return m }, + expectWALCall: false, + verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "original", m.Help) }, + }, + { + name: "appendMetadata=false, help changes - cache updated, no WAL", + appendMetadata: false, + modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m }, + expectWALCall: false, + verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) }, + }, + { + name: "appendMetadata=true, help changes - cache and WAL updated", + appendMetadata: true, + modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m }, + expectWALCall: true, + verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) }, + }, + { + name: "appendMetadata=true, unit changes", + appendMetadata: true, + modifyMetadata: func(m Metadata) Metadata { m.Unit = "seconds"; return m }, + expectWALCall: true, + verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "seconds", m.Unit) }, + }, + { + name: "appendMetadata=true, type changes", + appendMetadata: true, + modifyMetadata: func(m Metadata) Metadata { m.Type = model.MetricTypeGauge; return m }, + expectWALCall: true, + verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, model.MetricTypeGauge, m.Type) }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := &appenderRecorder{} + capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, tt.appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry())) + + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil)) + + modifiedMetadata := tt.modifyMetadata(baseMetadata) + app.records = nil + require.NoError(t, capp.AppendSample(seriesLabels.Copy(), modifiedMetadata, 1, 3, 43.0, nil)) + + hash := seriesLabels.Hash() + cached, exists := capp.(*combinedAppender).refs[hash] + require.True(t, exists) + tt.verifyCached(t, cached.meta) + + updateMetadataCalled := false + for _, record := range app.records { + if record.op == "UpdateMetadata" { + updateMetadataCalled = true + break + } + } + require.Equal(t, tt.expectWALCall, updateMetadataCalled) + }) + } +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 341ee797cf..5675249153 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -1067,7 +1067,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for b.Loop() { app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, appMetrics) + mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics) converter := NewPrometheusConverter(mockAppender) annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index e8559dd00e..5d1c561802 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -501,6 +501,8 @@ type OTLPOptions struct { // IngestCTZeroSample enables writing zero samples based on the start time // of metrics. IngestCTZeroSample bool + // AppendMetadata enables writing metadata to WAL when metadata-wal-records feature is enabled. + AppendMetadata bool } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and @@ -519,6 +521,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appenda lookbackDelta: opts.LookbackDelta, ingestCTZeroSample: opts.IngestCTZeroSample, enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels, + appendMetadata: opts.AppendMetadata, // Register metrics. metrics: otlptranslator.NewCombinedAppenderMetrics(reg), } @@ -561,6 +564,7 @@ type rwExporter struct { lookbackDelta time.Duration ingestCTZeroSample bool enableTypeAndUnitLabels bool + appendMetadata bool // Metrics. metrics otlptranslator.CombinedAppenderMetrics @@ -572,7 +576,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er Appender: rw.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } - combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.ingestCTZeroSample, rw.metrics) + combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.ingestCTZeroSample, rw.appendMetadata, rw.metrics) converter := otlptranslator.NewPrometheusConverter(combinedAppender) annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{ AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(), diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 6103a7f262..975caccd6c 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -921,6 +921,7 @@ func TestOTLPWriteHandler(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { otlpOpts := OTLPOptions{ EnableTypeAndUnitLabels: testCase.typeAndUnitLabels, + AppendMetadata: true, } appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, otlpOpts) for _, sample := range testCase.expectedSamples { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index baddedd495..793e5a0075 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -348,6 +348,7 @@ func NewAPI( LookbackDelta: lookbackDelta, IngestCTZeroSample: ctZeroIngestionEnabled, EnableTypeAndUnitLabels: enableTypeAndUnitLabels, + AppendMetadata: appendMetadata, }) }